Use sendmsg rather than sendto to send UDP data. (#1587)

Motivation:

sendmsg is more powerful - this is paving the way for
Explicit Congestion Notifications to be sent.

Modifications:

Replace the sendto system call with sendmsg up to DatagramChannel.

Result:

Sendmsg will be used to send UDP data.
This commit is contained in:
Peter Adams 2020-07-10 13:41:13 +01:00 committed by GitHub
parent 97450139d5
commit ee8acd9b3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 78 additions and 74 deletions

View File

@ -377,6 +377,10 @@ protocol _BSDSocketProtocol {
length len: size_t) throws -> IOResult<size_t>
static func recvmsg(descriptor: CInt, msgHdr: UnsafeMutablePointer<msghdr>, flags: CInt) throws -> IOResult<ssize_t>
static func sendmsg(descriptor: CInt,
msgHdr: UnsafePointer<msghdr>,
flags: CInt) throws -> IOResult<ssize_t>
static func send(socket s: NIOBSDSocket.Handle,
buffer buf: UnsafeRawPointer,
@ -388,15 +392,6 @@ protocol _BSDSocketProtocol {
option_value optval: UnsafeRawPointer,
option_len optlen: socklen_t) throws
// NOTE: this should return a `ssize_t`, however, that is not a standard
// type, and defining that type is difficult. Opt to return a `size_t`
// which is the same size, but is unsigned.
static func sendto(socket s: NIOBSDSocket.Handle,
buffer buf: UnsafeRawPointer,
length len: size_t,
dest_addr to: UnsafePointer<sockaddr>,
dest_len tolen: socklen_t) throws -> IOResult<size_t>
static func shutdown(socket: NIOBSDSocket.Handle, how: Shutdown) throws
static func socket(domain af: NIOBSDSocket.ProtocolFamily,

View File

@ -87,6 +87,12 @@ extension NIOBSDSocket {
static func recvmsg(descriptor: CInt, msgHdr: UnsafeMutablePointer<msghdr>, flags: CInt) throws -> IOResult<ssize_t> {
return try Posix.recvmsg(descriptor: descriptor, msgHdr: msgHdr, flags: flags)
}
static func sendmsg(descriptor: CInt,
msgHdr: UnsafePointer<msghdr>,
flags: CInt) throws -> IOResult<ssize_t> {
return try Posix.sendmsg(descriptor: descriptor, msgHdr: msgHdr, flags: flags)
}
static func send(socket s: NIOBSDSocket.Handle,
buffer buf: UnsafeRawPointer,
@ -106,21 +112,6 @@ extension NIOBSDSocket {
optionLen: optlen)
}
// NOTE: this should return a `ssize_t`, however, that is not a standard
// type, and defining that type is difficult. Opt to return a `size_t`
// which is the same size, but is unsigned.
static func sendto(socket s: NIOBSDSocket.Handle,
buffer buf: UnsafeRawPointer,
length len: size_t,
dest_addr to: UnsafePointer<sockaddr>,
dest_len tolen: socklen_t) throws -> IOResult<size_t> {
return try Posix.sendto(descriptor: s,
pointer: buf,
size: len,
destinationPtr: to,
destinationSize: tolen)
}
static func shutdown(socket: NIOBSDSocket.Handle, how: Shutdown) throws {
return try Posix.shutdown(descriptor: socket, how: how)
}

View File

@ -164,6 +164,13 @@ extension NIOBSDSocket {
static func recvmsg(descriptor: CInt, msgHdr: UnsafeMutablePointer<msghdr>, flags: CInt) throws -> IOResult<ssize_t> {
fatalError("recvmsg not yet implemented on Windows")
}
@inline(never)
static func sendmsg(descriptor: CInt,
msgHdr: UnsafePointer<msghdr>,
flags: CInt) throws -> IOResult<ssize_t> {
fatalError("recvmsg not yet implemented on Windows")
}
@inline(never)
static func send(socket s: NIOBSDSocket.Handle,
@ -188,22 +195,6 @@ extension NIOBSDSocket {
}
}
// NOTE: this should return a `ssize_t`, however, that is not a standard
// type, and defining that type is difficult. Opt to return a `size_t`
// which is the same size, but is unsigned.
@inline(never)
static func sendto(socket s: NIOBSDSocket.Handle,
buffer buf: UnsafeRawPointer,
length len: size_t,
dest_addr to: UnsafePointer<sockaddr>,
dest_len tolen: socklen_t) throws -> IOResult<size_t> {
let iResult: CInt = CNIOWindows_sendto(s, buf, CInt(len), 0, to, tolen)
if iResult == SOCKET_ERROR {
throw IOError(winsock: WSAGetLastError(), reason: "sendto")
}
return .processed(size_t(iResult))
}
@inline(never)
static func shutdown(socket: NIOBSDSocket.Handle, how: Shutdown) throws {
if WinSDK.shutdown(socket, how.cValue) == SOCKET_ERROR {

View File

@ -83,10 +83,6 @@ final class PipePair: SocketProtocol {
}
}
func sendto(pointer: UnsafeRawBufferPointer, destinationPtr: UnsafePointer<sockaddr>, destinationSize: socklen_t) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func read(pointer: UnsafeMutableRawBufferPointer) throws -> IOResult<Int> {
return try self.inputFD.withUnsafeHandle {
try Posix.read(descriptor: $0, pointer: pointer.baseAddress!, size: pointer.count)
@ -99,6 +95,13 @@ final class PipePair: SocketProtocol {
controlBytes: inout Slice<UnsafeMutableRawBufferPointer>) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func sendmsg(pointer: UnsafeRawBufferPointer,
destinationPtr: UnsafePointer<sockaddr>,
destinationSize: socklen_t,
controlBytes: UnsafeMutableRawBufferPointer) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func sendFile(fd: Int32, offset: Int, count: Int) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported

View File

@ -158,15 +158,31 @@ typealias IOVector = iovec
/// - parameters:
/// - pointer: Pointer (and size) to the data to send.
/// - destinationPtr: The destination to which the data should be sent.
/// - returns: The `IOResult` which indicates how much data could be written and if the operation returned before all could be written (because the socket is in non-blocking mode).
/// - destinationSize: The size of the destination address given.
/// - controlBytes: Extra `cmsghdr` information.
/// - returns: The `IOResult` which indicates how much data could be written and if the operation returned before all could be written
/// (because the socket is in non-blocking mode).
/// - throws: An `IOError` if the operation failed.
func sendto(pointer: UnsafeRawBufferPointer, destinationPtr: UnsafePointer<sockaddr>, destinationSize: socklen_t) throws -> IOResult<Int> {
return try withUnsafeHandle {
try NIOBSDSocket.sendto(socket: $0,
buffer: UnsafeMutableRawPointer(mutating: pointer.baseAddress!),
length: pointer.count,
dest_addr: destinationPtr,
dest_len: destinationSize)
func sendmsg(pointer: UnsafeRawBufferPointer,
destinationPtr: UnsafePointer<sockaddr>,
destinationSize: socklen_t,
controlBytes: UnsafeMutableRawBufferPointer) throws -> IOResult<Int> {
// Dubious const casts - it should be OK as there is no reason why this should get mutated
// just bad const declaration below us.
var vec = iovec(iov_base: UnsafeMutableRawPointer(mutating: pointer.baseAddress!), iov_len: pointer.count)
let notConstCorrectDestinationPtr = UnsafeMutableRawPointer(mutating: destinationPtr)
return try withUnsafeHandle { handle in
return try withUnsafeMutablePointer(to: &vec) { vecPtr in
var messageHeader = msghdr(msg_name: notConstCorrectDestinationPtr,
msg_namelen: destinationSize,
msg_iov: vecPtr,
msg_iovlen: 1,
msg_control: controlBytes.baseAddress,
msg_controllen: .init(controlBytes.count),
msg_flags: 0)
return try Posix.sendmsg(descriptor: handle, msgHdr: &messageHeader, flags: 0)
}
}
}

View File

@ -616,21 +616,27 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}
override func writeToSocket() throws -> OverallWriteResult {
let result = try self.pendingWrites.triggerAppropriateWriteOperations(scalarWriteOperation: { (ptr, destinationPtr, destinationSize) in
guard ptr.count > 0 else {
// No need to call write if the buffer is empty.
return .processed(0)
let result = try self.pendingWrites.triggerAppropriateWriteOperations(
scalarWriteOperation: { (ptr, destinationPtr, destinationSize) in
guard ptr.count > 0 else {
// No need to call write if the buffer is empty.
return .processed(0)
}
// normal write
let controlBytes = UnsafeMutableRawBufferPointer(start: nil, count: 0)
return try self.socket.sendmsg(pointer: ptr,
destinationPtr: destinationPtr,
destinationSize: destinationSize,
controlBytes: controlBytes)
},
vectorWriteOperation: { msgs in
return try self.socket.sendmmsg(msgs: msgs)
}
// normal write
return try self.socket.sendto(pointer: ptr,
destinationPtr: destinationPtr,
destinationSize: destinationSize)
}, vectorWriteOperation: { msgs in
try self.socket.sendmmsg(msgs: msgs)
})
)
return result
}
// MARK: Datagram Channel overrides not required by BaseSocketChannel
override func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {

View File

@ -45,14 +45,17 @@ protocol SocketProtocol: BaseSocketProtocol {
func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>
func sendto(pointer: UnsafeRawBufferPointer, destinationPtr: UnsafePointer<sockaddr>, destinationSize: socklen_t) throws -> IOResult<Int>
func read(pointer: UnsafeMutableRawBufferPointer) throws -> IOResult<Int>
func recvmsg(pointer: UnsafeMutableRawBufferPointer,
storage: inout sockaddr_storage,
storageLen: inout socklen_t,
controlBytes: inout Slice<UnsafeMutableRawBufferPointer>) throws -> IOResult<Int>
func sendmsg(pointer: UnsafeRawBufferPointer,
destinationPtr: UnsafePointer<sockaddr>,
destinationSize: socklen_t,
controlBytes: UnsafeMutableRawBufferPointer) throws -> IOResult<Int>
func sendFile(fd: Int32, offset: Int, count: Int) throws -> IOResult<Int>

View File

@ -81,8 +81,8 @@ private let sysWritev = sysWritev_wrapper
private let sysRecvFrom: @convention(c) (CInt, UnsafeMutableRawPointer?, CLong, CInt, UnsafeMutablePointer<sockaddr>?, UnsafeMutablePointer<socklen_t>?) -> CLong = recvfrom
private let sysWritev: @convention(c) (Int32, UnsafePointer<iovec>?, CInt) -> CLong = writev
#endif
private let sysSendTo: @convention(c) (CInt, UnsafeRawPointer?, CLong, CInt, UnsafePointer<sockaddr>?, socklen_t) -> CLong = sendto
private let sysRecvMsg: @convention(c) (CInt, UnsafeMutablePointer<msghdr>?, CInt) -> ssize_t = recvmsg
private let sysSendMsg: @convention(c) (CInt, UnsafePointer<msghdr>?, CInt) -> ssize_t = sendmsg
private let sysDup: @convention(c) (CInt) -> CInt = dup
private let sysGetpeername: @convention(c) (CInt, UnsafeMutablePointer<sockaddr>?, UnsafeMutablePointer<socklen_t>?) -> CInt = getpeername
private let sysGetsockname: @convention(c) (CInt, UnsafeMutablePointer<sockaddr>?, UnsafeMutablePointer<socklen_t>?) -> CInt = getsockname
@ -351,14 +351,6 @@ internal enum Posix {
}
}
@inline(never)
public static func sendto(descriptor: CInt, pointer: UnsafeRawPointer, size: size_t,
destinationPtr: UnsafePointer<sockaddr>, destinationSize: socklen_t) throws -> IOResult<Int> {
return try syscall(blocking: true) {
sysSendTo(descriptor, pointer, size, 0, destinationPtr, destinationSize)
}
}
@inline(never)
public static func read(descriptor: CInt, pointer: UnsafeMutableRawPointer, size: size_t) throws -> IOResult<ssize_t> {
return try syscall(blocking: true) {
@ -379,6 +371,13 @@ internal enum Posix {
sysRecvMsg(descriptor, msgHdr, flags)
}
}
@inline(never)
public static func sendmsg(descriptor: CInt, msgHdr: UnsafePointer<msghdr>, flags: CInt) throws -> IOResult<ssize_t> {
return try syscall(blocking: true) {
sysSendMsg(descriptor, msgHdr, flags)
}
}
@discardableResult
@inline(never)