Remove management of flush promises from PDWM. (#24)

Motivation:

A substantial chunk of code complexity exists in the PDWM to manage
flush promises. This complexity is no longer justified now that we
don't bother with them.

Modifications:

Removed much of the complexity of the PDWM handling of flush promises.

Result:

Simpler, easier to understand code.
This commit is contained in:
Cory Benfield 2018-02-23 12:09:00 +00:00 committed by Norman Maurer
parent c95051db78
commit 369a0714ae
4 changed files with 36 additions and 155 deletions

View File

@ -44,17 +44,6 @@ private struct PendingDatagramWrite {
}
}
/// This enum is used to keep track of flushes separate from writes for the datagram
/// channels. This is necessary to handle the richer flush promise notifications that
/// datagrams provide: specifically, some fraction of writes to datagram sockets may fail
/// for "recoverable" reasons, and those errors need to be reported on the flush promises.
/// That means it's vital we keep track of where the flushes were, which `MarkedCircularBuffer`
/// cannot do for us.
private enum PendingDatagramOperation {
case write(PendingDatagramWrite)
case flush(EventLoopPromise<Void>)
}
fileprivate extension Error {
/// Returns whether the error is "recoverable" from the perspective of datagram sending.
///
@ -141,17 +130,13 @@ private func doPendingDatagramWriteVectorOperation(pending: PendingDatagramWrite
private struct PendingDatagramWritesState {
fileprivate typealias DatagramWritePromiseFiller = (EventLoopPromise<Void>, Error?)
private var pendingWrites = MarkedCircularBuffer<PendingDatagramOperation>(initialRingCapacity: 16)
private var pendingWrites = MarkedCircularBuffer<PendingDatagramWrite>(initialRingCapacity: 16)
private var chunks: Int = 0
private var toBeFlushedErrors: [Error] = []
public private(set) var bytes: Int = 0
public var nextWrite: PendingDatagramWrite? {
guard case .some(.write(let w)) = self.pendingWrites.first else {
return nil
}
return w
return self.pendingWrites.first
}
/// Subtract `bytes` from the number of outstanding bytes to write.
@ -165,10 +150,7 @@ private struct PendingDatagramWritesState {
/// - returns: The promise that the caller must fire, along with an error to fire it with if it needs one.
///
private mutating func wroteFirst(error: Error? = nil) -> DatagramWritePromiseFiller? {
guard case .write(let first) = self.pendingWrites.removeFirst() else {
fatalError("First pending write is actually a flush")
}
let first = self.pendingWrites.removeFirst()
self.chunks -= 1
self.subtractOutstanding(bytes: first.data.readableBytes)
if let promise = first.promise {
@ -196,7 +178,7 @@ private struct PendingDatagramWritesState {
/// Add a new write and optionally the corresponding promise to the list of outstanding writes.
public mutating func append(_ chunk: PendingDatagramWrite) {
self.pendingWrites.append(.write(chunk))
self.pendingWrites.append(chunk)
self.chunks += 1
self.bytes += chunk.data.readableBytes
}
@ -204,19 +186,11 @@ private struct PendingDatagramWritesState {
/// Mark the flush checkpoint.
///
/// All writes before this checkpoint will eventually be written to the socket.
///
/// - parameters:
/// - The flush promise.
public mutating func markFlushCheckpoint(promise: EventLoopPromise<Void>?) {
public mutating func markFlushCheckpoint() {
// No point marking a flush checkpoint if we have no writes!
guard self.pendingWrites.count > 0 else {
// No writes mean this is a flush on empty, so we can satisfy this immediately.
promise?.succeed(result: ())
return
}
if let promise = promise {
self.pendingWrites.append(.flush(promise))
}
self.pendingWrites.mark()
}
@ -249,7 +223,6 @@ private struct PendingDatagramWritesState {
if let promiseFiller = self.wroteFirst(error: error) {
pendingPromises.append(promiseFiller)
}
pendingPromises.append(contentsOf: self.processPendingFlushes())
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely
return (pendingPromises, result)
@ -285,45 +258,18 @@ private struct PendingDatagramWritesState {
/// - returns: All the promises that must be fired, and a `WriteResult` that indicates if we could write
/// everything or not.
private mutating func didScalarWrite(written: Int) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
guard case .write(let write) = self.pendingWrites[0] else {
fatalError("First write in queue is actually a flush")
}
precondition(written <= write.data.readableBytes,
"Appeared to write more bytes (\(written)) than the datagram contained (\(write.data.readableBytes))")
precondition(written <= self.pendingWrites[0].data.readableBytes,
"Appeared to write more bytes (\(written)) than the datagram contained (\(self.pendingWrites[0].data.readableBytes))")
var fillers: [DatagramWritePromiseFiller] = []
if let writeFiller = self.wroteFirst() {
fillers.append(writeFiller)
}
fillers.append(contentsOf: self.processPendingFlushes())
// If we no longer have a mark, we wrote everything.
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely
return (fillers, result)
}
/// Returns any pending flush promises that need to be fired, along with (if needed) a composite error
/// to fire them with.
private mutating func processPendingFlushes() -> [DatagramWritePromiseFiller] {
var results: [DatagramWritePromiseFiller] = []
while case .some(.flush(let promise)) = self.pendingWrites.first {
_ = self.pendingWrites.removeFirst()
if self.toBeFlushedErrors.count > 0 {
results.append((promise, NIOCompositeError(comprising: self.toBeFlushedErrors)))
} else {
results.append((promise, nil))
}
}
if results.count > 0 {
self.toBeFlushedErrors = []
}
return results
}
/// Is there a pending flush?
public var isFlushPending: Bool {
return self.pendingWrites.hasMark()
@ -339,14 +285,10 @@ private struct PendingDatagramWritesState {
promises.reserveCapacity(self.pendingWrites.count)
while !self.pendingWrites.isEmpty {
switch self.pendingWrites.removeFirst() {
case .write(let w):
self.chunks -= 1
self.bytes -= w.data.readableBytes
w.promise.map { promises.append($0) }
case .flush(let p):
promises.append(p)
}
let w = self.pendingWrites.removeFirst()
self.chunks -= 1
self.bytes -= w.data.readableBytes
w.promise.map { promises.append($0) }
}
promises.forEach { $0.fail(error: error) }
@ -354,14 +296,14 @@ private struct PendingDatagramWritesState {
/// Returns the best mechanism to write pending data at the current point in time.
var currentBestWriteMechanism: WriteMechanism {
var flushedWrites = self.flushedWrites
if flushedWrites.next() == nil {
return .nothingToBeWritten
} else if flushedWrites.next() == nil {
return .scalarBufferWrite
} else {
switch self.pendingWrites.markedElementIndex() {
case .some(let e) where e > 0:
return .vectorBufferWrite
case .some(let e):
assert(e == 0) // The compiler can't prove this, but it must be so.
return .scalarBufferWrite
default:
return .nothingToBeWritten
}
}
}
@ -383,12 +325,7 @@ extension PendingDatagramWritesState {
while self.index <= self.markedIndex {
let element = self.pendingWrites.pendingWrites[index]
index += 1
switch element {
case .write(let w):
return w
case .flush:
continue
}
return element
}
return nil
@ -448,11 +385,8 @@ final class PendingDatagramWritesManager: PendingWritesManager {
}
/// Mark the flush checkpoint.
///
/// - parameters:
/// - The flush promise.
func markFlushCheckpoint(promise: EventLoopPromise<Void>?) {
self.state.markFlushCheckpoint(promise: promise)
func markFlushCheckpoint() {
self.state.markFlushCheckpoint()
}
/// Is there a flush pending?

View File

@ -1237,7 +1237,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
override fileprivate func markFlushPoint(promise: EventLoopPromise<Void>?) {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint(promise: promise)
self.pendingWrites.markFlushCheckpoint()
}
/// Called when closing, to instruct the specific implementation to discard all pending

View File

@ -33,7 +33,6 @@ extension PendingDatagramWritesManagerTests {
("testPendingWritesCancellationWorksCorrectly", testPendingWritesCancellationWorksCorrectly),
("testPendingWritesNoMoreThanWritevLimitIsWritten", testPendingWritesNoMoreThanWritevLimitIsWritten),
("testPendingWritesNoMoreThanWritevLimitIsWrittenInOneMassiveChunk", testPendingWritesNoMoreThanWritevLimitIsWrittenInOneMassiveChunk),
("testPendingWritesFlushPromiseWorksWithoutWritePromises", testPendingWritesFlushPromiseWorksWithoutWritePromises),
("testPendingWritesWorksWithManyEmptyWrites", testPendingWritesWorksWithManyEmptyWrites),
("testPendingWritesCloseDuringVectorWrite", testPendingWritesCloseDuringVectorWrite),
("testPendingWritesMoreThanWritevIOVectorLimit", testPendingWritesMoreThanWritevIOVectorLimit),

View File

@ -237,7 +237,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
XCTAssertFalse(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
XCTAssertFalse(pwm.isEmpty)
XCTAssertTrue(pwm.isFlushPending)
@ -263,7 +263,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
promiseStates: [[true, false]])
XCTAssertEqual(.writtenCompletely, result)
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -289,7 +289,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
let ps: [EventLoopPromise<()>] = (0..<3).map { (_: Int) in el.newPromise() }
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: firstAddress, data: buffer), promise: ps[0])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: secondAddress, data: buffer), promise: ps[1])
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: firstAddress, data: emptyBuffer), promise: ps[2])
var result = try assertExpectedWritability(pendingWritesManager: pwm,
@ -300,7 +300,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
promiseStates: [[true, true, false]])
XCTAssertEqual(.writtenCompletely, result)
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -327,7 +327,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: secondAddress, data: buffer), promise: ps[1])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: firstAddress, data: buffer), promise: ps[2])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: secondAddress, data: buffer), promise: ps[3])
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
var result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -377,7 +377,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
let actualVectorWritabilities = maxVectorWritabilities.indices.dropLast().map { Array(maxVectorWritabilities[$0...]) }
let actualPromiseStates = ps.indices.dropFirst().map { Array(repeating: true, count: $0) + Array(repeating: false, count: ps.count - $0) }
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
/* below, we'll write 1 datagram at a time. So the number of datagrams offered should decrease by one.
The write operation should be repeated until we did it 1 + spin count times and then return `.couldNotWriteEverything`.
@ -413,7 +413,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
let ps: [EventLoopPromise<()>] = (0..<3).map { (_: Int) in el.newPromise() }
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[0])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1])
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2])
let result = try assertExpectedWritability(pendingWritesManager: pwm,
@ -450,7 +450,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[0])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2])
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
let result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -484,8 +484,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2])
let flushPromise1: EventLoopPromise<()> = el.newPromise()
pwm.markFlushCheckpoint(promise: flushPromise1)
pwm.markFlushCheckpoint()
let result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -499,7 +498,6 @@ class PendingDatagramWritesManagerTests: XCTestCase {
promiseStates: [[true, false, false], [true, true, false], [true, true, true]])
XCTAssertEqual(.writtenCompletely, result)
XCTAssertTrue(flushPromise1.futureResult.fulfilled)
XCTAssertNoThrow(try ps[1].futureResult.wait())
XCTAssertNoThrow(try ps[2].futureResult.wait())
@ -512,53 +510,6 @@ class PendingDatagramWritesManagerTests: XCTestCase {
} catch {
XCTFail("Unexpected error \(error)")
}
do {
try flushPromise1.futureResult.wait()
XCTFail("Did not throw")
} catch let e as NIOCompositeError {
XCTAssertEqual(e.count, 1)
XCTAssertEqual(e[0] as? ChannelError, ChannelError.writeMessageTooLarge)
} catch {
XCTFail("Unexpected error \(error)")
}
}
}
func testPendingWritesFlushPromiseWorksWithoutWritePromises() throws {
let el = EmbeddedEventLoop()
let alloc = ByteBufferAllocator()
let address = try SocketAddress(ipAddress: "fe80::1", port: 80)
var buffer = alloc.buffer(capacity: 12)
_ = buffer.write(string: "1234")
try withPendingDatagramWritesManager { pwm in
let ps: [EventLoopPromise<()>] = (0..<2).map { (_: Int) in el.newPromise() }
pwm.markFlushCheckpoint(promise: ps[0])
/* let's start with no writes and just a promise */
var result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
expectedSingleWritabilities: nil,
expectedVectorWritabilities: nil,
returns: [],
promiseStates: [[true, false]])
/* let's add a few writes but still without any promises */
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: nil)
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: nil)
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: nil)
pwm.markFlushCheckpoint(promise: ps[1])
result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
expectedSingleWritabilities: nil,
expectedVectorWritabilities: [[(4, address), (4, address), (4, address)]],
returns: [.ok(.processed(3))],
promiseStates: [[true, true]])
XCTAssertEqual(.writtenCompletely, result)
}
}
@ -572,7 +523,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
let ps: [EventLoopPromise<()>] = (0..<3).map { (_: Int) in el.newPromise() }
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: emptyBuffer), promise: ps[0])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: emptyBuffer), promise: ps[1])
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: emptyBuffer), promise: ps[2])
var result = try assertExpectedWritability(pendingWritesManager: pwm,
@ -583,7 +534,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
promiseStates: [[true, true, false]])
XCTAssertEqual(.writtenCompletely, result)
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -606,7 +557,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
let ps: [EventLoopPromise<()>] = (0..<3).map { (_: Int) in el.newPromise() }
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[0])
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1])
pwm.markFlushCheckpoint(promise: nil)
pwm.markFlushCheckpoint()
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2])
ps[0].futureResult.whenComplete {
@ -638,8 +589,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
ps.forEach { p in
_ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: p)
}
let flushPromise: EventLoopPromise<()> = el.newPromise()
pwm.markFlushCheckpoint(promise: flushPromise)
pwm.markFlushCheckpoint()
var result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
@ -649,7 +599,6 @@ class PendingDatagramWritesManagerTests: XCTestCase {
promiseStates: [Array(repeating: true, count: Socket.writevLimitIOVectors) + [false],
Array(repeating: true, count: Socket.writevLimitIOVectors) + [false]])
XCTAssertEqual(.couldNotWriteEverything, result)
XCTAssertFalse(flushPromise.futureResult.fulfilled)
result = try assertExpectedWritability(pendingWritesManager: pwm,
promises: ps,
expectedSingleWritabilities: [(4, address)],
@ -657,7 +606,6 @@ class PendingDatagramWritesManagerTests: XCTestCase {
returns: [.ok(.processed(4))],
promiseStates: [Array(repeating: true, count: Socket.writevLimitIOVectors + 1)])
XCTAssertEqual(.writtenCompletely, result)
XCTAssertTrue(flushPromise.futureResult.fulfilled)
}
}
}