Handle close(output) in the pipeline handler. (#2414)

Motivation:

Currently the server pipeline handler ignores close. This generally works,
except in the rare case that the user calls close(mode: .output). In this
instance they have signalled that they'll never write again, and they're
likely expecting a final close shortly after.

However, it is possible that the pipeline handler has suspended reads
at the same time. On Linux this isn't an issue, because we'll still be told
about the eventual socket close. However, on Apple platforms we won't: we've
masked off the reads, and we can't listen to EVFILT_EXCEPT due to some
other issues. This means that on Apple platforms the server pipeline handler
can accidentally wedge the Channel open and prevent it from closing.

We should take this opportunity to have the server pipeline handler be smart
about close(mode: .output). What _should_ happen here is that the pipeline
handler should immediately refuse to deliver further requests on the Channel.
If one is in-flight, it can continue, but everything else should be dropped.
This is because the server cannot possibly respond to further requests.

Modifications:

- Add new states to the server pipeline handler
- Drop buffered requests and new data after close(mode: .output)
- Add tests

Result:

Server pipeline handler behaves way better.
This commit is contained in:
Cory Benfield 2023-04-28 14:22:07 +01:00 committed by GitHub
parent 77d35d2f43
commit 5f8b0647e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 218 additions and 7 deletions

View File

@ -82,11 +82,22 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
/// to wait for the request to complete, but won't block anything.
case requestEndPending
/// The server has closed the output partway through a request. The server will never
/// act again, but this may not be in error, so we'll forward the rest of this request to the server.
case sentCloseOutputRequestEndPending
/// The server has closed the output, and a complete request has been delivered.
/// It's never going to act again. Generally we expect this to be closely followed
/// by read EOF, but we need to keep reading to make that possible, so we
/// never suppress reads again.
case sentCloseOutput
mutating func requestHeadReceived() {
switch self {
case .idle:
self = .requestAndResponseEndPending
case .requestAndResponseEndPending, .responseEndPending, .requestEndPending:
case .requestAndResponseEndPending, .responseEndPending, .requestEndPending,
.sentCloseOutputRequestEndPending, .sentCloseOutput:
preconditionFailure("received request head in state \(self)")
}
}
@ -100,7 +111,7 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
// We got a response while still receiving a request, which we have to
// wait for.
self = .requestEndPending
case .requestEndPending, .idle:
case .requestEndPending, .idle, .sentCloseOutput, .sentCloseOutputRequestEndPending:
preconditionFailure("Unexpectedly received a response in state \(self)")
}
}
@ -114,10 +125,25 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
// We got a request and the response isn't done, wait for the
// response.
self = .responseEndPending
case .responseEndPending, .idle:
case .sentCloseOutputRequestEndPending:
// Got the request end we were waiting for.
self = .sentCloseOutput
case .responseEndPending, .idle, .sentCloseOutput:
preconditionFailure("Received second request")
}
}
mutating func closeOutputSent() {
switch self {
case .idle, .responseEndPending:
self = .sentCloseOutput
case .requestEndPending, .requestAndResponseEndPending:
self = .sentCloseOutputRequestEndPending
case .sentCloseOutput, .sentCloseOutputRequestEndPending:
// Weird to duplicate fail, but we tolerate it in both cases.
()
}
}
}
/// The events that this handler buffers while waiting for the server to
@ -182,6 +208,11 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
()
}
if self.state == .sentCloseOutput {
// Drop all events in this state.
return
}
if self.eventBuffer.count != 0 || self.state == .responseEndPending {
self.eventBuffer.append(.channelRead(data))
return
@ -252,18 +283,20 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
// we're not in the middle of a request, let's just shut the door
self.lifecycleState = .quiescingLastRequestEndReceived
self.eventBuffer.removeAll()
case .idle:
case .idle, .sentCloseOutput:
// we're completely idle, let's just close
self.lifecycleState = .quiescingCompleted
self.eventBuffer.removeAll()
context.close(promise: nil)
case .requestEndPending, .requestAndResponseEndPending:
// we're in the middle of a request, we'll need to keep accepting events until we see the .end
case .requestEndPending, .requestAndResponseEndPending, .sentCloseOutputRequestEndPending:
// we're in the middle of a request, we'll need to keep accepting events until we see the .end.
// It's ok for us to forget we saw close output here, the lifecycle event will close for us.
self.lifecycleState = .quiescingWaitingForRequestEnd
}
case ChannelEvent.inputClosed:
// We only buffer half-close if there are request parts we're waiting to send.
// Otherwise we deliver the half-close immediately.
// Otherwise we deliver the half-close immediately. Note that we deliver this
// even if the server has sent close output, as it's useful information.
if case .responseEndPending = self.state, self.eventBuffer.count > 0 {
self.eventBuffer.append(.halfClose)
} else {
@ -414,6 +447,32 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
context.fireChannelInactive()
}
public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
var shouldRead = false
if mode == .output {
// We need to do special handling here. If the server is closing output they don't intend to write anymore.
// That means we want to drop anything up to the end of the in-flight request.
self.dropAllButInFlightRequest()
self.state.closeOutputSent()
// If there's a read pending, we should deliver it after we forward the close on.
shouldRead = self.readPending
}
context.close(mode: mode, promise: promise)
// Double-check readPending here in case something weird happened.
//
// Note that because of the state transition in closeOutputSent() above we likely won't actually
// forward any further reads to the user, unless they belong to a request currently streaming in.
// Any reads past that point will be dropped in channelRead().
if shouldRead && self.readPending {
self.readPending = false
context.read()
}
}
/// A response has been sent: we can now start passing reads through
/// again if there are no further pending requests, and send any read()
/// call we may have swallowed.
@ -469,6 +528,31 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableCha
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
}
}
private func dropAllButInFlightRequest() {
// We're going to walk the request buffer up to the next `.head` and drop from there.
let maybeFirstHead = self.eventBuffer.firstIndex(where: { element in
switch element {
case .channelRead(let read):
switch self.unwrapInboundIn(read) {
case .head:
return true
case .body, .end:
return false
}
case .error, .halfClose:
// Leave these where they are, if they're before the next .head we still want to deliver them.
// If they're after the next .head, we don't care.
return false
}
})
guard let firstHead = maybeFirstHead else {
return
}
self.eventBuffer.removeSubrange(firstHead...)
}
}
@available(*, unavailable)

View File

@ -92,6 +92,21 @@ private final class QuiesceEventRecorder: ChannelInboundHandler {
}
}
// This handler drops close mode output. This is because EmbeddedChannel doesn't support it,
// and tests here don't require that it does anything sensible.
private final class CloseOutputSuppressor: ChannelOutboundHandler {
typealias OutboundIn = Any
typealias OutboundOut = Any
func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
if mode == .output {
promise?.succeed()
} else {
context.close(mode: mode, promise: promise)
}
}
}
class HTTPServerPipelineHandlerTest: XCTestCase {
var channel: EmbeddedChannel! = nil
@ -110,6 +125,7 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
self.writeRecorder = WriteRecorder()
self.pipelineHandler = HTTPServerPipelineHandler()
self.quiesceEventRecorder = QuiesceEventRecorder()
XCTAssertNoThrow(try channel.pipeline.addHandler(CloseOutputSuppressor()).wait())
XCTAssertNoThrow(try channel.pipeline.addHandler(self.readCounter).wait())
XCTAssertNoThrow(try channel.pipeline.addHandler(HTTPResponseEncoder()).wait())
XCTAssertNoThrow(try channel.pipeline.addHandler(self.writeRecorder).wait())
@ -1038,4 +1054,115 @@ class HTTPServerPipelineHandlerTest: XCTestCase {
// This should have triggered a read
XCTAssertEqual(self.readCounter.readCount, 1)
}
func testServerCloseOutputForcesReadsBackOn() throws {
// Send in a request
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.requestHead)))
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil)))
// Reads are blocked.
XCTAssertEqual(self.readCounter.readCount, 0)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 0)
XCTAssertEqual(self.readRecorder.reads,
[.channelRead(HTTPServerRequestPart.head(self.requestHead)),
.channelRead(HTTPServerRequestPart.end(nil))])
// Now the server sends close output
XCTAssertNoThrow(try channel.close(mode: .output).wait())
// This unblocked the read and further reads can continue.
XCTAssertEqual(self.readCounter.readCount, 1)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 2)
}
func testCloseOutputAlwaysAllowsReads() throws {
// Send in a request
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.requestHead)))
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil)))
// Reads are blocked.
XCTAssertEqual(self.readCounter.readCount, 0)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 0)
XCTAssertEqual(self.readRecorder.reads,
[.channelRead(HTTPServerRequestPart.head(self.requestHead)),
.channelRead(HTTPServerRequestPart.end(nil))])
// Now the server sends close output
XCTAssertNoThrow(try channel.close(mode: .output).wait())
// This unblocked the read and further reads can continue.
XCTAssertEqual(self.readCounter.readCount, 1)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 2)
// New requests can come in, but are dropped.
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.requestHead)))
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil)))
// Reads keep working.
XCTAssertEqual(self.readCounter.readCount, 2)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 3)
XCTAssertEqual(self.readRecorder.reads,
[.channelRead(HTTPServerRequestPart.head(self.requestHead)),
.channelRead(HTTPServerRequestPart.end(nil))])
}
func testCloseOutputFirstIsOkEvenIfItsABitWeird() throws {
// Server sends close output first
XCTAssertNoThrow(try channel.close(mode: .output).wait())
// Send in a request
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.requestHead)))
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil)))
// Reads are unblocked.
XCTAssertEqual(self.readCounter.readCount, 0)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 1)
// But the data is dropped.
XCTAssertEqual(self.readRecorder.reads, [])
// New requests can come in, and are dropped.
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.requestHead)))
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil)))
// Reads keep working.
XCTAssertEqual(self.readCounter.readCount, 1)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 2)
XCTAssertEqual(self.readRecorder.reads, [])
}
func testPipelinedRequestsAreDroppedWhenWeSendCloseOutput() throws {
// Send in three requests
for _ in 0..<3 {
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.head(self.requestHead)))
XCTAssertNoThrow(try self.channel.writeInbound(HTTPServerRequestPart.end(nil)))
}
// Reads are blocked and only one request was read.
XCTAssertEqual(self.readCounter.readCount, 0)
self.channel.read()
XCTAssertEqual(self.readCounter.readCount, 0)
XCTAssertEqual(self.readRecorder.reads,
[.channelRead(HTTPServerRequestPart.head(self.requestHead)),
.channelRead(HTTPServerRequestPart.end(nil))])
// Server sends close mode output. The buffered requests are dropped.
XCTAssertNoThrow(try channel.close(mode: .output).wait())
XCTAssertEqual(self.readRecorder.reads,
[.channelRead(HTTPServerRequestPart.head(self.requestHead)),
.channelRead(HTTPServerRequestPart.end(nil))])
}
}