MPTCP support on Linux (#2308)

Motivation

MPTCP provides multipath capability for TCP connections. This
allows TCP connections to consume multiple independent network
paths, providing devices with a number of capabilities to
improve throughput, latency, or reliability.

MPTCP is not totally transparent, and requires servers to support
the functionality as well as clients. To that end, we should expose
some MPTCP capability.

Importantly, MPTCP uses a number of new socket flags and options.
To enable us to support this when it is available but gracefully fail
when it is not, we've hardcoded a number of Linux kernel constants
instead of relying on libc to expose them. This is safe to do on Linux
because its syscall layer is ABI stable.

Modifications

- Add ClientBootstrap and ServerBootstrap flags for MPTCP
- Plumb MPTCP through the stack
- Add new socket options for MPTCP

Result

MPTCP is supported on Linux
This commit is contained in:
Cory Benfield 2022-11-09 11:02:45 +00:00 committed by GitHub
parent 6e404d1614
commit 558e4f2fb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 219 additions and 36 deletions

View File

@ -27,6 +27,29 @@
#include <netinet/ip.h>
#include "liburing_nio.h"
#if __has_include(<linux/mptcp.h>)
#include <linux/mptcp.h>
#else
// A backported copy of the mptcp_info structure to make programming against
// an uncertain linux kernel easier.
//
// This definition is only compiled when <linux/mptcp.h> cannot be found at
// build time (see the surrounding __has_include check).
//
// NOTE(review): the field order and widths presumably mirror the kernel's own
// `struct mptcp_info`; confirm against the kernel UAPI header before adding,
// removing or reordering any member.
struct mptcp_info {
uint8_t mptcpi_subflows;
uint8_t mptcpi_add_addr_signal;
uint8_t mptcpi_add_addr_accepted;
uint8_t mptcpi_subflows_max;
uint8_t mptcpi_add_addr_signal_max;
uint8_t mptcpi_add_addr_accepted_max;
uint32_t mptcpi_flags;
uint32_t mptcpi_token;
uint64_t mptcpi_write_seq;
uint64_t mptcpi_snd_una;
uint64_t mptcpi_rcv_nxt;
uint8_t mptcpi_local_addr_used;
uint8_t mptcpi_local_addr_max;
uint8_t mptcpi_csum_enabled;
};
#endif
// Some explanation is required here.
//
// Due to SR-6772, we cannot get Swift code to directly see any of the mmsg structures or

View File

@ -236,6 +236,11 @@ extension NIOBSDSocket.OptionLevel {
NIOBSDSocket.OptionLevel(rawValue: IPPROTO_TCP)
#endif
/// The option level for socket options that apply to MPTCP sockets.
///
/// The raw value is hardcoded rather than read from a libc constant.
/// These options only work on Linux currently.
public static let mptcp: NIOBSDSocket.OptionLevel =
    NIOBSDSocket.OptionLevel(rawValue: 284)
/// Socket options that apply to all sockets.
public static let socket: NIOBSDSocket.OptionLevel =
NIOBSDSocket.OptionLevel(rawValue: SOL_SOCKET)
@ -315,6 +320,15 @@ extension NIOBSDSocket.Option {
}
#endif
// MPTCP options
//
// The raw values below are hardcoded: the options are fairly new and are not
// yet present in every set of system header files.
extension NIOBSDSocket.Option {
    /// Retrieves information about an MPTCP connection.
    public static let mptcp_info: NIOBSDSocket.Option =
        NIOBSDSocket.Option(rawValue: 1)
}
// Socket Options
extension NIOBSDSocket.Option {
/// Get the error status and clear.

View File

@ -15,6 +15,7 @@
import Darwin
#elseif os(Linux) || os(Android)
import Glibc
import CNIOLinux
#elseif os(Windows)
import WinSDK
#endif
@ -282,4 +283,16 @@ extension SocketOptionProvider {
return self.unsafeGetSocketOption(level: .tcp, name: .tcp_connection_info)
}
#endif
#if os(Linux)
/// Reads the `MPTCP_INFO` socket option.
///
/// This option is read-only: it cannot be set.
///
/// - returns: An `EventLoopFuture` that is fulfilled with the `mptcp_info` value of
///     the socket option, or failed with any error that occurred while retrieving it.
public func getMPTCPInfo() -> EventLoopFuture<mptcp_info> {
    let info: EventLoopFuture<mptcp_info> = self.unsafeGetSocketOption(level: .mptcp, name: .mptcp_info)
    return info
}
#endif
}

View File

@ -209,6 +209,17 @@ extension NIOBSDSocket {
throw err
}
}
// The value to pass as the socket protocol subtype in order to request MPTCP.
// `nil` means MPTCP is not supported on this platform.
static var mptcpProtocolSubtype: Int? {
    #if os(Linux)
    // Defined by the linux kernel, this is IPPROTO_MPTCP.
    let ipprotoMPTCP = 262
    return ipprotoMPTCP
    #else
    return nil
    #endif
}
}
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)

View File

@ -439,6 +439,13 @@ extension NIOBSDSocket {
return .processed(Int(nNumberOfBytesToWrite))
}
// The value to pass as the socket protocol subtype in order to request MPTCP.
// `nil` means MPTCP is not supported on this platform.
static var mptcpProtocolSubtype: Int? {
    // MPTCP not supported on Windows.
    nil
}
}
extension NIOBSDSocket {

View File

@ -162,10 +162,12 @@ class BaseSocket: BaseSocketProtocol {
/// - parameters:
/// - protocolFamily: The protocol family to use (usually `AF_INET6` or `AF_INET`).
/// - type: The type of the socket to create.
/// - protocolSubtype: The subtype of the protocol, corresponding to the `protocol`
/// argument to the socket syscall. Defaults to 0.
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - returns: the file descriptor of the socket that was created.
/// - throws: An `IOError` if creation of the socket failed.
static func makeSocket(protocolFamily: NIOBSDSocket.ProtocolFamily, type: NIOBSDSocket.SocketType, setNonBlocking: Bool = false) throws -> NIOBSDSocket.Handle {
static func makeSocket(protocolFamily: NIOBSDSocket.ProtocolFamily, type: NIOBSDSocket.SocketType, protocolSubtype: Int = 0, setNonBlocking: Bool = false) throws -> NIOBSDSocket.Handle {
var sockType: CInt = type.rawValue
#if os(Linux)
if setNonBlocking {
@ -174,7 +176,7 @@ class BaseSocket: BaseSocketProtocol {
#endif
let sock = try NIOBSDSocket.socket(domain: protocolFamily,
type: NIOBSDSocket.SocketType(rawValue: sockType),
protocol: 0)
protocol: CInt(protocolSubtype))
#if !os(Linux)
if setNonBlocking {
do {
@ -203,7 +205,7 @@ class BaseSocket: BaseSocketProtocol {
}
return sock
}
/// Cleanup the unix domain socket.
///
/// Deletes the associated file if it exists and has socket type. Does nothing if pathname does not exist.

View File

@ -90,6 +90,7 @@ public final class ServerBootstrap {
internal var _serverChannelOptions: ChannelOptions.Storage
@usableFromInline
internal var _childChannelOptions: ChannelOptions.Storage
private var enableMPTCP: Bool
/// Create a `ServerBootstrap` on the `EventLoopGroup` `group`.
///
@ -147,8 +148,9 @@ public final class ServerBootstrap {
self.serverChannelInit = nil
self.childChannelInit = nil
self._serverChannelOptions.append(key: ChannelOptions.tcpOption(.tcp_nodelay), value: 1)
self.enableMPTCP = false
}
#if swift(>=5.7)
/// Initialize the `ServerSocketChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
@ -179,7 +181,7 @@ public final class ServerBootstrap {
return self
}
#endif
#if swift(>=5.7)
/// Initialize the accepted `SocketChannel`s with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`. Note that if the `initializer` fails then the error will be
@ -253,6 +255,30 @@ public final class ServerBootstrap {
return self
}
/// Turns multi-path TCP (MPTCP) support on or off for this bootstrap.
///
/// MPTCP is only available on some systems. Requesting it on a system that does
/// not support it will lead to bind failing, so users are recommended to only
/// enable it in response to configuration or feature detection.
///
/// > Note: Passing `true` removes the `TCP_NODELAY` server channel option, which
/// > re-enables Nagle's algorithm even if it had been disabled. This is a
/// > temporary workaround for a Linux kernel limitation.
///
/// - parameters:
///     - value: Whether to enable MPTCP or not.
/// - returns: This bootstrap, so that calls can be chained.
public func enableMPTCP(_ value: Bool) -> Self {
    self.enableMPTCP = value

    guard value else {
        // Nothing else to adjust when MPTCP is being switched off.
        return self
    }

    // This is a temporary workaround until we get some stable Linux kernel
    // versions that support TCP_NODELAY and MPTCP.
    self._serverChannelOptions.remove(key: ChannelOptions.tcpOption(.tcp_nodelay))
    return self
}
/// Bind the `ServerSocketChannel` to `host` and `port`.
///
/// - parameters:
@ -281,7 +307,7 @@ public final class ServerBootstrap {
try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
}
}
/// Bind the `ServerSocketChannel` to a UNIX Domain Socket.
///
/// - parameters:
@ -315,7 +341,10 @@ public final class ServerBootstrap {
/// - parameters:
/// - descriptor: The _Unix file descriptor_ representing the bound stream socket.
public func withBoundSocket(_ socket: NIOBSDSocket.Handle) -> EventLoopFuture<Channel> {
func makeChannel(_ eventLoop: SelectableEventLoop, _ childEventLoopGroup: EventLoopGroup) throws -> ServerSocketChannel {
func makeChannel(_ eventLoop: SelectableEventLoop, _ childEventLoopGroup: EventLoopGroup, _ enableMPTCP: Bool) throws -> ServerSocketChannel {
    // The socket handed to us already exists, so MPTCP cannot be requested for it here.
    guard !enableMPTCP else {
        throw ChannelError.operationUnsupported
    }
    let channel = try ServerSocketChannel(socket: socket, eventLoop: eventLoop, group: childEventLoopGroup)
    return channel
}
return bind0(makeServerChannel: makeChannel) { (eventLoop, serverChannel) in
@ -332,10 +361,11 @@ public final class ServerBootstrap {
} catch {
return group.next().makeFailedFuture(error)
}
func makeChannel(_ eventLoop: SelectableEventLoop, _ childEventLoopGroup: EventLoopGroup) throws -> ServerSocketChannel {
func makeChannel(_ eventLoop: SelectableEventLoop, _ childEventLoopGroup: EventLoopGroup, _ enableMPTCP: Bool) throws -> ServerSocketChannel {
return try ServerSocketChannel(eventLoop: eventLoop,
group: childEventLoopGroup,
protocolFamily: address.protocol)
protocolFamily: address.protocol,
enableMPTCP: enableMPTCP)
}
return bind0(makeServerChannel: makeChannel) { (eventLoop, serverChannel) in
@ -345,7 +375,7 @@ public final class ServerBootstrap {
}
}
private func bind0(makeServerChannel: (_ eventLoop: SelectableEventLoop, _ childGroup: EventLoopGroup) throws -> ServerSocketChannel, _ register: @escaping (EventLoop, ServerSocketChannel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
private func bind0(makeServerChannel: (_ eventLoop: SelectableEventLoop, _ childGroup: EventLoopGroup, _ enableMPTCP: Bool) throws -> ServerSocketChannel, _ register: @escaping (EventLoop, ServerSocketChannel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
let eventLoop = self.group.next()
let childEventLoopGroup = self.childGroup
let serverChannelOptions = self._serverChannelOptions
@ -355,7 +385,7 @@ public final class ServerBootstrap {
let serverChannel: ServerSocketChannel
do {
serverChannel = try makeServerChannel(eventLoop as! SelectableEventLoop, childEventLoopGroup)
serverChannel = try makeServerChannel(eventLoop as! SelectableEventLoop, childEventLoopGroup, self.enableMPTCP)
} catch {
return eventLoop.makeFailedFuture(error)
}
@ -522,6 +552,7 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol {
private var connectTimeout: TimeAmount = TimeAmount.seconds(10)
private var resolver: Optional<Resolver>
private var bindTarget: Optional<SocketAddress>
private var enableMPTCP: Bool
/// Create a `ClientBootstrap` on the `EventLoopGroup` `group`.
///
@ -555,8 +586,9 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol {
self.protocolHandlers = nil
self.resolver = nil
self.bindTarget = nil
self.enableMPTCP = false
}
#if swift(>=5.7)
/// Initialize the connected `SocketChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
@ -603,7 +635,7 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol {
return self
}
#endif
#if swift(>=5.7)
/// Sets the protocol handlers that will be added to the front of the `ChannelPipeline` right after the
/// `channelInitializer` has been called.
@ -660,6 +692,30 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol {
return self
}
/// Turns multi-path TCP (MPTCP) support on or off for this bootstrap.
///
/// MPTCP is only available on some systems. Requesting it on a system that does
/// not support it will cause creation of the channel to fail, so users are
/// recommended to only enable it in response to configuration or feature detection.
///
/// > Note: Passing `true` removes the `TCP_NODELAY` channel option, which
/// > re-enables Nagle's algorithm even if it had been disabled. This is a
/// > temporary workaround for a Linux kernel limitation.
///
/// - parameters:
///     - value: Whether to enable MPTCP or not.
/// - returns: This bootstrap, so that calls can be chained.
public func enableMPTCP(_ value: Bool) -> Self {
    self.enableMPTCP = value

    guard value else {
        // Nothing else to adjust when MPTCP is being switched off.
        return self
    }

    // This is a temporary workaround until we get some stable Linux kernel
    // versions that support TCP_NODELAY and MPTCP.
    self._channelOptions.remove(key: ChannelOptions.tcpOption(.tcp_nodelay))
    return self
}
/// Bind the `SocketChannel` to `address`.
///
/// Using `bind` is not necessary unless you need the local address to be bound to a specific address.
@ -675,7 +731,7 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol {
func makeSocketChannel(eventLoop: EventLoop,
protocolFamily: NIOBSDSocket.ProtocolFamily) throws -> SocketChannel {
return try SocketChannel(eventLoop: eventLoop as! SelectableEventLoop, protocolFamily: protocolFamily)
return try SocketChannel(eventLoop: eventLoop as! SelectableEventLoop, protocolFamily: protocolFamily, enableMPTCP: self.enableMPTCP)
}
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established.
@ -913,7 +969,7 @@ public final class DatagramBootstrap {
self.group = group
self.channelInitializer = nil
}
#if swift(>=5.7)
/// Initialize the bound `DatagramChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
@ -1002,7 +1058,7 @@ public final class DatagramBootstrap {
return try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
}
}
/// Bind the `DatagramChannel` to a UNIX Domain Socket.
///
/// - parameters:
@ -1173,7 +1229,7 @@ public final class NIOPipeBootstrap {
self.group = group
self.channelInitializer = nil
}
#if swift(>=5.7)
/// Initialize the connected `PipeChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.

View File

@ -30,10 +30,12 @@ import NIOCore
///
/// - parameters:
/// - protocolFamily: The protocol family to use (usually `AF_INET6` or `AF_INET`).
/// - protocolSubtype: The subtype of the protocol, corresponding to the `protocol`
/// argument to the socket syscall. Defaults to 0.
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - throws: An `IOError` if creation of the socket failed.
init(protocolFamily: NIOBSDSocket.ProtocolFamily, setNonBlocking: Bool = false) throws {
let sock = try BaseSocket.makeSocket(protocolFamily: protocolFamily, type: .stream, setNonBlocking: setNonBlocking)
init(protocolFamily: NIOBSDSocket.ProtocolFamily, protocolSubtype: Int = 0, setNonBlocking: Bool = false) throws {
let sock = try BaseSocket.makeSocket(protocolFamily: protocolFamily, type: .stream, protocolSubtype: protocolSubtype, setNonBlocking: setNonBlocking)
switch protocolFamily {
case .unix:
cleanupOnClose = true
@ -119,7 +121,7 @@ import NIOCore
return sock
}
}
/// Close the socket.
///
/// After the socket was closed all other methods will throw an `IOError` when called.

View File

@ -32,10 +32,12 @@ typealias IOVector = iovec
/// - parameters:
/// - protocolFamily: The protocol family to use (usually `AF_INET6` or `AF_INET`).
/// - type: The type of the socket to create.
/// - protocolSubtype: The subtype of the protocol, corresponding to the `protocol`
/// argument to the socket syscall. Defaults to 0.
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - throws: An `IOError` if creation of the socket failed.
init(protocolFamily: NIOBSDSocket.ProtocolFamily, type: NIOBSDSocket.SocketType, setNonBlocking: Bool = false) throws {
let sock = try BaseSocket.makeSocket(protocolFamily: protocolFamily, type: type, setNonBlocking: setNonBlocking)
init(protocolFamily: NIOBSDSocket.ProtocolFamily, type: NIOBSDSocket.SocketType, protocolSubtype: Int = 0, setNonBlocking: Bool = false) throws {
    // Create the underlying descriptor first, then hand it to the base class.
    let handle = try BaseSocket.makeSocket(
        protocolFamily: protocolFamily,
        type: type,
        protocolSubtype: protocolSubtype,
        setNonBlocking: setNonBlocking
    )
    try super.init(socket: handle)
}
@ -249,12 +251,12 @@ typealias IOVector = iovec
return try NIOBSDSocket.recvmsg(socket: fd, msgHdr: messageHeader, flags: 0)
}
}
// Only look at the control bytes if all is good.
if case .processed = result {
controlBytes.receivedControlMessages = UnsafeControlMessageCollection(messageHeader: messageHeader)
}
return result
}
}

View File

@ -50,8 +50,15 @@ extension ByteBuffer {
final class SocketChannel: BaseStreamSocketChannel<Socket> {
private var connectTimeout: TimeAmount? = nil
init(eventLoop: SelectableEventLoop, protocolFamily: NIOBSDSocket.ProtocolFamily) throws {
let socket = try Socket(protocolFamily: protocolFamily, type: .stream, setNonBlocking: true)
init(eventLoop: SelectableEventLoop, protocolFamily: NIOBSDSocket.ProtocolFamily, enableMPTCP: Bool = false) throws {
    // Pick the protocol subtype for the socket: 0 unless MPTCP was requested,
    // in which case the platform must provide an MPTCP subtype.
    let protocolSubtype: Int
    if enableMPTCP {
        guard let mptcpSubtype = NIOBSDSocket.mptcpProtocolSubtype else {
            throw ChannelError.operationUnsupported
        }
        protocolSubtype = mptcpSubtype
    } else {
        protocolSubtype = 0
    }

    let socket = try Socket(
        protocolFamily: protocolFamily,
        type: .stream,
        protocolSubtype: protocolSubtype,
        setNonBlocking: true
    )
    try super.init(socket: socket, parent: nil, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
}
@ -154,8 +161,15 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
// This is `Channel` API so must be thread-safe.
override public var isWritable: Bool { return false }
convenience init(eventLoop: SelectableEventLoop, group: EventLoopGroup, protocolFamily: NIOBSDSocket.ProtocolFamily) throws {
try self.init(serverSocket: try ServerSocket(protocolFamily: protocolFamily, setNonBlocking: true), eventLoop: eventLoop, group: group)
convenience init(eventLoop: SelectableEventLoop, group: EventLoopGroup, protocolFamily: NIOBSDSocket.ProtocolFamily, enableMPTCP: Bool = false) throws {
    // Pick the protocol subtype for the socket: 0 unless MPTCP was requested,
    // in which case the platform must provide an MPTCP subtype.
    let protocolSubtype: Int
    if enableMPTCP {
        guard let mptcpSubtype = NIOBSDSocket.mptcpProtocolSubtype else {
            throw ChannelError.operationUnsupported
        }
        protocolSubtype = mptcpSubtype
    } else {
        protocolSubtype = 0
    }

    let serverSocket = try ServerSocket(protocolFamily: protocolFamily, protocolSubtype: protocolSubtype, setNonBlocking: true)
    try self.init(serverSocket: serverSocket, eventLoop: eventLoop, group: group)
}
init(serverSocket: ServerSocket, eventLoop: SelectableEventLoop, group: EventLoopGroup) throws {

View File

@ -2,7 +2,7 @@
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
@ -54,6 +54,7 @@ extension SocketChannelTest {
("testWeAreInterestedInReadEOFWhenChannelIsConnectedOnTheServerSide", testWeAreInterestedInReadEOFWhenChannelIsConnectedOnTheServerSide),
("testWeAreInterestedInReadEOFWhenChannelIsConnectedOnTheClientSide", testWeAreInterestedInReadEOFWhenChannelIsConnectedOnTheClientSide),
("testServerClosesTheConnectionImmediately", testServerClosesTheConnectionImmediately),
("testSimpleMPTCP", testSimpleMPTCP),
]
}
}

View File

@ -541,7 +541,7 @@ public final class SocketChannelTest : XCTestCase {
})
XCTAssertNoThrow(try socket.close())
}
func testInstantTCPConnectionResetThrowsError() throws {
#if !os(Linux) && !os(Android)
// This test checks that we correctly fail with an error rather than
@ -590,10 +590,10 @@ public final class SocketChannelTest : XCTestCase {
XCTAssertNoThrow(try clientSocket.setOption(level: .socket, name: .so_linger, value: linger(l_onoff: 1, l_linger: 0)))
XCTAssertNoThrow(try clientSocket.connect(to: serverChannel.localAddress!))
XCTAssertNoThrow(try clientSocket.close())
// Trigger accept() in the server
serverChannel.read()
// Wait for the server to have something
XCTAssertThrowsError(try serverPromise.futureResult.wait()) { error in
XCTAssert(error is NIOFcntlFailedError)
@ -857,13 +857,13 @@ public final class SocketChannelTest : XCTestCase {
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
class WaitForChannelInactiveHandler: ChannelInboundHandler {
typealias InboundIn = Never
typealias OutboundOut = ByteBuffer
let channelInactivePromise: EventLoopPromise<Void>
init(channelInactivePromise: EventLoopPromise<Void>) {
self.channelInactivePromise = channelInactivePromise
}
@ -873,7 +873,7 @@ public final class SocketChannelTest : XCTestCase {
buffer.writeString(String(repeating: "x", count: 517))
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
}
func channelInactive(context: ChannelHandlerContext) {
self.channelInactivePromise.succeed(())
context.fireChannelInactive()
@ -905,6 +905,44 @@ public final class SocketChannelTest : XCTestCase {
g.wait()
XCTAssertNoThrow(try serverSocket.close())
}
/// Smoke test for MPTCP: binds a server and connects a client with MPTCP enabled,
/// then reads the MPTCP_INFO socket option on both channels.
///
/// The test passes without doing anything further when the running kernel
/// reports that it does not support MPTCP (or the MPTCP_INFO option).
func testSimpleMPTCP() throws {
    // `getMPTCPInfo()` is only declared on Linux, and on other platforms enabling
    // MPTCP fails with a non-`IOError`, so the whole test is Linux-only.
    #if os(Linux)
    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) }

    let serverChannel: Channel

    do {
        serverChannel = try ServerBootstrap(group: group)
            .enableMPTCP(true)
            .bind(host: "127.0.0.1", port: 0)
            .wait()
    } catch let error as IOError {
        // Older Linux kernel versions don't support MPTCP, which is fine.
        // ENOPROTOOPT is also tolerated: it is expected from kernels that have
        // MPTCP compiled in but disabled.
        let toleratedErrnos = [EINVAL, EPROTONOSUPPORT, ENOPROTOOPT]
        if !toleratedErrnos.contains(error.errnoCode) {
            XCTFail("Unexpected error: \(error)")
        }
        return
    }

    let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
        .enableMPTCP(true)
        .connect(to: serverChannel.localAddress!)
        .wait())

    do {
        let serverInfo = try (serverChannel as? SocketOptionProvider)?.getMPTCPInfo().wait()
        let clientInfo = try (clientChannel as? SocketOptionProvider)?.getMPTCPInfo().wait()

        XCTAssertNotNil(serverInfo)
        XCTAssertNotNil(clientInfo)
    } catch let error as IOError {
        // Some Linux kernel versions do support MPTCP but don't support the MPTCP_INFO
        // option.
        XCTAssertEqual(error.errnoCode, EOPNOTSUPP, "Unexpected error: \(error)")
    }
    #endif
}
}
class DropAllReadsOnTheFloorHandler: ChannelDuplexHandler {