519 lines
23 KiB
Swift
519 lines
23 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the SwiftNIO open source project
|
|
//
|
|
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
|
|
// Licensed under Apache License v2.0
|
|
//
|
|
// See LICENSE.txt for license information
|
|
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
import NIOCore
|
|
import Atomics
|
|
|
|
private struct PendingStreamWrite {
|
|
var data: IOData
|
|
var promise: Optional<EventLoopPromise<Void>>
|
|
}
|
|
|
|
/// Does the setup required to issue a writev.
|
|
///
|
|
/// - parameters:
|
|
/// - pending: The currently pending writes.
|
|
/// - bufferPool: Pool of buffers to use for iovecs and storageRefs
|
|
/// - body: The function that actually does the vector write (usually `writev`).
|
|
/// - returns: A tuple of the number of items attempted to write and the result of the write operation.
|
|
private func doPendingWriteVectorOperation(pending: PendingStreamWritesState,
|
|
bufferPool: Pool<PooledBuffer>,
|
|
_ body: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>) throws -> (itemCount: Int, writeResult: IOResult<Int>) {
|
|
let buffer = bufferPool.get()
|
|
defer { bufferPool.put(buffer) }
|
|
|
|
return try buffer.withUnsafePointers { iovecs, storageRefs in
|
|
// Clamp the number of writes we're willing to issue to the limit for writev.
|
|
var count = min(iovecs.count, storageRefs.count)
|
|
count = min(pending.flushedChunks, count)
|
|
|
|
// the numbers of storage refs that we need to decrease later.
|
|
var numberOfUsedStorageSlots = 0
|
|
var toWrite: Int = 0
|
|
|
|
loop: for i in 0..<count {
|
|
let p = pending[i]
|
|
switch p.data {
|
|
case .byteBuffer(let buffer):
|
|
// Must not write more than Int32.max in one go.
|
|
guard (numberOfUsedStorageSlots == 0) || (Socket.writevLimitBytes - toWrite >= buffer.readableBytes) else {
|
|
break loop
|
|
}
|
|
let toWriteForThisBuffer = min(Socket.writevLimitBytes, buffer.readableBytes)
|
|
toWrite += numericCast(toWriteForThisBuffer)
|
|
|
|
buffer.withUnsafeReadableBytesWithStorageManagement { ptr, storageRef in
|
|
storageRefs[i] = storageRef.retain()
|
|
iovecs[i] = IOVector(iov_base: UnsafeMutableRawPointer(mutating: ptr.baseAddress!), iov_len: numericCast(toWriteForThisBuffer))
|
|
}
|
|
numberOfUsedStorageSlots += 1
|
|
case .fileRegion:
|
|
assert(numberOfUsedStorageSlots != 0, "first item in doPendingWriteVectorOperation was a FileRegion")
|
|
// We found a FileRegion so stop collecting
|
|
break loop
|
|
}
|
|
}
|
|
defer {
|
|
for i in 0..<numberOfUsedStorageSlots {
|
|
storageRefs[i].release()
|
|
}
|
|
}
|
|
let result = try body(UnsafeBufferPointer(start: iovecs.baseAddress!, count: numberOfUsedStorageSlots))
|
|
/* if we hit a limit, we really wanted to write more than we have so the caller should retry us */
|
|
return (numberOfUsedStorageSlots, result)
|
|
}
|
|
}
|
|
|
|
/// The result of a single write operation, usually `write`, `sendfile` or `writev`.
|
|
internal enum OneWriteOperationResult {
|
|
/// Wrote everything asked.
|
|
case writtenCompletely
|
|
|
|
/// Wrote some portion of what was asked.
|
|
case writtenPartially
|
|
|
|
/// Could not write as doing that would have blocked.
|
|
case wouldBlock
|
|
}
|
|
|
|
/// The result of trying to write all the outstanding flushed data. That naturally includes all `ByteBuffer`s and
|
|
/// `FileRegions` and the individual writes have potentially been retried (see `WriteSpinOption`).
|
|
internal struct OverallWriteResult {
|
|
enum WriteOutcome {
|
|
/// Wrote all the data that was flushed. When receiving this result, we can unsubscribe from 'writable' notification.
|
|
case writtenCompletely
|
|
|
|
/// Could not write everything. Before attempting further writes the eventing system should send a 'writable' notification.
|
|
case couldNotWriteEverything
|
|
}
|
|
|
|
internal var writeResult: WriteOutcome
|
|
internal var writabilityChange: Bool
|
|
}
|
|
|
|
/// This holds the states of the currently pending stream writes. The core is a `MarkedCircularBuffer` which holds all the
|
|
/// writes and a mark up until the point the data is flushed.
|
|
///
|
|
/// The most important operations on this object are:
|
|
/// - `append` to add an `IOData` to the list of pending writes.
|
|
/// - `markFlushCheckpoint` which sets a flush mark on the current position of the `MarkedCircularBuffer`. All the items before the checkpoint will be written eventually.
|
|
/// - `didWrite` when a number of bytes have been written.
|
|
/// - `failAll` if for some reason all outstanding writes need to be discarded and the corresponding `EventLoopPromise` needs to be failed.
|
|
private struct PendingStreamWritesState {
|
|
private var pendingWrites = MarkedCircularBuffer<PendingStreamWrite>(initialCapacity: 16)
|
|
public private(set) var bytes: Int64 = 0
|
|
|
|
public var flushedChunks: Int {
|
|
return self.pendingWrites.markedElementIndex.map {
|
|
self.pendingWrites.distance(from: self.pendingWrites.startIndex, to: $0) + 1
|
|
} ?? 0
|
|
}
|
|
|
|
/// Subtract `bytes` from the number of outstanding bytes to write.
|
|
private mutating func subtractOutstanding(bytes: Int) {
|
|
assert(self.bytes >= bytes, "allegedly written more bytes (\(bytes)) than outstanding (\(self.bytes))")
|
|
self.bytes -= numericCast(bytes)
|
|
}
|
|
|
|
/// Indicates that the first outstanding write was written in its entirety.
|
|
///
|
|
/// - returns: The `EventLoopPromise` of the write or `nil` if none was provided. The promise needs to be fulfilled by the caller.
|
|
///
|
|
private mutating func fullyWrittenFirst() -> EventLoopPromise<Void>? {
|
|
let first = self.pendingWrites.removeFirst()
|
|
self.subtractOutstanding(bytes: first.data.readableBytes)
|
|
return first.promise
|
|
}
|
|
|
|
/// Indicates that the first outstanding object has been partially written.
|
|
///
|
|
/// - parameters:
|
|
/// - bytes: How many bytes of the item were written.
|
|
private mutating func partiallyWrittenFirst(bytes: Int) {
|
|
self.pendingWrites[self.pendingWrites.startIndex].data.moveReaderIndex(forwardBy: bytes)
|
|
self.subtractOutstanding(bytes: bytes)
|
|
}
|
|
|
|
/// Initialise a new, empty `PendingWritesState`.
|
|
public init() { }
|
|
|
|
/// Check if there are no outstanding writes.
|
|
public var isEmpty: Bool {
|
|
if self.pendingWrites.isEmpty {
|
|
assert(self.bytes == 0)
|
|
assert(!self.pendingWrites.hasMark)
|
|
return true
|
|
} else {
|
|
assert(self.bytes >= 0)
|
|
return false
|
|
}
|
|
}
|
|
|
|
/// Add a new write and optionally the corresponding promise to the list of outstanding writes.
|
|
public mutating func append(_ chunk: PendingStreamWrite) {
|
|
self.pendingWrites.append(chunk)
|
|
switch chunk.data {
|
|
case .byteBuffer(let buffer):
|
|
self.bytes += numericCast(buffer.readableBytes)
|
|
case .fileRegion(let fileRegion):
|
|
self.bytes += numericCast(fileRegion.readableBytes)
|
|
}
|
|
}
|
|
|
|
/// Get the outstanding write at `index`.
|
|
public subscript(index: Int) -> PendingStreamWrite {
|
|
return self.pendingWrites[self.pendingWrites.index(self.pendingWrites.startIndex, offsetBy: index)]
|
|
}
|
|
|
|
/// Mark the flush checkpoint.
|
|
///
|
|
/// All writes before this checkpoint will eventually be written to the socket.
|
|
public mutating func markFlushCheckpoint() {
|
|
self.pendingWrites.mark()
|
|
}
|
|
|
|
/// Indicate that a write has happened, this may be a write of multiple outstanding writes (using for example `writev`).
|
|
///
|
|
/// - warning: The promises will be returned in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order.
|
|
///
|
|
/// - parameters:
|
|
/// - writeResult: The result of the write operation.
|
|
/// - returns: A tuple of a promise and a `OneWriteResult`. The promise is the first promise that needs to be notified of the write result.
|
|
/// This promise will cascade the result to all other promises that need notifying. If no promises need to be notified, will be `nil`.
|
|
/// The write result will indicate whether we were able to write everything or not.
|
|
public mutating func didWrite(itemCount: Int, result writeResult: IOResult<Int>) -> (EventLoopPromise<Void>?, OneWriteOperationResult) {
|
|
switch writeResult {
|
|
case .wouldBlock(0):
|
|
return (nil, .wouldBlock)
|
|
case .processed(let written), .wouldBlock(let written):
|
|
var promise0: EventLoopPromise<Void>?
|
|
assert(written >= 0, "allegedly written a negative amount of bytes: \(written)")
|
|
var unaccountedWrites = written
|
|
for _ in 0..<itemCount {
|
|
let headItemReadableBytes = self.pendingWrites.first!.data.readableBytes
|
|
if unaccountedWrites >= headItemReadableBytes {
|
|
unaccountedWrites -= headItemReadableBytes
|
|
/* we wrote at least the whole head item, so drop it and succeed the promise */
|
|
if let promise = self.fullyWrittenFirst() {
|
|
if let p = promise0 {
|
|
p.futureResult.cascade(to: promise)
|
|
} else {
|
|
promise0 = promise
|
|
}
|
|
}
|
|
} else {
|
|
/* we could only write a part of the head item, so don't drop it but remember what we wrote */
|
|
self.partiallyWrittenFirst(bytes: unaccountedWrites)
|
|
|
|
// may try again depending on the writeSpinCount
|
|
return (promise0, .writtenPartially)
|
|
}
|
|
}
|
|
assert(unaccountedWrites == 0, "after doing all the accounting for the byte written, \(unaccountedWrites) bytes of unaccounted writes remain.")
|
|
return (promise0, .writtenCompletely)
|
|
}
|
|
}
|
|
|
|
/// Is there a pending flush?
|
|
public var isFlushPending: Bool {
|
|
return self.pendingWrites.hasMark
|
|
}
|
|
|
|
/// Remove all pending writes and return a `EventLoopPromise` which will cascade notifications to all.
|
|
///
|
|
/// - warning: See the warning for `didWrite`.
|
|
///
|
|
/// - returns: promise that needs to be failed, or `nil` if there were no pending writes.
|
|
public mutating func removeAll() -> EventLoopPromise<Void>? {
|
|
var promise0: EventLoopPromise<Void>?
|
|
|
|
while !self.pendingWrites.isEmpty {
|
|
if let p = self.fullyWrittenFirst() {
|
|
if let promise = promise0 {
|
|
promise.futureResult.cascade(to: p)
|
|
} else {
|
|
promise0 = p
|
|
}
|
|
}
|
|
}
|
|
return promise0
|
|
}
|
|
|
|
/// Returns the best mechanism to write pending data at the current point in time.
|
|
var currentBestWriteMechanism: WriteMechanism {
|
|
switch self.flushedChunks {
|
|
case 0:
|
|
return .nothingToBeWritten
|
|
case 1:
|
|
switch self.pendingWrites.first!.data {
|
|
case .byteBuffer:
|
|
return .scalarBufferWrite
|
|
case .fileRegion:
|
|
return .scalarFileWrite
|
|
}
|
|
default:
|
|
let startIndex = self.pendingWrites.startIndex
|
|
switch (self.pendingWrites[startIndex].data,
|
|
self.pendingWrites[self.pendingWrites.index(after: startIndex)].data) {
|
|
case (.byteBuffer, .byteBuffer):
|
|
return .vectorBufferWrite
|
|
case (.byteBuffer, .fileRegion):
|
|
return .scalarBufferWrite
|
|
case (.fileRegion, _):
|
|
return .scalarFileWrite
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingWritesState`
|
|
/// value. The most important purpose of this object is to call `write`, `writev` or `sendfile` depending on the
|
|
/// currently pending writes.
|
|
final class PendingStreamWritesManager: PendingWritesManager {
|
|
private var state = PendingStreamWritesState()
|
|
private let bufferPool: Pool<PooledBuffer>
|
|
|
|
internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
|
|
internal let channelWritabilityFlag = ManagedAtomic(true)
|
|
internal var publishedWritability = true
|
|
|
|
internal var writeSpinCount: UInt = 16
|
|
|
|
private(set) var isOpen = true
|
|
|
|
/// Mark the flush checkpoint.
|
|
func markFlushCheckpoint() {
|
|
self.state.markFlushCheckpoint()
|
|
}
|
|
|
|
/// Is there a flush pending?
|
|
var isFlushPending: Bool {
|
|
return self.state.isFlushPending
|
|
}
|
|
|
|
/// Are there any outstanding writes currently?
|
|
var isEmpty: Bool {
|
|
return self.state.isEmpty
|
|
}
|
|
|
|
/// Add a pending write alongside its promise.
|
|
///
|
|
/// - parameters:
|
|
/// - data: The `IOData` to write.
|
|
/// - promise: Optionally an `EventLoopPromise` that will get the write operation's result
|
|
/// - result: If the `Channel` is still writable after adding the write of `data`.
|
|
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool {
|
|
assert(self.isOpen)
|
|
self.state.append(.init(data: data, promise: promise))
|
|
|
|
if self.state.bytes > waterMark.high &&
|
|
channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged {
|
|
// Returns false to signal the Channel became non-writable and we need to notify the user.
|
|
self.publishedWritability = false
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
/// Returns the best mechanism to write pending data at the current point in time.
|
|
var currentBestWriteMechanism: WriteMechanism {
|
|
return self.state.currentBestWriteMechanism
|
|
}
|
|
|
|
/// Triggers the appropriate write operation. This is a fancy way of saying trigger either `write`, `writev` or
|
|
/// `sendfile`.
|
|
///
|
|
/// - parameters:
|
|
/// - scalarBufferWriteOperation: An operation that writes a single, contiguous array of bytes (usually `write`).
|
|
/// - vectorBufferWriteOperation: An operation that writes multiple contiguous arrays of bytes (usually `writev`).
|
|
/// - scalarFileWriteOperation: An operation that writes a region of a file descriptor (usually `sendfile`).
|
|
/// - returns: The `OneWriteOperationResult` and whether the `Channel` is now writable.
|
|
func triggerAppropriateWriteOperations(scalarBufferWriteOperation: (UnsafeRawBufferPointer) throws -> IOResult<Int>,
|
|
vectorBufferWriteOperation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>,
|
|
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int>) throws -> OverallWriteResult {
|
|
return try self.triggerWriteOperations { writeMechanism in
|
|
switch writeMechanism {
|
|
case .scalarBufferWrite:
|
|
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
|
|
case .vectorBufferWrite:
|
|
return try triggerVectorBufferWrite({ try vectorBufferWriteOperation($0) })
|
|
case .scalarFileWrite:
|
|
return try triggerScalarFileWrite({ try scalarFileWriteOperation($0, $1, $2) })
|
|
case .nothingToBeWritten:
|
|
assertionFailure("called \(#function) with nothing available to be written")
|
|
return .writtenCompletely
|
|
}
|
|
}
|
|
}
|
|
|
|
/// To be called after a write operation (usually selected and run by `triggerAppropriateWriteOperation`) has
|
|
/// completed.
|
|
///
|
|
/// - parameters:
|
|
/// - itemCount: The number of items we tried to write.
|
|
/// - result: The result of the write operation.
|
|
private func didWrite(itemCount: Int, result: IOResult<Int>) -> OneWriteOperationResult {
|
|
let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result)
|
|
|
|
if self.state.bytes < waterMark.low {
|
|
channelWritabilityFlag.store(true, ordering: .relaxed)
|
|
}
|
|
|
|
promise?.succeed(())
|
|
return result
|
|
}
|
|
|
|
/// Trigger a write of a single `ByteBuffer` (usually using `write(2)`).
|
|
///
|
|
/// - parameters:
|
|
/// - operation: An operation that writes a single, contiguous array of bytes (usually `write`).
|
|
private func triggerScalarBufferWrite(_ operation: (UnsafeRawBufferPointer) throws -> IOResult<Int>) throws -> OneWriteOperationResult {
|
|
assert(self.state.isFlushPending && !self.state.isEmpty && self.isOpen,
|
|
"single write called in illegal state: flush pending: \(self.state.isFlushPending), empty: \(self.state.isEmpty), isOpen: \(self.isOpen)")
|
|
|
|
switch self.state[0].data {
|
|
case .byteBuffer(let buffer):
|
|
return self.didWrite(itemCount: 1, result: try buffer.withUnsafeReadableBytes({ try operation($0) }))
|
|
case .fileRegion:
|
|
preconditionFailure("called \(#function) but first item to write was a FileRegion")
|
|
}
|
|
}
|
|
|
|
/// Trigger a write of a single `FileRegion` (usually using `sendfile(2)`).
|
|
///
|
|
/// - parameters:
|
|
/// - operation: An operation that writes a region of a file descriptor.
|
|
private func triggerScalarFileWrite(_ operation: (CInt, Int, Int) throws -> IOResult<Int>) throws -> OneWriteOperationResult {
|
|
assert(self.state.isFlushPending && !self.state.isEmpty && self.isOpen,
|
|
"single write called in illegal state: flush pending: \(self.state.isFlushPending), empty: \(self.state.isEmpty), isOpen: \(self.isOpen)")
|
|
|
|
switch self.state[0].data {
|
|
case .fileRegion(let file):
|
|
let readerIndex = file.readerIndex
|
|
let endIndex = file.endIndex
|
|
return try file.fileHandle.withUnsafeFileDescriptor { fd in
|
|
self.didWrite(itemCount: 1, result: try operation(fd, readerIndex, endIndex))
|
|
}
|
|
case .byteBuffer:
|
|
preconditionFailure("called \(#function) but first item to write was a ByteBuffer")
|
|
}
|
|
}
|
|
|
|
/// Trigger a vector write operation. In other words: Write multiple contiguous arrays of bytes.
|
|
///
|
|
/// - parameters:
|
|
/// - operation: The vector write operation to use. Usually `writev`.
|
|
private func triggerVectorBufferWrite(_ operation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>) throws -> OneWriteOperationResult {
|
|
assert(self.state.isFlushPending && !self.state.isEmpty && self.isOpen,
|
|
"vector write called in illegal state: flush pending: \(self.state.isFlushPending), empty: \(self.state.isEmpty), isOpen: \(self.isOpen)")
|
|
let result = try doPendingWriteVectorOperation(pending: self.state,
|
|
bufferPool: bufferPool,
|
|
{ try operation($0) })
|
|
return self.didWrite(itemCount: result.itemCount, result: result.writeResult)
|
|
}
|
|
|
|
/// Fail all the outstanding writes. This is useful if for example the `Channel` is closed.
|
|
func failAll(error: Error, close: Bool) {
|
|
if close {
|
|
assert(self.isOpen)
|
|
self.isOpen = false
|
|
}
|
|
|
|
self.state.removeAll()?.fail(error)
|
|
|
|
assert(self.state.isEmpty)
|
|
}
|
|
|
|
/// Initialize with a pre-allocated array of IO vectors and storage references. We pass in these pre-allocated
|
|
/// objects to save allocations. They can be safely be re-used for all `Channel`s on a given `EventLoop` as an
|
|
/// `EventLoop` always runs on one and the same thread. That means that there can't be any writes of more than
|
|
/// one `Channel` on the same `EventLoop` at the same time.
|
|
///
|
|
/// - parameters:
|
|
/// - bufferPool: Pool of buffers to be used for iovecs and storage references
|
|
init(bufferPool: Pool<PooledBuffer>) {
|
|
self.bufferPool = bufferPool
|
|
}
|
|
}
|
|
|
|
internal enum WriteMechanism {
|
|
case scalarBufferWrite
|
|
case vectorBufferWrite
|
|
case scalarFileWrite
|
|
case nothingToBeWritten
|
|
}
|
|
|
|
internal protocol PendingWritesManager: AnyObject {
|
|
var isOpen: Bool { get }
|
|
var isFlushPending: Bool { get }
|
|
var writeSpinCount: UInt { get }
|
|
var currentBestWriteMechanism: WriteMechanism { get }
|
|
var channelWritabilityFlag: ManagedAtomic<Bool> { get }
|
|
|
|
/// Represents the writability state the last time we published a writability change to the `Channel`.
|
|
/// This is used in `triggerWriteOperations` to determine whether we need to trigger a writability
|
|
/// change.
|
|
var publishedWritability: Bool { get set }
|
|
}
|
|
|
|
extension PendingWritesManager {
|
|
// This is called from `Channel` API so must be thread-safe.
|
|
var isWritable: Bool {
|
|
return self.channelWritabilityFlag.load(ordering: .relaxed)
|
|
}
|
|
|
|
internal func triggerWriteOperations(triggerOneWriteOperation: (WriteMechanism) throws -> OneWriteOperationResult) throws -> OverallWriteResult {
|
|
var result = OverallWriteResult(writeResult: .couldNotWriteEverything, writabilityChange: false)
|
|
|
|
writeSpinLoop: for _ in 0...self.writeSpinCount {
|
|
var oneResult: OneWriteOperationResult
|
|
repeat {
|
|
guard self.isOpen && self.isFlushPending else {
|
|
result.writeResult = .writtenCompletely
|
|
break writeSpinLoop
|
|
}
|
|
|
|
oneResult = try triggerOneWriteOperation(self.currentBestWriteMechanism)
|
|
if oneResult == .wouldBlock {
|
|
break writeSpinLoop
|
|
}
|
|
} while oneResult == .writtenCompletely
|
|
}
|
|
|
|
// Please note that the re-entrancy protection in `flushNow` expects this code to try to write _all_ the data
|
|
// that is flushed. If we receive a `flush` whilst processing a previous `flush`, we won't do anything because
|
|
// we expect this loop to attempt to attempt all writes, even ones that arrive after this method begins to run.
|
|
//
|
|
// In other words, don't return `.writtenCompletely` unless you've written everything the PendingWritesManager
|
|
// knows to be flushed.
|
|
//
|
|
// Also, it is very important to not do any outcalls to user code outside of the loop until the `flushNow`
|
|
// re-entrancy protection is off again.
|
|
|
|
if !self.publishedWritability {
|
|
// When we last published a writability change the `Channel` wasn't writable, signal back to the caller
|
|
// whether we should emit a writability change.
|
|
result.writabilityChange = self.isWritable
|
|
self.publishedWritability = result.writabilityChange
|
|
}
|
|
return result
|
|
}
|
|
}
|
|
|
|
extension PendingStreamWritesManager: CustomStringConvertible {
|
|
var description: String {
|
|
return "PendingStreamWritesManager { isFlushPending: \(self.isFlushPending), " +
|
|
/* */ "writabilityFlag: \(self.channelWritabilityFlag.load(ordering: .relaxed))), state: \(self.state) }"
|
|
}
|
|
}
|