Buffer pool for message headers and addresses. (#2378)

* Pool buffers for messages and addresses.

* Revert changes related to controlMessageStorage

* Cosmetic fix.

---------

Co-authored-by: Cory Benfield <lukasa@apple.com>
This commit is contained in:
ser 2023-03-10 19:06:02 +03:00 committed by GitHub
parent ef7dc666e8
commit 8193940b9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 147 additions and 54 deletions

View File

@ -381,17 +381,9 @@ extension PendingDatagramWritesState {
/// the availability of the functions.
final class PendingDatagramWritesManager: PendingWritesManager {
private var bufferPool: Pool<PooledBuffer>
/// Storage for mmsghdr structures. Only present on Linux because Darwin does not support
/// gathering datagram writes.
private var msgs: UnsafeMutableBufferPointer<MMsgHdr>
/// Storage for sockaddr structures. Only present on Linux because Darwin does not support gathering
/// writes.
private var addresses: UnsafeMutableBufferPointer<sockaddr_storage>
private var controlMessageStorage: UnsafeControlMessageStorage
private let bufferPool: Pool<PooledBuffer>
private let msgBufferPool: Pool<PooledMsgBuffer>
private let controlMessageStorage: UnsafeControlMessageStorage
private var state = PendingDatagramWritesState()
@ -411,13 +403,9 @@ final class PendingDatagramWritesManager: PendingWritesManager {
/// - msgs: A pre-allocated array of `MMsgHdr` elements
/// - addresses: A pre-allocated array of `sockaddr_storage` elements
/// - controlMessageStorage: Pre-allocated memory for storing cmsghdr data during a vector write operation.
init(bufferPool: Pool<PooledBuffer>,
msgs: UnsafeMutableBufferPointer<MMsgHdr>,
addresses: UnsafeMutableBufferPointer<sockaddr_storage>,
controlMessageStorage: UnsafeControlMessageStorage) {
init(bufferPool: Pool<PooledBuffer>, msgBufferPool: Pool<PooledMsgBuffer>, controlMessageStorage: UnsafeControlMessageStorage) {
self.bufferPool = bufferPool
self.msgs = msgs
self.addresses = addresses
self.msgBufferPool = msgBufferPool
self.controlMessageStorage = controlMessageStorage
}
@ -618,13 +606,19 @@ final class PendingDatagramWritesManager: PendingWritesManager {
private func triggerVectorBufferWrite(vectorWriteOperation: (UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>) throws -> OneWriteOperationResult {
assert(self.state.isFlushPending && self.isOpen && !self.state.isEmpty,
"illegal state for vector datagram write operation: flushPending: \(self.state.isFlushPending), isOpen: \(self.isOpen), empty: \(self.state.isEmpty)")
return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state,
bufferPool: self.bufferPool,
msgs: self.msgs,
addresses: self.addresses,
controlMessageStorage: self.controlMessageStorage,
{ try vectorWriteOperation($0) }),
messages: self.msgs)
let msgBuffer = self.msgBufferPool.get()
defer { self.msgBufferPool.put(msgBuffer) }
return try msgBuffer.withUnsafePointers { msgs, addresses in
return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state,
bufferPool: self.bufferPool,
msgs: msgs,
addresses: addresses,
controlMessageStorage: self.controlMessageStorage,
{ try vectorWriteOperation($0) }),
messages: msgs)
}
}
private func fulfillPromise(_ promise: PendingDatagramWritesState.DatagramWritePromiseFiller?) {

View File

@ -202,3 +202,119 @@ extension Int {
self = (self + alignmentGuide) & (~alignmentGuide)
}
}
struct PooledMsgBuffer: PoolElement {
private typealias MemorySentinel = UInt32
private static let sentinelValue = MemorySentinel(0xdeadbeef)
private struct PooledMsgBufferHead {
let count: Int
let spaceForMsgHdrs: Int
let spaceForAddresses: Int
init(count: Int) {
var spaceForMsgHdrs = MemoryLayout<MMsgHdr>.stride * count
spaceForMsgHdrs.roundUpToAlignment(for: sockaddr_storage.self)
var spaceForAddress = MemoryLayout<sockaddr_storage>.stride * count
spaceForAddress.roundUpToAlignment(for: MemorySentinel.self)
self.count = count
self.spaceForMsgHdrs = spaceForMsgHdrs
self.spaceForAddresses = spaceForAddress
}
var totalByteCount: Int {
self.spaceForMsgHdrs + self.spaceForAddresses + MemoryLayout<MemorySentinel>.size
}
var msgHdrsOffset: Int {
0
}
var addressesOffset: Int {
self.spaceForMsgHdrs
}
var memorySentinelOffset: Int {
return self.spaceForMsgHdrs + self.spaceForAddresses
}
}
private class BackingStorage: ManagedBuffer<PooledMsgBufferHead, UInt8> {
static func create(count: Int) -> Self {
let head = PooledMsgBufferHead(count: count)
let baseStorage = Self.create(minimumCapacity: head.totalByteCount) { _ in
head
}
let storage = unsafeDowncast(baseStorage, to: Self.self)
storage.withUnsafeMutablePointers { headPointer, tailPointer in
UnsafeRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).bindMemory(to: MMsgHdr.self, capacity: count)
UnsafeRawPointer(tailPointer + headPointer.pointee.addressesOffset).bindMemory(to: sockaddr_storage.self, capacity: count)
UnsafeRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).bindMemory(to: MemorySentinel.self, capacity: 1)
}
return storage
}
func withUnsafeMutableTypedPointers<ReturnType>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeMutablePointer<MemorySentinel>) throws -> ReturnType
) rethrows -> ReturnType {
return try self.withUnsafeMutablePointers { headPointer, tailPointer in
let msgHdrsPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).assumingMemoryBound(to: MMsgHdr.self)
let addressesPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.addressesOffset).assumingMemoryBound(to: sockaddr_storage.self)
let sentinelPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).assumingMemoryBound(to: MemorySentinel.self)
let msgHdrsBufferPointer = UnsafeMutableBufferPointer(
start: msgHdrsPointer, count: headPointer.pointee.count
)
let addressesBufferPointer = UnsafeMutableBufferPointer(
start: addressesPointer, count: headPointer.pointee.count
)
return try body(msgHdrsBufferPointer, addressesBufferPointer, sentinelPointer)
}
}
}
private func validateSentinel() {
self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in
precondition(sentinelPointer.pointee == Self.sentinelValue, "Detected memory handling error!")
}
}
private var storage: BackingStorage
init() {
self.storage = .create(count: Socket.writevLimitIOVectors)
self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in
sentinelPointer.pointee = Self.sentinelValue
}
}
func evictedFromPool() {
self.validateSentinel()
}
func withUnsafePointers<ReturnValue>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>) throws -> ReturnValue
) rethrows -> ReturnValue {
defer {
self.validateSentinel()
}
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in
return try body(msgs, addresses)
}
}
func withUnsafePointersWithStorageManagement<ReturnValue>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, Unmanaged<AnyObject>) throws -> ReturnValue
) rethrows -> ReturnValue {
let storageRef: Unmanaged<AnyObject> = Unmanaged.passUnretained(self.storage)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in
try body(msgs, addresses, storageRef)
}
}
}

View File

@ -103,11 +103,8 @@ internal final class SelectableEventLoop: EventLoop {
private var externalState: ExternalState = .open // protected by externalStateLock
let bufferPool: Pool<PooledBuffer>
let msgBufferPool: Pool<PooledMsgBuffer>
// Used for gathering UDP writes.
let msgs: UnsafeMutableBufferPointer<MMsgHdr>
let addresses: UnsafeMutableBufferPointer<sockaddr_storage>
// Used for UDP control messages.
private(set) var controlMessageStorage: UnsafeControlMessageStorage
@ -187,8 +184,7 @@ Further information:
self._selector = selector
self.thread = thread
self.bufferPool = Pool<PooledBuffer>(maxSize: 16)
self.msgs = UnsafeMutableBufferPointer<MMsgHdr>.allocate(capacity: Socket.writevLimitIOVectors)
self.addresses = UnsafeMutableBufferPointer<sockaddr_storage>.allocate(capacity: Socket.writevLimitIOVectors)
self.msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
self.controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
// We will process 4096 tasks per while loop.
self.tasksCopy.reserveCapacity(4096)
@ -206,8 +202,6 @@ Further information:
"illegal internal state on deinit: \(self.internalState)")
assert(self.externalState == .resourcesReclaimed,
"illegal external state on shutdown: \(self.externalState)")
self.msgs.deallocate()
self.addresses.deallocate()
self.controlMessageStorage.deallocate()
}

View File

@ -424,8 +424,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}
self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgs: eventLoop.msgs,
addresses: eventLoop.addresses,
msgBufferPool: eventLoop.msgBufferPool,
controlMessageStorage: eventLoop.controlMessageStorage)
try super.init(
@ -441,8 +440,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
self.vectorReadManager = nil
try socket.setNonBlocking()
self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgs: eventLoop.msgs,
addresses: eventLoop.addresses,
msgBufferPool: eventLoop.msgBufferPool,
controlMessageStorage: eventLoop.controlMessageStorage)
try super.init(
socket: socket,
@ -816,7 +814,6 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
destinationPtr: destinationPtr,
destinationSize: destinationSize,
controlBytes: controlBytes.validControlBytes)
},
vectorWriteOperation: { msgs in
return try self.socket.sendmmsg(msgs: msgs)

View File

@ -48,30 +48,22 @@ private extension SocketAddress {
class PendingDatagramWritesManagerTests: XCTestCase {
private func withPendingDatagramWritesManager(_ body: (PendingDatagramWritesManager) throws -> Void) rethrows {
let bufferPool = Pool<PooledBuffer>(maxSize: 16)
var msgs: [MMsgHdr] = Array(repeating: MMsgHdr(), count: Socket.writevLimitIOVectors + 1)
var addresses: [sockaddr_storage] = Array(repeating: sockaddr_storage(), count: Socket.writevLimitIOVectors + 1)
let msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
var controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
defer {
controlMessageStorage.deallocate()
}
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, msgBufferPool: msgBufferPool, controlMessageStorage: controlMessageStorage)
try msgs.withUnsafeMutableBufferPointer { msgs in
try addresses.withUnsafeMutableBufferPointer { addresses in
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool,
msgs: msgs,
addresses: addresses,
controlMessageStorage: controlMessageStorage)
XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
XCTAssertFalse(pwm.isFlushPending)
XCTAssertTrue(pwm.isWritable)
XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
XCTAssertFalse(pwm.isFlushPending)
XCTAssertTrue(pwm.isWritable)
try body(pwm)
try body(pwm)
XCTAssertTrue(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
}
}
XCTAssertTrue(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
}
/// A frankenstein testing monster. It asserts that for `PendingDatagramWritesManager` `pwm` and `EventLoopPromises` `promises`