diff --git a/Sources/NIOPosix/PendingDatagramWritesManager.swift b/Sources/NIOPosix/PendingDatagramWritesManager.swift index 24d06d3e..315ec033 100644 --- a/Sources/NIOPosix/PendingDatagramWritesManager.swift +++ b/Sources/NIOPosix/PendingDatagramWritesManager.swift @@ -381,17 +381,9 @@ extension PendingDatagramWritesState { /// the availability of the functions. final class PendingDatagramWritesManager: PendingWritesManager { - private var bufferPool: Pool - - /// Storage for mmsghdr structures. Only present on Linux because Darwin does not support - /// gathering datagram writes. - private var msgs: UnsafeMutableBufferPointer - - /// Storage for sockaddr structures. Only present on Linux because Darwin does not support gathering - /// writes. - private var addresses: UnsafeMutableBufferPointer - - private var controlMessageStorage: UnsafeControlMessageStorage + private let bufferPool: Pool + private let msgBufferPool: Pool + private let controlMessageStorage: UnsafeControlMessageStorage private var state = PendingDatagramWritesState() @@ -411,13 +403,9 @@ final class PendingDatagramWritesManager: PendingWritesManager { /// - msgs: A pre-allocated array of `MMsgHdr` elements /// - addresses: A pre-allocated array of `sockaddr_storage` elements /// - controlMessageStorage: Pre-allocated memory for storing cmsghdr data during a vector write operation. - init(bufferPool: Pool, - msgs: UnsafeMutableBufferPointer, - addresses: UnsafeMutableBufferPointer, - controlMessageStorage: UnsafeControlMessageStorage) { + init(bufferPool: Pool, msgBufferPool: Pool, controlMessageStorage: UnsafeControlMessageStorage) { self.bufferPool = bufferPool - self.msgs = msgs - self.addresses = addresses + self.msgBufferPool = msgBufferPool self.controlMessageStorage = controlMessageStorage } @@ -618,13 +606,19 @@ final class PendingDatagramWritesManager: PendingWritesManager { private func triggerVectorBufferWrite(vectorWriteOperation: (UnsafeMutableBufferPointer) throws -> IOResult) throws -> OneWriteOperationResult { assert(self.state.isFlushPending && self.isOpen && !self.state.isEmpty, "illegal state for vector datagram write operation: flushPending: \(self.state.isFlushPending), isOpen: \(self.isOpen), empty: \(self.state.isEmpty)") - return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state, - bufferPool: self.bufferPool, - msgs: self.msgs, - addresses: self.addresses, - controlMessageStorage: self.controlMessageStorage, - { try vectorWriteOperation($0) }), - messages: self.msgs) + + let msgBuffer = self.msgBufferPool.get() + defer { self.msgBufferPool.put(msgBuffer) } + + return try msgBuffer.withUnsafePointers { msgs, addresses in + return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state, + bufferPool: self.bufferPool, + msgs: msgs, + addresses: addresses, + controlMessageStorage: self.controlMessageStorage, + { try vectorWriteOperation($0) }), + messages: msgs) + } } private func fulfillPromise(_ promise: PendingDatagramWritesState.DatagramWritePromiseFiller?) { diff --git a/Sources/NIOPosix/Pool.swift b/Sources/NIOPosix/Pool.swift index 47f61214..56999c78 100644 --- a/Sources/NIOPosix/Pool.swift +++ b/Sources/NIOPosix/Pool.swift @@ -202,3 +202,119 @@ extension Int { self = (self + alignmentGuide) & (~alignmentGuide) } } + +struct PooledMsgBuffer: PoolElement { + + private typealias MemorySentinel = UInt32 + private static let sentinelValue = MemorySentinel(0xdeadbeef) + + private struct PooledMsgBufferHead { + let count: Int + let spaceForMsgHdrs: Int + let spaceForAddresses: Int + + init(count: Int) { + var spaceForMsgHdrs = MemoryLayout.stride * count + spaceForMsgHdrs.roundUpToAlignment(for: sockaddr_storage.self) + + var spaceForAddress = MemoryLayout.stride * count + spaceForAddress.roundUpToAlignment(for: MemorySentinel.self) + + self.count = count + self.spaceForMsgHdrs = spaceForMsgHdrs + self.spaceForAddresses = spaceForAddress + } + + var totalByteCount: Int { + self.spaceForMsgHdrs + self.spaceForAddresses + MemoryLayout.size + } + + var msgHdrsOffset: Int { + 0 + } + + var addressesOffset: Int { + self.spaceForMsgHdrs + } + + var memorySentinelOffset: Int { + return self.spaceForMsgHdrs + self.spaceForAddresses + } + } + + private class BackingStorage: ManagedBuffer { + static func create(count: Int) -> Self { + let head = PooledMsgBufferHead(count: count) + + let baseStorage = Self.create(minimumCapacity: head.totalByteCount) { _ in + head + } + + let storage = unsafeDowncast(baseStorage, to: Self.self) + storage.withUnsafeMutablePointers { headPointer, tailPointer in + UnsafeRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).bindMemory(to: MMsgHdr.self, capacity: count) + UnsafeRawPointer(tailPointer + headPointer.pointee.addressesOffset).bindMemory(to: sockaddr_storage.self, capacity: count) + UnsafeRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).bindMemory(to: MemorySentinel.self, capacity: 1) + } + + return storage + } + + func withUnsafeMutableTypedPointers( + _ body: (UnsafeMutableBufferPointer, UnsafeMutableBufferPointer, UnsafeMutablePointer) throws -> ReturnType + ) rethrows -> ReturnType { + return try self.withUnsafeMutablePointers { headPointer, tailPointer in + let msgHdrsPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).assumingMemoryBound(to: MMsgHdr.self) + let addressesPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.addressesOffset).assumingMemoryBound(to: sockaddr_storage.self) + let sentinelPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).assumingMemoryBound(to: MemorySentinel.self) + + let msgHdrsBufferPointer = UnsafeMutableBufferPointer( + start: msgHdrsPointer, count: headPointer.pointee.count + ) + let addressesBufferPointer = UnsafeMutableBufferPointer( + start: addressesPointer, count: headPointer.pointee.count + ) + return try body(msgHdrsBufferPointer, addressesBufferPointer, sentinelPointer) + } + } + } + + private func validateSentinel() { + self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in + precondition(sentinelPointer.pointee == Self.sentinelValue, "Detected memory handling error!") + } + } + + private var storage: BackingStorage + + init() { + self.storage = .create(count: Socket.writevLimitIOVectors) + self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in + sentinelPointer.pointee = Self.sentinelValue + } + } + + func evictedFromPool() { + self.validateSentinel() + } + + func withUnsafePointers( + _ body: (UnsafeMutableBufferPointer, UnsafeMutableBufferPointer) throws -> ReturnValue + ) rethrows -> ReturnValue { + defer { + self.validateSentinel() + } + return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in + return try body(msgs, addresses) + } + } + + func withUnsafePointersWithStorageManagement( + _ body: (UnsafeMutableBufferPointer, UnsafeMutableBufferPointer, Unmanaged) throws -> ReturnValue + ) rethrows -> ReturnValue { + let storageRef: Unmanaged = Unmanaged.passUnretained(self.storage) + return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in + try body(msgs, addresses, storageRef) + } + } +} diff --git a/Sources/NIOPosix/SelectableEventLoop.swift b/Sources/NIOPosix/SelectableEventLoop.swift index 987098b5..cab345cb 100644 --- a/Sources/NIOPosix/SelectableEventLoop.swift +++ b/Sources/NIOPosix/SelectableEventLoop.swift @@ -103,11 +103,8 @@ internal final class SelectableEventLoop: EventLoop { private var externalState: ExternalState = .open // protected by externalStateLock let bufferPool: Pool + let msgBufferPool: Pool - // Used for gathering UDP writes. - let msgs: UnsafeMutableBufferPointer - let addresses: UnsafeMutableBufferPointer - // Used for UDP control messages. private(set) var controlMessageStorage: UnsafeControlMessageStorage @@ -187,8 +184,7 @@ Further information: self._selector = selector self.thread = thread self.bufferPool = Pool(maxSize: 16) - self.msgs = UnsafeMutableBufferPointer.allocate(capacity: Socket.writevLimitIOVectors) - self.addresses = UnsafeMutableBufferPointer.allocate(capacity: Socket.writevLimitIOVectors) + self.msgBufferPool = Pool(maxSize: 16) self.controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors) // We will process 4096 tasks per while loop. self.tasksCopy.reserveCapacity(4096) @@ -206,8 +202,6 @@ Further information: "illegal internal state on deinit: \(self.internalState)") assert(self.externalState == .resourcesReclaimed, "illegal external state on shutdown: \(self.externalState)") - self.msgs.deallocate() - self.addresses.deallocate() self.controlMessageStorage.deallocate() } diff --git a/Sources/NIOPosix/SocketChannel.swift b/Sources/NIOPosix/SocketChannel.swift index d7d57379..6a0f3201 100644 --- a/Sources/NIOPosix/SocketChannel.swift +++ b/Sources/NIOPosix/SocketChannel.swift @@ -424,8 +424,7 @@ final class DatagramChannel: BaseSocketChannel { } self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool, - msgs: eventLoop.msgs, - addresses: eventLoop.addresses, + msgBufferPool: eventLoop.msgBufferPool, controlMessageStorage: eventLoop.controlMessageStorage) try super.init( @@ -441,8 +440,7 @@ final class DatagramChannel: BaseSocketChannel { self.vectorReadManager = nil try socket.setNonBlocking() self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool, - msgs: eventLoop.msgs, - addresses: eventLoop.addresses, + msgBufferPool: eventLoop.msgBufferPool, controlMessageStorage: eventLoop.controlMessageStorage) try super.init( socket: socket, @@ -816,7 +814,6 @@ final class DatagramChannel: BaseSocketChannel { destinationPtr: destinationPtr, destinationSize: destinationSize, controlBytes: controlBytes.validControlBytes) - }, vectorWriteOperation: { msgs in return try self.socket.sendmmsg(msgs: msgs) diff --git a/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift b/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift index 67e0e6de..6f3cc458 100644 --- a/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift +++ b/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift @@ -48,30 +48,22 @@ private extension SocketAddress { class PendingDatagramWritesManagerTests: XCTestCase { private func withPendingDatagramWritesManager(_ body: (PendingDatagramWritesManager) throws -> Void) rethrows { let bufferPool = Pool(maxSize: 16) - var msgs: [MMsgHdr] = Array(repeating: MMsgHdr(), count: Socket.writevLimitIOVectors + 1) - var addresses: [sockaddr_storage] = Array(repeating: sockaddr_storage(), count: Socket.writevLimitIOVectors + 1) + let msgBufferPool = Pool(maxSize: 16) var controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors) defer { controlMessageStorage.deallocate() } + let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, msgBufferPool: msgBufferPool, controlMessageStorage: controlMessageStorage) - try msgs.withUnsafeMutableBufferPointer { msgs in - try addresses.withUnsafeMutableBufferPointer { addresses in - let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, - msgs: msgs, - addresses: addresses, - controlMessageStorage: controlMessageStorage) - XCTAssertTrue(pwm.isEmpty) - XCTAssertTrue(pwm.isOpen) - XCTAssertFalse(pwm.isFlushPending) - XCTAssertTrue(pwm.isWritable) + XCTAssertTrue(pwm.isEmpty) + XCTAssertTrue(pwm.isOpen) + XCTAssertFalse(pwm.isFlushPending) + XCTAssertTrue(pwm.isWritable) - try body(pwm) + try body(pwm) - XCTAssertTrue(pwm.isEmpty) - XCTAssertFalse(pwm.isFlushPending) - } - } + XCTAssertTrue(pwm.isEmpty) + XCTAssertFalse(pwm.isFlushPending) } /// A frankenstein testing monster. It asserts that for `PendingDatagramWritesManager` `pwm` and `EventLoopPromises` `promises`