Support UDP multicast. (#618)

Motivation:

A large number of very useful protocols are implemented using multicast
with UDP. As a result, it would be helpful to add support for joining and
leaving IP multicast groups using SwiftNIO.

Modifications:

- Defines a MulticastChannel protocol for channels that support joining and
  leaving multicast groups.
- Adds an implementation of MulticastChannel to DatagramChannel.
- Adds a interfaceIndex property to NIONetworkInterface.
- Adds if_nametoindex to the Posix enum.
- Adds a isMulticast computed property to SocketAddress
- Adds a demo multicast chat application.
- Add a number of multicast-related socket options to SocketOptionProvider.

Result:

NIO users will be able to write channels that handle multicast UDP.
This commit is contained in:
Cory Benfield 2018-09-24 10:24:09 +01:00 committed by GitHub
parent 79267e893b
commit c73eb57694
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 964 additions and 7 deletions

View File

@ -53,6 +53,8 @@ var targets: [PackageDescription.Target] = [
dependencies: ["NIO", "NIOHTTP1", "NIOWebSocket"]),
.target(name: "NIOPerformanceTester",
dependencies: ["NIO", "NIOHTTP1", "NIOFoundationCompat"]),
.target(name: "NIOMulticastChat",
dependencies: ["NIO"]),
.testTarget(name: "NIOTests",
dependencies: ["NIO", "NIOFoundationCompat"]),
.testTarget(name: "NIOConcurrencyHelpersTests",
@ -77,6 +79,7 @@ let package = Package(
targets: ["NIOWebSocketServer"]),
.executable(name: "NIOPerformanceTester",
targets: ["NIOPerformanceTester"]),
.executable(name: "NIOMulticastChat", targets: ["NIOMulticastChat"]),
.library(name: "NIO", targets: ["NIO"]),
.library(name: "NIOTLS", targets: ["NIOTLS"]),
.library(name: "NIOHTTP1", targets: ["NIOHTTP1"]),

View File

@ -331,12 +331,29 @@ public enum ChannelError: Error {
}
/// This should be inside of `ChannelError` but we keep it separate to not break API.
// TODO: For 2.0: bring this inside of `ChannelError`
// TODO: For 2.0: bring this inside of `ChannelError`. https://github.com/apple/swift-nio/issues/620
public enum ChannelLifecycleError: Error {
/// An operation that was inappropriate given the current `Channel` state was attempted.
case inappropriateOperationForState
}
/// This should be inside of `ChannelError` but we keep it separate to not break API.
// TODO: For 2.0: bring this inside of `ChannelError`. https://github.com/apple/swift-nio/issues/620
public enum MulticastError: Error {
/// The local address of the `Channel` could not be determined.
case unknownLocalAddress
/// The address family of the multicast group was not valid for this `Channel`.
case badMulticastGroupAddressFamily
/// The address family of the provided multicast group join is not valid for this `Channel`.
case badInterfaceAddressFamily
/// An attempt was made to join a multicast group that does not correspond to a multicast
/// address.
case illegalMulticastAddress(SocketAddress)
}
extension ChannelError: Equatable {
public static func ==(lhs: ChannelError, rhs: ChannelError) -> Bool {
switch (lhs, rhs) {

View File

@ -60,6 +60,9 @@ public final class NIONetworkInterface {
/// instead.
public let pointToPointDestinationAddress: SocketAddress?
/// The index of the interface, as provided by `if_nametoindex`.
public let interfaceIndex: Int
/// Create a brand new network interface.
///
/// This constructor will fail if NIO does not understand the format of the underlying
@ -88,6 +91,12 @@ public final class NIONetworkInterface {
self.broadcastAddress = nil
self.pointToPointDestinationAddress = nil
}
do {
self.interfaceIndex = Int(try Posix.if_nametoindex(caddr.ifa_name))
} catch {
return nil
}
}
}
@ -105,6 +114,7 @@ extension NIONetworkInterface: Equatable {
lhs.address == rhs.address &&
lhs.netmask == rhs.netmask &&
lhs.broadcastAddress == rhs.broadcastAddress &&
lhs.pointToPointDestinationAddress == rhs.pointToPointDestinationAddress
lhs.pointToPointDestinationAddress == rhs.pointToPointDestinationAddress &&
lhs.interfaceIndex == rhs.interfaceIndex
}
}

View File

@ -0,0 +1,91 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// A `MulticastChannel` is a `Channel` that supports IP multicast operations: that is, a channel that can join multicast
/// groups.
///
/// - note: As with `Channel`, all operations on a `MulticastChannel` are thread-safe.
public protocol MulticastChannel: Channel {
/// Request that the `MulticastChannel` join the multicast group given by `group`.
///
/// - parameters:
/// - group: The IP address corresponding to the relevant multicast group.
/// - promise: The `EventLoopPromise` that will be notified once the operation is complete, or
/// `nil` if you are not interested in the result of the operation.
func joinGroup(_ group: SocketAddress, promise: EventLoopPromise<Void>?)
/// Request that the `MulticastChannel` join the multicast group given by `group` on the interface
/// given by `interface`.
///
/// - parameters:
/// - group: The IP address corresponding to the relevant multicast group.
/// - interface: The interface on which to join the given group, or `nil` to allow the kernel to choose.
/// - promise: The `EventLoopPromise` that will be notified once the operation is complete, or
/// `nil` if you are not interested in the result of the operation.
func joinGroup(_ group: SocketAddress, interface: NIONetworkInterface?, promise: EventLoopPromise<Void>?)
/// Request that the `MulticastChannel` leave the multicast group given by `group`.
///
/// - parameters:
/// - group: The IP address corresponding to the relevant multicast group.
/// - promise: The `EventLoopPromise` that will be notified once the operation is complete, or
/// `nil` if you are not interested in the result of the operation.
func leaveGroup(_ group: SocketAddress, promise: EventLoopPromise<Void>?)
/// Request that the `MulticastChannel` leave the multicast group given by `group` on the interface
/// given by `interface`.
///
/// - parameters:
/// - group: The IP address corresponding to the relevant multicast group.
/// - interface: The interface on which to leave the given group, or `nil` to allow the kernel to choose.
/// - promise: The `EventLoopPromise` that will be notified once the operation is complete, or
/// `nil` if you are not interested in the result of the operation.
func leaveGroup(_ group: SocketAddress, interface: NIONetworkInterface?, promise: EventLoopPromise<Void>?)
}
// MARK:- Default implementations for MulticastChannel
public extension MulticastChannel {
func joinGroup(_ group: SocketAddress, promise: EventLoopPromise<Void>?) {
self.joinGroup(group, interface: nil, promise: promise)
}
func joinGroup(_ group: SocketAddress) -> EventLoopFuture<Void> {
let promise: EventLoopPromise<Void> = self.eventLoop.newPromise()
self.joinGroup(group, promise: promise)
return promise.futureResult
}
func joinGroup(_ group: SocketAddress, interface: NIONetworkInterface?) -> EventLoopFuture<Void> {
let promise: EventLoopPromise<Void> = self.eventLoop.newPromise()
self.joinGroup(group, interface: interface, promise: promise)
return promise.futureResult
}
func leaveGroup(_ group: SocketAddress, promise: EventLoopPromise<Void>?) {
self.leaveGroup(group, interface: nil, promise: promise)
}
func leaveGroup(_ group: SocketAddress) -> EventLoopFuture<Void> {
let promise: EventLoopPromise<Void> = self.eventLoop.newPromise()
self.leaveGroup(group, promise: promise)
return promise.futureResult
}
func leaveGroup(_ group: SocketAddress, interface: NIONetworkInterface?) -> EventLoopFuture<Void> {
let promise: EventLoopPromise<Void> = self.eventLoop.newPromise()
self.leaveGroup(group, interface: interface, promise: promise)
return promise.futureResult
}
}

View File

@ -310,3 +310,29 @@ extension SocketAddress: Equatable {
}
}
extension SocketAddress {
/// Whether this `SocketAddress` corresponds to a multicast address.
public var isMulticast: Bool {
switch self {
case .unixDomainSocket:
// No multicast on unix sockets.
return false
case .v4(let v4Addr):
// For IPv4 a multicast address is in the range 224.0.0.0/4.
// The easy way to check if this is the case is to just mask off
// the address.
let v4WireAddress = v4Addr.address.sin_addr.s_addr
let mask = in_addr_t(0xF000_0000).bigEndian
let subnet = in_addr_t(0xE000_0000).bigEndian
return v4WireAddress & mask == subnet
case .v6(let v6Addr):
// For IPv6 a multicast address is in the range ff00::/8.
// Here we don't need a bitmask, as all the top bits are set,
// so we can just ask for equality on the top byte.
var v6WireAddress = v6Addr.address.sin6_addr
return withUnsafeBytes(of: &v6WireAddress) { $0[0] == 0xff }
}
}
}

View File

@ -728,3 +728,123 @@ extension DatagramChannel: CustomStringConvertible {
return "DatagramChannel { selectable = \(self.selectable), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
}
}
extension DatagramChannel: MulticastChannel {
/// The socket options for joining and leaving multicast groups are very similar.
/// This enum allows us to write a single function to do all the work, and then
/// at the last second pull out the correct socket option name.
private enum GroupOperation {
/// Join a multicast group.
case join
/// Leave a multicast group.
case leave
/// Given a socket option level, returns the appropriate socket option name for
/// this group operation.
///
/// - parameters:
/// - level: The socket option level. Must be one of `IPPROTO_IP` or
/// `IPPROTO_IPV6`. Will trap if an invalid value is provided.
/// - returns: The socket option name to use for this group operation.
func optionName(level: CInt) -> CInt {
switch (self, level) {
case (.join, CInt(IPPROTO_IP)):
return CInt(IP_ADD_MEMBERSHIP)
case (.leave, CInt(IPPROTO_IP)):
return CInt(IP_DROP_MEMBERSHIP)
case (.join, CInt(IPPROTO_IPV6)):
return CInt(IPV6_JOIN_GROUP)
case (.leave, CInt(IPPROTO_IPV6)):
return CInt(IPV6_LEAVE_GROUP)
default:
preconditionFailure("Unexpected socket option level: \(level)")
}
}
}
public func joinGroup(_ group: SocketAddress, interface: NIONetworkInterface?, promise: EventLoopPromise<Void>?) {
if eventLoop.inEventLoop {
self.performGroupOperation0(group, interface: interface, promise: promise, operation: .join)
} else {
eventLoop.execute {
self.performGroupOperation0(group, interface: interface, promise: promise, operation: .join)
}
}
}
public func leaveGroup(_ group: SocketAddress, interface: NIONetworkInterface?, promise: EventLoopPromise<Void>?) {
if eventLoop.inEventLoop {
self.performGroupOperation0(group, interface: interface, promise: promise, operation: .leave)
} else {
eventLoop.execute {
self.performGroupOperation0(group, interface: interface, promise: promise, operation: .leave)
}
}
}
/// The implementation of `joinGroup` and `leaveGroup`.
///
/// Joining and leaving a multicast group ultimately corresponds to a single, carefully crafted, socket option.
private func performGroupOperation0(_ group: SocketAddress,
interface: NIONetworkInterface?,
promise: EventLoopPromise<Void>?,
operation: GroupOperation) {
assert(self.eventLoop.inEventLoop)
guard self.isActive else {
promise?.fail(error: ChannelLifecycleError.inappropriateOperationForState)
return
}
// We need to check that we have the appropriate address types in all cases. They all need to overlap with
// the address type of this channel, or this cannot work.
guard let localAddress = self.localAddress else {
promise?.fail(error: MulticastError.unknownLocalAddress)
return
}
guard localAddress.protocolFamily == group.protocolFamily else {
promise?.fail(error: MulticastError.badMulticastGroupAddressFamily)
return
}
// Ok, now we need to check that the group we've been asked to join is actually a multicast group.
guard group.isMulticast else {
promise?.fail(error: MulticastError.illegalMulticastAddress(group))
return
}
// Ok, we now have reason to believe this will actually work. We need to pass this on to the socket.
do {
switch (group, interface?.address) {
case (.unixDomainSocket, _):
preconditionFailure("Should not be reachable, UNIX sockets are never multicast addresses")
case (.v4(let groupAddress), .some(.v4(let interfaceAddress))):
// IPv4Binding with specific target interface.
let multicastRequest = ip_mreq(imr_multiaddr: groupAddress.address.sin_addr, imr_interface: interfaceAddress.address.sin_addr)
try self.socket.setOption(level: CInt(IPPROTO_IP), name: operation.optionName(level: CInt(IPPROTO_IP)), value: multicastRequest)
case (.v4(let groupAddress), .none):
// IPv4 binding without target interface.
let multicastRequest = ip_mreq(imr_multiaddr: groupAddress.address.sin_addr, imr_interface: in_addr(s_addr: INADDR_ANY))
try self.socket.setOption(level: CInt(IPPROTO_IP), name: operation.optionName(level: CInt(IPPROTO_IP)), value: multicastRequest)
case (.v6(let groupAddress), .some(.v6)):
// IPv6 binding with specific target interface.
let multicastRequest = ipv6_mreq(ipv6mr_multiaddr: groupAddress.address.sin6_addr, ipv6mr_interface: UInt32(interface!.interfaceIndex))
try self.socket.setOption(level: CInt(IPPROTO_IPV6), name: operation.optionName(level: CInt(IPPROTO_IPV6)), value: multicastRequest)
case (.v6(let groupAddress), .none):
// IPv6 binding with no specific interface requested.
let multicastRequest = ipv6_mreq(ipv6mr_multiaddr: groupAddress.address.sin6_addr, ipv6mr_interface: 0)
try self.socket.setOption(level: CInt(IPPROTO_IPV6), name: operation.optionName(level: CInt(IPPROTO_IPV6)), value: multicastRequest)
case (.v4, .some(.v6)), (.v6, .some(.v4)), (.v4, .some(.unixDomainSocket)), (.v6, .some(.unixDomainSocket)):
// Mismatched group and interface address: this is an error.
throw MulticastError.badInterfaceAddressFamily
}
promise?.succeed(result: ())
} catch {
promise?.fail(error: error)
return
}
}
}

View File

@ -99,6 +99,114 @@ public extension SocketOptionProvider {
public func getSoLinger() -> EventLoopFuture<linger> {
return self.unsafeGetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_LINGER)
}
/// Sets the socket option IP_MULTICAST_IF to `value`.
///
/// - parameters:
/// - value: The value to set IP_MULTICAST_IF to.
/// - returns: An `EventLoopFuture` that fires when the option has been set,
/// or if an error has occurred.
public func setIPMulticastIF(_ value: in_addr) -> EventLoopFuture<Void> {
return self.unsafeSetSocketOption(level: IPPROTO_IP, name: IP_MULTICAST_IF, value: value)
}
/// Gets the value of the socket option IP_MULTICAST_IF.
///
/// - returns: An `EventLoopFuture` containing the value of the socket option, or
/// any error that occurred while retrieving the socket option.
public func getIPMulticastIF() -> EventLoopFuture<in_addr> {
return self.unsafeGetSocketOption(level: IPPROTO_IP, name: IP_MULTICAST_IF)
}
/// Sets the socket option IP_MULTICAST_TTL to `value`.
///
/// - parameters:
/// - value: The value to set IP_MULTICAST_TTL to.
/// - returns: An `EventLoopFuture` that fires when the option has been set,
/// or if an error has occurred.
public func setIPMulticastTTL(_ value: CUnsignedChar) -> EventLoopFuture<Void> {
return self.unsafeSetSocketOption(level: IPPROTO_IP, name: IP_MULTICAST_TTL, value: value)
}
/// Gets the value of the socket option IP_MULTICAST_TTL.
///
/// - returns: An `EventLoopFuture` containing the value of the socket option, or
/// any error that occurred while retrieving the socket option.
public func getIPMulticastTTL() -> EventLoopFuture<CUnsignedChar> {
return self.unsafeGetSocketOption(level: IPPROTO_IP, name: IP_MULTICAST_TTL)
}
/// Sets the socket option IP_MULTICAST_LOOP to `value`.
///
/// - parameters:
/// - value: The value to set IP_MULTICAST_LOOP to.
/// - returns: An `EventLoopFuture` that fires when the option has been set,
/// or if an error has occurred.
public func setIPMulticastLoop(_ value: CUnsignedChar) -> EventLoopFuture<Void> {
return self.unsafeSetSocketOption(level: IPPROTO_IP, name: IP_MULTICAST_LOOP, value: value)
}
/// Gets the value of the socket option IP_MULTICAST_LOOP.
///
/// - returns: An `EventLoopFuture` containing the value of the socket option, or
/// any error that occurred while retrieving the socket option.
public func getIPMulticastLoop() -> EventLoopFuture<CUnsignedChar> {
return self.unsafeGetSocketOption(level: IPPROTO_IP, name: IP_MULTICAST_LOOP)
}
/// Sets the socket option IPV6_MULTICAST_IF to `value`.
///
/// - parameters:
/// - value: The value to set IPV6_MULTICAST_IF to.
/// - returns: An `EventLoopFuture` that fires when the option has been set,
/// or if an error has occurred.
public func setIPv6MulticastIF(_ value: CUnsignedInt) -> EventLoopFuture<Void> {
return self.unsafeSetSocketOption(level: IPPROTO_IPV6, name: IPV6_MULTICAST_IF, value: value)
}
/// Gets the value of the socket option IPV6_MULTICAST_IF.
///
/// - returns: An `EventLoopFuture` containing the value of the socket option, or
/// any error that occurred while retrieving the socket option.
public func getIPv6MulticastIF() -> EventLoopFuture<CUnsignedInt> {
return self.unsafeGetSocketOption(level: IPPROTO_IPV6, name: IPV6_MULTICAST_IF)
}
/// Sets the socket option IPV6_MULTICAST_HOPS to `value`.
///
/// - parameters:
/// - value: The value to set IPV6_MULTICAST_HOPS to.
/// - returns: An `EventLoopFuture` that fires when the option has been set,
/// or if an error has occurred.
public func setIPv6MulticastHops(_ value: CInt) -> EventLoopFuture<Void> {
return self.unsafeSetSocketOption(level: IPPROTO_IPV6, name: IPV6_MULTICAST_HOPS, value: value)
}
/// Gets the value of the socket option IPV6_MULTICAST_HOPS.
///
/// - returns: An `EventLoopFuture` containing the value of the socket option, or
/// any error that occurred while retrieving the socket option.
public func getIPv6MulticastHops() -> EventLoopFuture<CInt> {
return self.unsafeGetSocketOption(level: IPPROTO_IPV6, name: IPV6_MULTICAST_HOPS)
}
/// Sets the socket option IPV6_MULTICAST_LOOP to `value`.
///
/// - parameters:
/// - value: The value to set IPV6_MULTICAST_LOOP to.
/// - returns: An `EventLoopFuture` that fires when the option has been set,
/// or if an error has occurred.
public func setIPv6MulticastLoop(_ value: CUnsignedInt) -> EventLoopFuture<Void> {
return self.unsafeSetSocketOption(level: IPPROTO_IPV6, name: IPV6_MULTICAST_LOOP, value: value)
}
/// Gets the value of the socket option IPV6_MULTICAST_LOOP.
///
/// - returns: An `EventLoopFuture` containing the value of the socket option, or
/// any error that occurred while retrieving the socket option.
public func getIPv6MulticastLoop() -> EventLoopFuture<CUnsignedInt> {
return self.unsafeGetSocketOption(level: IPPROTO_IPV6, name: IPV6_MULTICAST_LOOP)
}
}

View File

@ -54,6 +54,7 @@ private let sysGetpeername: @convention(c) (CInt, UnsafeMutablePointer<sockaddr>
private let sysGetsockname: @convention(c) (CInt, UnsafeMutablePointer<sockaddr>?, UnsafeMutablePointer<socklen_t>?) -> CInt = getsockname
private let sysGetifaddrs: @convention(c) (UnsafeMutablePointer<UnsafeMutablePointer<ifaddrs>?>?) -> CInt = getifaddrs
private let sysFreeifaddrs: @convention(c) (UnsafeMutablePointer<ifaddrs>?) -> Void = freeifaddrs
private let sysIfNameToIndex: @convention(c) (UnsafePointer<CChar>?) -> CUnsignedInt = if_nametoindex
private let sysAF_INET = AF_INET
private let sysAF_INET6 = AF_INET6
private let sysAF_UNIX = AF_UNIX
@ -449,6 +450,12 @@ internal enum Posix {
}
}
@inline(never)
public static func if_nametoindex(_ name: UnsafePointer<CChar>?) throws -> CUnsignedInt {
return try wrapSyscall {
sysIfNameToIndex(name)
}
}
}
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)

View File

@ -0,0 +1,111 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIO
/// Implements a simple chat protocol.
private final class ChatMessageDecoder: ChannelInboundHandler {
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let envelope = self.unwrapInboundIn(data)
var buffer = envelope.data
// To begin with, the chat messages are simply whole datagrams, no other length.
guard let message = buffer.readString(length: buffer.readableBytes) else {
print("Error: invalid string received")
return
}
print("\(envelope.remoteAddress): \(message)")
}
}
private final class ChatMessageEncoder: ChannelOutboundHandler {
public typealias OutboundIn = AddressedEnvelope<String>
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let message = self.unwrapOutboundIn(data)
var buffer = ctx.channel.allocator.buffer(capacity: message.data.utf8.count)
buffer.write(string: message.data)
ctx.write(self.wrapOutboundOut(AddressedEnvelope(remoteAddress: message.remoteAddress, data: buffer)), promise: promise)
}
}
// We allow users to specify the interface they want to use here.
var targetInterface: NIONetworkInterface? = nil
if let interfaceAddress = CommandLine.arguments.dropFirst().first,
let targetAddress = try? SocketAddress(ipAddress: interfaceAddress, port: 0) {
for interface in try! System.enumerateInterfaces() {
if interface.address == targetAddress {
targetInterface = interface
break
}
}
if targetInterface == nil {
fatalError("Could not find interface for \(interfaceAddress)")
}
}
// For this chat protocol we temporarily squat on 224.1.0.26. This is a reserved multicast IPv4 address,
// so your machine is unlikely to have already joined this group. That helps properly demonstrate correct
// operation. We use port 7654 because, well, because why not.
let chatMulticastGroup = try! SocketAddress(ipAddress: "224.1.0.26", port: 7654)
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
// Begin by setting up the basics of the bootstrap.
var datagramBootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEPORT), value: 1)
.channelInitializer { channel in
return channel.pipeline.add(handler: ChatMessageEncoder()).then {
channel.pipeline.add(handler: ChatMessageDecoder())
}
}
// We cast our channel to MulticastChannel to obtain the multicast operations.
let datagramChannel = try datagramBootstrap
.bind(host: "0.0.0.0", port: 7654)
.then { channel -> EventLoopFuture<Channel> in
let channel = channel as! MulticastChannel
return channel.joinGroup(chatMulticastGroup, interface: targetInterface).map { channel }
}.then { channel -> EventLoopFuture<Channel> in
guard let targetInterface = targetInterface else {
return channel.eventLoop.newSucceededFuture(result: channel)
}
let provider = channel as! SocketOptionProvider
switch targetInterface.address {
case .v4(let addr):
return provider.setIPMulticastIF(addr.address.sin_addr).map { channel }
case .v6:
return provider.setIPv6MulticastIF(CUnsignedInt(targetInterface.interfaceIndex)).map { channel }
case .unixDomainSocket:
preconditionFailure("Should not be possible to create a multicast socket on a unix domain socket")
}
}.wait()
print("Now broadcasting, happy chatting.\nPress ^D to exit.")
while let line = readLine(strippingNewline: false) {
datagramChannel.writeAndFlush(AddressedEnvelope(remoteAddress: chatMulticastGroup, data: line), promise: nil)
}
// Close the channel.
try! datagramChannel.close().wait()
try! group.syncShutdownGracefully()

View File

@ -73,6 +73,7 @@ import XCTest
testCase(IntegerTypesTest.allTests),
testCase(MarkedCircularBufferTests.allTests),
testCase(MessageToByteEncoderTest.allTests),
testCase(MulticastTest.allTests),
testCase(NIOConcurrencyHelpersTests.allTests),
testCase(NonBlockingFileIOTest.allTests),
testCase(PendingDatagramWritesManagerTests.allTests),
@ -81,7 +82,7 @@ import XCTest
testCase(SniHandlerTest.allTests),
testCase(SocketAddressTest.allTests),
testCase(SocketChannelTest.allTests),
testCase(SocketOptionChannelTest.allTests),
testCase(SocketOptionProviderTest.allTests),
testCase(SystemTest.allTests),
testCase(ThreadTest.allTests),
testCase(TypeAssistedChannelHandlerTest.allTests),

View File

@ -0,0 +1,36 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// MulticastTest+XCTest.swift
//
import XCTest
///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension MulticastTest {
static var allTests : [(String, (MulticastTest) -> () throws -> Void)] {
return [
("testCanJoinBasicMulticastGroupIPv4", testCanJoinBasicMulticastGroupIPv4),
("testCanJoinBasicMulticastGroupIPv6", testCanJoinBasicMulticastGroupIPv6),
("testCanLeaveAnIPv4MulticastGroup", testCanLeaveAnIPv4MulticastGroup),
("testCanLeaveAnIPv6MulticastGroup", testCanLeaveAnIPv6MulticastGroup),
]
}
}

View File

@ -0,0 +1,301 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIO
import XCTest
final class PromiseOnReadHandler: ChannelInboundHandler {
typealias InboundIn = AddressedEnvelope<ByteBuffer>
private let promise: EventLoopPromise<InboundIn>
init(promise: EventLoopPromise<InboundIn>) {
self.promise = promise
}
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
self.promise.succeed(result: self.unwrapInboundIn(data))
_ = ctx.pipeline.remove(ctx: ctx)
}
}
final class MulticastTest: XCTestCase {
private var group: MultiThreadedEventLoopGroup!
override func setUp() {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
}
override func tearDown() {
try? self.group.syncShutdownGracefully()
}
struct NoSuchInterfaceError: Error { }
struct MulticastInterfaceMismatchError: Error { }
struct ReceivedDatagramError: Error { }
private var supportsIPv6: Bool {
do {
let ipv6Loopback = try SocketAddress(ipAddress: "::1", port: 0)
return try System.enumerateInterfaces().filter { $0.address == ipv6Loopback }.first != nil
} catch {
return false
}
}
private func interfaceForAddress(address: String) throws -> NIONetworkInterface {
let targetAddress = try SocketAddress(ipAddress: address, port: 0)
guard let interface = try System.enumerateInterfaces().lazy.filter({ $0.address == targetAddress }).first else {
throw NoSuchInterfaceError()
}
return interface
}
private func bindMulticastChannel(host: String, port: Int, multicastAddress: String, interface: NIONetworkInterface) -> EventLoopFuture<MulticastChannel> {
return DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.bind(host: host, port: port)
.then { channel in
let channel = channel as! MulticastChannel
do {
let multicastAddress = try SocketAddress(ipAddress: multicastAddress, port: channel.localAddress!.port!)
return channel.joinGroup(multicastAddress, interface: interface).map { channel }
} catch {
return channel.eventLoop.newFailedFuture(error: error)
}
}.then { (channel: MulticastChannel) -> EventLoopFuture<MulticastChannel> in
let provider = channel as! SocketOptionProvider
switch channel.localAddress! {
case .v4:
return provider.setIPMulticastLoop(1).map { channel }
case .v6:
return provider.setIPv6MulticastLoop(1).map { channel }
case .unixDomainSocket:
preconditionFailure("Multicast is meaningless on unix domain sockets")
}
}
}
private func configureSenderMulticastIf(sender: Channel, multicastInterface: NIONetworkInterface) -> EventLoopFuture<Void> {
let provider = sender as! SocketOptionProvider
switch (sender.localAddress!, multicastInterface.address) {
case (.v4, .v4(let addr)):
return provider.setIPMulticastIF(addr.address.sin_addr)
case (.v6, .v6):
return provider.setIPv6MulticastIF(CUnsignedInt(multicastInterface.interfaceIndex))
default:
XCTFail("Cannot join channel bound to \(sender.localAddress!) to interface at \(multicastInterface.address)")
return sender.eventLoop.newFailedFuture(error: MulticastInterfaceMismatchError())
}
}
private func leaveMulticastGroup(channel: Channel, multicastAddress: String, interface: NIONetworkInterface) -> EventLoopFuture<Void> {
let channel = channel as! MulticastChannel
do {
let multicastAddress = try SocketAddress(ipAddress: multicastAddress, port: channel.localAddress!.port!)
return channel.leaveGroup(multicastAddress, interface: interface)
} catch {
return channel.eventLoop.newFailedFuture(error: error)
}
}
private func assertDatagramReaches(multicastChannel: Channel, sender: Channel, multicastAddress: SocketAddress, file: StaticString = #file, line: UInt = #line) throws {
let receivedMulticastDatagram: EventLoopPromise<AddressedEnvelope<ByteBuffer>> = multicastChannel.eventLoop.newPromise()
XCTAssertNoThrow(try multicastChannel.pipeline.add(handler: PromiseOnReadHandler(promise: receivedMulticastDatagram)).wait())
var messageBuffer = sender.allocator.buffer(capacity: 24)
messageBuffer.write(staticString: "hello, world!")
XCTAssertNoThrow(
try sender.writeAndFlush(AddressedEnvelope(remoteAddress: multicastAddress, data: messageBuffer)).wait(),
file: file,
line: line
)
let receivedDatagram = try assertNoThrowWithValue(receivedMulticastDatagram.futureResult.wait(), file: file, line: line)
XCTAssertEqual(receivedDatagram.remoteAddress, sender.localAddress!)
XCTAssertEqual(receivedDatagram.data, messageBuffer)
}
private func assertDatagramDoesNotReach(multicastChannel: Channel,
after timeout: TimeAmount,
sender: Channel,
multicastAddress: SocketAddress,
file: StaticString = #file, line: UInt = #line) throws {
let timeoutPromise: EventLoopPromise<Void> = multicastChannel.eventLoop.newPromise()
let receivedMulticastDatagram: EventLoopPromise<AddressedEnvelope<ByteBuffer>> = multicastChannel.eventLoop.newPromise()
XCTAssertNoThrow(try multicastChannel.pipeline.add(handler: PromiseOnReadHandler(promise: receivedMulticastDatagram)).wait())
// If we receive a datagram, or the reader promise fails, we must fail the timeoutPromise.
receivedMulticastDatagram.futureResult.map { (_: AddressedEnvelope<ByteBuffer>) in
timeoutPromise.fail(error: ReceivedDatagramError())
}.cascadeFailure(promise: timeoutPromise)
var messageBuffer = sender.allocator.buffer(capacity: 24)
messageBuffer.write(staticString: "hello, world!")
XCTAssertNoThrow(
try sender.writeAndFlush(AddressedEnvelope(remoteAddress: multicastAddress, data: messageBuffer)).wait(),
file: file,
line: line
)
_ = multicastChannel.eventLoop.scheduleTask(in: timeout) { timeoutPromise.succeed(result: ()) }
XCTAssertNoThrow(try timeoutPromise.futureResult.wait(), file: file, line: line)
}
func testCanJoinBasicMulticastGroupIPv4() throws {
let multicastInterface = try assertNoThrowWithValue(self.interfaceForAddress(address: "127.0.0.1"))
// We avoid the risk of interference due to our all-addresses bind by only joining this multicast
// group on the loopback.
let listenerChannel = try assertNoThrowWithValue(self.bindMulticastChannel(host: "0.0.0.0",
port: 0,
multicastAddress: "224.0.2.66",
interface: multicastInterface).wait())
defer {
XCTAssertNoThrow(try listenerChannel.close().wait())
}
let multicastAddress = try assertNoThrowWithValue(try SocketAddress(ipAddress: "224.0.2.66", port: listenerChannel.localAddress!.port!))
// Now that we've joined the group, let's send to it.
let sender = try assertNoThrowWithValue(DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.bind(host: "127.0.0.1", port: 0)
.wait()
)
defer {
XCTAssertNoThrow(try sender.close().wait())
}
XCTAssertNoThrow(try configureSenderMulticastIf(sender: sender, multicastInterface: multicastInterface).wait())
try self.assertDatagramReaches(multicastChannel: listenerChannel, sender: sender, multicastAddress: multicastAddress)
}
func testCanJoinBasicMulticastGroupIPv6() throws {
guard self.supportsIPv6 else {
// Skip on non-IPv6 systems
return
}
let multicastInterface = try assertNoThrowWithValue(self.interfaceForAddress(address: "::1"))
// We avoid the risk of interference due to our all-addresses bind by only joining this multicast
// group on the loopback.
let listenerChannel = try assertNoThrowWithValue(self.bindMulticastChannel(host: "::1",
port: 0,
multicastAddress: "ff12::beeb",
interface: multicastInterface).wait())
defer {
XCTAssertNoThrow(try listenerChannel.close().wait())
}
let multicastAddress = try assertNoThrowWithValue(try SocketAddress(ipAddress: "ff12::beeb", port: listenerChannel.localAddress!.port!))
// Now that we've joined the group, let's send to it.
let sender = try assertNoThrowWithValue(DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.bind(host: "::1", port: 0)
.wait()
)
defer {
XCTAssertNoThrow(try sender.close().wait())
}
XCTAssertNoThrow(try configureSenderMulticastIf(sender: sender, multicastInterface: multicastInterface).wait())
try self.assertDatagramReaches(multicastChannel: listenerChannel, sender: sender, multicastAddress: multicastAddress)
}
func testCanLeaveAnIPv4MulticastGroup() throws {
let multicastInterface = try assertNoThrowWithValue(self.interfaceForAddress(address: "127.0.0.1"))
// We avoid the risk of interference due to our all-addresses bind by only joining this multicast
// group on the loopback.
let listenerChannel = try assertNoThrowWithValue(self.bindMulticastChannel(host: "0.0.0.0",
port: 0,
multicastAddress: "224.0.2.66",
interface: multicastInterface).wait())
defer {
XCTAssertNoThrow(try listenerChannel.close().wait())
}
let multicastAddress = try assertNoThrowWithValue(try SocketAddress(ipAddress: "224.0.2.66", port: listenerChannel.localAddress!.port!))
// Now that we've joined the group, let's send to it.
let sender = try assertNoThrowWithValue(DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.bind(host: "127.0.0.1", port: 0)
.wait()
)
defer {
XCTAssertNoThrow(try sender.close().wait())
}
XCTAssertNoThrow(try configureSenderMulticastIf(sender: sender, multicastInterface: multicastInterface).wait())
try self.assertDatagramReaches(multicastChannel: listenerChannel, sender: sender, multicastAddress: multicastAddress)
// Now we should *leave* the group.
XCTAssertNoThrow(try leaveMulticastGroup(channel: listenerChannel, multicastAddress: "224.0.2.66", interface: multicastInterface).wait())
try self.assertDatagramDoesNotReach(multicastChannel: listenerChannel, after: .milliseconds(500), sender: sender, multicastAddress: multicastAddress)
}
func testCanLeaveAnIPv6MulticastGroup() throws {
guard self.supportsIPv6 else {
// Skip on non-IPv6 systems
return
}
let multicastInterface = try assertNoThrowWithValue(self.interfaceForAddress(address: "::1"))
// We avoid the risk of interference due to our all-addresses bind by only joining this multicast
// group on the loopback.
let listenerChannel = try assertNoThrowWithValue(self.bindMulticastChannel(host: "::1",
port: 0,
multicastAddress: "ff12::beeb",
interface: multicastInterface).wait())
defer {
XCTAssertNoThrow(try listenerChannel.close().wait())
}
let multicastAddress = try assertNoThrowWithValue(try SocketAddress(ipAddress: "ff12::beeb", port: listenerChannel.localAddress!.port!))
// Now that we've joined the group, let's send to it.
let sender = try assertNoThrowWithValue(DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.bind(host: "::1", port: 0)
.wait()
)
defer {
XCTAssertNoThrow(try sender.close().wait())
}
XCTAssertNoThrow(try configureSenderMulticastIf(sender: sender, multicastInterface: multicastInterface).wait())
try self.assertDatagramReaches(multicastChannel: listenerChannel, sender: sender, multicastAddress: multicastAddress)
// Now we should *leave* the group.
XCTAssertNoThrow(try leaveMulticastGroup(channel: listenerChannel, multicastAddress: "ff12::beeb", interface: multicastInterface).wait())
try self.assertDatagramDoesNotReach(multicastChannel: listenerChannel, after: .milliseconds(500), sender: sender, multicastAddress: multicastAddress)
}
}

View File

@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
//
// SocketOptionChannelTest+XCTest.swift
// SocketOptionProviderTest+XCTest.swift
//
import XCTest
@ -22,9 +22,9 @@ import XCTest
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension SocketOptionChannelTest {
extension SocketOptionProviderTest {
static var allTests : [(String, (SocketOptionChannelTest) -> () throws -> Void)] {
static var allTests : [(String, (SocketOptionProviderTest) -> () throws -> Void)] {
return [
("testSettingAndGettingComplexSocketOption", testSettingAndGettingComplexSocketOption),
("testObtainingDefaultValueOfComplexSocketOption", testObtainingDefaultValueOfComplexSocketOption),
@ -32,6 +32,12 @@ extension SocketOptionChannelTest {
("testObtainingDefaultValueOfSimpleSocketOption", testObtainingDefaultValueOfSimpleSocketOption),
("testPassingInvalidSizeToSetComplexSocketOptionFails", testPassingInvalidSizeToSetComplexSocketOptionFails),
("testLinger", testLinger),
("testSoIpMulticastIf", testSoIpMulticastIf),
("testIpMulticastTtl", testIpMulticastTtl),
("testIpMulticastLoop", testIpMulticastLoop),
("testIpv6MulticastIf", testIpv6MulticastIf),
("testIPv6MulticastHops", testIPv6MulticastHops),
("testIPv6MulticastLoop", testIPv6MulticastLoop),
]
}
}

View File

@ -15,10 +15,12 @@
import NIO
import XCTest
final class SocketOptionChannelTest: XCTestCase {
final class SocketOptionProviderTest: XCTestCase {
var group: MultiThreadedEventLoopGroup!
var serverChannel: Channel!
var clientChannel: Channel!
var ipv4DatagramChannel: Channel!
var ipv6DatagramChannel: Channel?
struct CastError: Error { }
@ -30,13 +32,47 @@ final class SocketOptionChannelTest: XCTestCase {
return provider
}
private func ipv4MulticastProvider(file: StaticString = #file, line: UInt = #line) throws -> SocketOptionProvider {
guard let provider = self.ipv4DatagramChannel as? SocketOptionProvider else {
XCTFail("Unable to cast \(String(describing: self.ipv4DatagramChannel)) to SocketOptionProvider", file: file, line: line)
throw CastError()
}
return provider
}
private func ipv6MulticastProvider(file: StaticString = #file, line: UInt = #line) throws -> SocketOptionProvider? {
guard let ipv6Channel = self.ipv6DatagramChannel else {
return nil
}
guard let provider = ipv6Channel as? SocketOptionProvider else {
XCTFail("Unable to cast \(ipv6Channel)) to SocketOptionChannel", file: file, line: line)
throw CastError()
}
return provider
}
override func setUp() {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.serverChannel = try? assertNoThrowWithValue(ServerBootstrap(group: group).bind(host: "127.0.0.1", port: 0).wait())
self.clientChannel = try? assertNoThrowWithValue(ClientBootstrap(group: group).connect(to: serverChannel.localAddress!).wait())
self.ipv4DatagramChannel = try? assertNoThrowWithValue(
DatagramBootstrap(group: group).bind(host: "127.0.0.1", port: 0).then { channel in
return (channel as! MulticastChannel).joinGroup(try! SocketAddress(ipAddress: "224.0.2.66", port: 0)).map { channel }
}.wait()
)
// The IPv6 setup is allowed to fail, some hosts don't have IPv6.
self.ipv6DatagramChannel = try? DatagramBootstrap(group: group).bind(host: "::1", port: 0).then { channel in
return (channel as! MulticastChannel).joinGroup(try! SocketAddress(ipAddress: "ff12::beeb", port: 0)).map { channel }
}.wait()
}
override func tearDown() {
XCTAssertNoThrow(try ipv6DatagramChannel?.close().wait())
XCTAssertNoThrow(try ipv4DatagramChannel.close().wait())
XCTAssertNoThrow(try clientChannel.close().wait())
XCTAssertNoThrow(try serverChannel.close().wait())
XCTAssertNoThrow(try group.syncShutdownGracefully())
@ -111,4 +147,88 @@ final class SocketOptionChannelTest: XCTestCase {
XCTAssertEqual($0.l_onoff, newLingerValue.l_onoff)
}.wait())
}
func testSoIpMulticastIf() throws {
let channel = self.ipv4DatagramChannel!
let provider = try assertNoThrowWithValue(self.ipv4MulticastProvider())
let address: in_addr
switch channel.localAddress! {
case .v4(let addr):
address = addr.address.sin_addr
default:
XCTFail("Local address must be IPv4!")
return
}
XCTAssertNoThrow(try provider.setIPMulticastIF(address).then {
provider.getIPMulticastIF()
}.map {
XCTAssertEqual($0.s_addr, address.s_addr)
}.wait())
}
func testIpMulticastTtl() throws {
let provider = try assertNoThrowWithValue(self.ipv4MulticastProvider())
XCTAssertNoThrow(try provider.setIPMulticastTTL(6).then {
provider.getIPMulticastTTL()
}.map {
XCTAssertEqual($0, 6)
}.wait())
}
func testIpMulticastLoop() throws {
let provider = try assertNoThrowWithValue(self.ipv4MulticastProvider())
XCTAssertNoThrow(try provider.setIPMulticastLoop(1).then {
provider.getIPMulticastLoop()
}.map {
XCTAssertNotEqual($0, 0)
}.wait())
}
func testIpv6MulticastIf() throws {
guard let provider = try assertNoThrowWithValue(self.ipv6MulticastProvider()) else {
// Skip on systems without IPv6.
return
}
// TODO: test this when we know what the interface indices are.
let loopbackAddress = try assertNoThrowWithValue(SocketAddress(ipAddress: "::1", port: 0))
guard let loopbackInterface = try assertNoThrowWithValue(System.enumerateInterfaces().filter({ $0.address == loopbackAddress }).first) else {
XCTFail("Could not find index of loopback address")
return
}
XCTAssertNoThrow(try provider.setIPv6MulticastIF(CUnsignedInt(loopbackInterface.interfaceIndex)).then {
provider.getIPv6MulticastIF()
}.map {
XCTAssertEqual($0, CUnsignedInt(loopbackInterface.interfaceIndex))
}.wait())
}
func testIPv6MulticastHops() throws {
guard let provider = try assertNoThrowWithValue(self.ipv6MulticastProvider()) else {
// Skip on systems without IPv6.
return
}
XCTAssertNoThrow(try provider.setIPv6MulticastHops(6).then {
provider.getIPv6MulticastHops()
}.map {
XCTAssertEqual($0, 6)
}.wait())
}
func testIPv6MulticastLoop() throws {
guard let provider = try assertNoThrowWithValue(self.ipv6MulticastProvider()) else {
// Skip on systems without IPv6.
return
}
XCTAssertNoThrow(try provider.setIPv6MulticastLoop(1).then {
provider.getIPv6MulticastLoop()
}.map {
XCTAssertNotEqual($0, 0)
}.wait())
}
}