//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

/// A utility function that runs the body code only in debug builds, without
/// emitting compiler warnings.
///
/// This is currently the only way to do this in Swift: see
/// https://forums.swift.org/t/support-debug-only-code/11037 for a discussion.
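///
/// For example (a sketch):
///
/// ```swift
/// debugOnly {
///     // This closure only runs when assertions are enabled, i.e. in debug builds.
///     print("expensive diagnostic only evaluated in debug builds")
/// }
/// ```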
internal func debugOnly(_ body: () -> Void) {
    assert({ body(); return true }())
}

/// A `ChannelHandler` that handles HTTP pipelining by buffering inbound data until a
/// response has been sent.
///
/// This handler ensures that HTTP server pipelines only process one request at a time.
/// This is the safest way for pipelining-unaware code to operate, as it ensures that
/// mutation of any shared server state is not parallelised, and that responses are always
/// sent for each request in turn. In almost all cases this is the behaviour that a
/// pipeline will want. This is achieved without doing too much buffering by preventing
/// the `Channel` from reading from the socket until a complete response is processed,
/// ensuring that a malicious client is not capable of overwhelming a server by shoving
/// an enormous amount of data down the `Channel` while a server is processing a
/// slow response.
///
/// See [RFC 7230 Section 6.3.2](https://tools.ietf.org/html/rfc7230#section-6.3.2) for
/// more details on safely handling HTTP pipelining.
///
/// In addition to handling the request buffering, this `ChannelHandler` is aware of
/// TCP half-close. While there are very few HTTP clients that are capable of TCP
/// half-close, clients that are not HTTP specific (e.g. `netcat`) may trigger a TCP
/// half-close. Having this `ChannelHandler` be aware of TCP half-close makes it easier
/// to build HTTP servers that are resilient to this kind of behaviour.
///
/// The TCP half-close handling is done by buffering the half-close notification along
/// with the HTTP request parts. The half-close notification will be delivered in order
/// with the rest of the reads. If the half-close occurs either before a request is received
/// or during a request body upload, it will be delivered immediately. If a half-close is
/// received immediately after `HTTPServerRequestPart.end`, it will also be passed along
/// immediately, allowing this signal to be seen by the HTTP server as early as possible.
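///
/// This handler is normally installed by `ChannelPipeline.configureHTTPServerPipeline`.
/// As a minimal sketch (assuming an existing `EventLoopGroup` named `group`; bootstrap
/// details elided):
///
/// ```swift
/// let bootstrap = ServerBootstrap(group: group)
///     // Required for the TCP half-close handling described above.
///     .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
///     .childChannelInitializer { channel in
///         // `withPipeliningAssistance: true` adds an `HTTPServerPipelineHandler`.
///         channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true)
///     }
/// ```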
public final class HTTPServerPipelineHandler: ChannelDuplexHandler, RemovableChannelHandler {
    public typealias InboundIn = HTTPServerRequestPart
    public typealias InboundOut = HTTPServerRequestPart
    public typealias OutboundIn = HTTPServerResponsePart
    public typealias OutboundOut = HTTPServerResponsePart

    public init() {
        self.nextExpectedInboundMessage = nil
        self.nextExpectedOutboundMessage = nil

        debugOnly {
            self.nextExpectedInboundMessage = .head
            self.nextExpectedOutboundMessage = .head
        }
    }

    /// The state of the HTTP connection.
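    ///
    /// A sketch of the common path for one request/response exchange (derived from
    /// the transitions below):
    ///
    ///     .idle --(request .head)--> .requestAndResponseEndPending
    ///           --(request .end)---> .responseEndPending
    ///           --(response .end)--> .idle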
    private enum ConnectionState {
        /// We are waiting for an HTTP response to complete before we
        /// let the next request in.
        case responseEndPending

        /// We are in the middle of both a request and a response and waiting for both `.end`s.
        case requestAndResponseEndPending

        /// Nothing is active on this connection, the next message we expect would be a request `.head`.
        case idle

        /// The server has responded early, before the request has completed. We need
        /// to wait for the request to complete, but won't block anything.
        case requestEndPending

        /// The server has closed the output partway through a request. The server will never
        /// act again, but this may not be in error, so we'll forward the rest of this request to the server.
        case sentCloseOutputRequestEndPending

        /// The server has closed the output, and a complete request has been delivered.
        /// It's never going to act again. Generally we expect this to be closely followed
        /// by read EOF, but we need to keep reading to make that possible, so we
        /// never suppress reads again.
        case sentCloseOutput

        mutating func requestHeadReceived() {
            switch self {
            case .idle:
                self = .requestAndResponseEndPending
            case .requestAndResponseEndPending, .responseEndPending, .requestEndPending,
                 .sentCloseOutputRequestEndPending, .sentCloseOutput:
                preconditionFailure("received request head in state \(self)")
            }
        }
        mutating func responseEndReceived() {
            switch self {
            case .responseEndPending:
                // Got the response we were waiting for.
                self = .idle
            case .requestAndResponseEndPending:
                // We got a response while still receiving a request, which we have to
                // wait for.
                self = .requestEndPending
            case .sentCloseOutput, .sentCloseOutputRequestEndPending:
                // This is a user error: they have sent close(mode: .output), but are continuing to write.
                // The write will fail, so we can allow it to pass.
                ()
            case .requestEndPending, .idle:
                preconditionFailure("Unexpectedly received a response in state \(self)")
            }
        }
        mutating func requestEndReceived() {
            switch self {
            case .requestEndPending:
                // Got the request end we were waiting for.
                self = .idle
            case .requestAndResponseEndPending:
                // We got the request's end but the response isn't done, wait for the
                // response.
                self = .responseEndPending
            case .sentCloseOutputRequestEndPending:
                // Got the request end we were waiting for.
                self = .sentCloseOutput
            case .responseEndPending, .idle, .sentCloseOutput:
                preconditionFailure("Unexpectedly received a request .end in state \(self)")
            }
        }
        mutating func closeOutputSent() {
            switch self {
            case .idle, .responseEndPending:
                self = .sentCloseOutput
            case .requestEndPending, .requestAndResponseEndPending:
                self = .sentCloseOutputRequestEndPending
            case .sentCloseOutput, .sentCloseOutputRequestEndPending:
                // Duplicating the close of the output is weird, but we tolerate it in both cases.
                ()
            }
        }
    }

    /// The events that this handler buffers while waiting for the server to
    /// generate a response.
    private enum BufferedEvent {
        /// A channelRead event.
        case channelRead(NIOAny)

        case error(HTTPParserError)

        /// A TCP half-close. This is buffered to ensure that subsequent channel
        /// handlers that are aware of TCP half-close are informed about it in
        /// the appropriate order.
        case halfClose
    }

    /// The connection state
    private var state = ConnectionState.idle

    /// While we're waiting to send the response we don't read from the socket.
    /// This keeps track of whether we need to call read() when we've sent our response.
    private var readPending = false

    /// The buffered HTTP requests that are not going to be addressed yet. In general clients
    /// don't pipeline, so this initially allocates no space for data at all. Clients that
    /// do pipeline will cause dynamic resizing of the buffer, which is generally acceptable.
    private var eventBuffer = CircularBuffer<BufferedEvent>(initialCapacity: 0)

    enum NextExpectedMessageType {
        case head
        case bodyOrEnd
    }

    enum LifecycleState {
        /// Operating normally, accepting all events.
        case acceptingEvents

        /// Quiescing but we're still waiting for the request's `.end` which means we still need to process input.
        case quiescingWaitingForRequestEnd

        /// Quiescing and the last request's `.end` has been seen which means we no longer accept any input.
        case quiescingLastRequestEndReceived

        /// Quiescing and we have issued a channel close. Further I/O here is not expected, and won't be managed.
        case quiescingCompleted
    }

    private var lifecycleState: LifecycleState = .acceptingEvents

    // always `nil` in release builds, never `nil` in debug builds
    private var nextExpectedInboundMessage: Optional<NextExpectedMessageType>
    // always `nil` in release builds, never `nil` in debug builds
    private var nextExpectedOutboundMessage: Optional<NextExpectedMessageType>

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch self.lifecycleState {
        case .quiescingLastRequestEndReceived, .quiescingCompleted:
            // We're done, no more data for you.
            return
        case .acceptingEvents, .quiescingWaitingForRequestEnd:
            // Still accepting I/O
            ()
        }

        if self.state == .sentCloseOutput {
            // Drop all events in this state.
            return
        }

        if self.eventBuffer.count != 0 || self.state == .responseEndPending {
            self.eventBuffer.append(.channelRead(data))
            return
        } else {
            self.deliverOneMessage(context: context, data: data)
        }
    }

    private func deliverOneMessage(context: ChannelHandlerContext, data: NIOAny) {
        assert(self.lifecycleState != .quiescingLastRequestEndReceived &&
               self.lifecycleState != .quiescingCompleted,
               "deliverOneMessage called in illegal lifecycle state \(self.lifecycleState)")
        let msg = self.unwrapInboundIn(data)

        debugOnly {
            switch msg {
            case .head:
                assert(self.nextExpectedInboundMessage == .head)
                self.nextExpectedInboundMessage = .bodyOrEnd
            case .body:
                assert(self.nextExpectedInboundMessage == .bodyOrEnd)
            case .end:
                assert(self.nextExpectedInboundMessage == .bodyOrEnd)
                self.nextExpectedInboundMessage = .head
            }
        }

        switch msg {
        case .head:
            self.state.requestHeadReceived()
        case .end:
            // The request is complete. We don't want any more data from now on.
            self.state.requestEndReceived()

            if self.lifecycleState == .quiescingWaitingForRequestEnd {
                self.lifecycleState = .quiescingLastRequestEndReceived
                self.eventBuffer.removeAll()
            }
            if self.lifecycleState == .quiescingLastRequestEndReceived && self.state == .idle {
                self.lifecycleState = .quiescingCompleted
                context.close(promise: nil)
            }
        case .body:
            ()
        }

        context.fireChannelRead(data)
    }

    private func deliverOneError(context: ChannelHandlerContext, error: Error) {
        // There is one interesting case in this error sending logic: if we receive an `HTTPParserError` and we
        // haven't received a full request nor the beginning of a response, we should treat this as a full request.
        // The reason is that what the user will probably do is send a `.badRequest` response, and we should be in
        // a state which allows that.
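        //
        // For example, a user's `errorCaught` handler might respond along these lines
        // (a sketch only, not part of this handler):
        //
        //     let head = HTTPResponseHead(version: .http1_1, status: .badRequest)
        //     context.write(self.wrapOutboundOut(.head(head)), promise: nil)
        //     context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)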
        if (self.state == .idle || self.state == .requestEndPending) && error is HTTPParserError {
            self.state = .responseEndPending
        }

        context.fireErrorCaught(error)
    }

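    /// Quiescing is initiated by firing `ChannelShouldQuiesceEvent` on the pipeline,
    /// for example (a sketch; `ServerQuiescingHelper` from swift-nio-extras can fire
    /// this for every channel of a server):
    ///
    /// ```swift
    /// channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
    /// ```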
    public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
        switch event {
        case is ChannelShouldQuiesceEvent:
            assert(self.lifecycleState == .acceptingEvents,
                   "unexpected lifecycle state when receiving ChannelShouldQuiesceEvent: \(self.lifecycleState)")
            switch self.state {
            case .responseEndPending:
                // we're not in the middle of a request, let's just shut the door
                self.lifecycleState = .quiescingLastRequestEndReceived
                self.eventBuffer.removeAll()
            case .idle, .sentCloseOutput:
                // we're completely idle, let's just close
                self.lifecycleState = .quiescingCompleted
                self.eventBuffer.removeAll()
                context.close(promise: nil)
            case .requestEndPending, .requestAndResponseEndPending, .sentCloseOutputRequestEndPending:
                // we're in the middle of a request, we'll need to keep accepting events until we see the .end.
                // It's ok for us to forget we saw close output here, the lifecycle event will close for us.
                self.lifecycleState = .quiescingWaitingForRequestEnd
            }
        case ChannelEvent.inputClosed:
            // We only buffer half-close if there are request parts we're waiting to send.
            // Otherwise we deliver the half-close immediately. Note that we deliver this
            // even if the server has sent close output, as it's useful information.
            if case .responseEndPending = self.state, self.eventBuffer.count > 0 {
                self.eventBuffer.append(.halfClose)
            } else {
                context.fireUserInboundEventTriggered(event)
            }
        default:
            context.fireUserInboundEventTriggered(event)
        }
    }

    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        guard let httpError = error as? HTTPParserError else {
            self.deliverOneError(context: context, error: error)
            return
        }

        if case .responseEndPending = self.state {
            self.eventBuffer.append(.error(httpError))
            return
        }

        self.deliverOneError(context: context, error: error)
    }

    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        assert(self.state != .requestEndPending,
               "Received second response while waiting for first one to complete")
        debugOnly {
            let res = self.unwrapOutboundIn(data)
            switch res {
            case .head(let head) where head.isInformational:
                assert(self.nextExpectedOutboundMessage == .head)
            case .head:
                assert(self.nextExpectedOutboundMessage == .head)
                self.nextExpectedOutboundMessage = .bodyOrEnd
            case .body:
                assert(self.nextExpectedOutboundMessage == .bodyOrEnd)
            case .end:
                assert(self.nextExpectedOutboundMessage == .bodyOrEnd)
                self.nextExpectedOutboundMessage = .head
            }
        }

        var startReadingAgain = false

        switch self.unwrapOutboundIn(data) {
        case .head(var head) where self.lifecycleState != .acceptingEvents:
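            // We're quiescing: this response will be among the last on this connection,
            // so rewrite keep-alive heads to tell the client we're going to close.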
            if head.isKeepAlive {
                head.headers.replaceOrAdd(name: "connection", value: "close")
            }
            context.write(self.wrapOutboundOut(.head(head)), promise: promise)
        case .end:
            startReadingAgain = true

            switch self.lifecycleState {
            case .quiescingWaitingForRequestEnd where self.state == .responseEndPending:
                // we just received the .end that we're missing so we can fall through to closing the connection
                fallthrough
            case .quiescingLastRequestEndReceived:
                self.lifecycleState = .quiescingCompleted
                context.write(data).flatMap {
                    context.close()
                }.cascade(to: promise)
            case .acceptingEvents, .quiescingWaitingForRequestEnd:
                context.write(data, promise: promise)
            case .quiescingCompleted:
                // Uh, why are we writing more data here? We'll write it, but it should be guaranteed
                // to fail.
                assertionFailure("Wrote in quiescing completed state")
                context.write(data, promise: promise)
            }
        case .body, .head:
            context.write(data, promise: promise)
        }

        if startReadingAgain {
            self.state.responseEndReceived()
            self.deliverPendingRequests(context: context)
            self.startReading(context: context)
        }
    }

    public func read(context: ChannelHandlerContext) {
        switch self.lifecycleState {
        case .quiescingLastRequestEndReceived, .quiescingCompleted:
            // We swallow all reads now, as we're going to close the connection.
            ()
        case .acceptingEvents, .quiescingWaitingForRequestEnd:
            if case .responseEndPending = self.state {
                self.readPending = true
            } else {
                context.read()
            }
        }
    }

    public func handlerRemoved(context: ChannelHandlerContext) {
        // We're being removed from the pipeline. We need to do a few things:
        //
        // 1. If we have buffered events, deliver them. While we shouldn't be
        //    re-entrantly called, we take a local copy to make sure of that.
        // 2. If we are quiescing, we swallowed a quiescing event from the user: replay it,
        //    as the user has hopefully added a handler that will do something with this.
        // 3. Finally, if we have a read pending, we need to release it.
        //
        // The basic theory here is that if there is anything we were going to do when we received
        // either a request .end or a response .end, we do it now because there is no future for us.
        // We also need to ensure we do not drop any data on the floor.
        //
        // At this stage we are no longer in the pipeline, so all further content should be
        // blocked from reaching us. Thus we can avoid mutating our own internal state any
        // longer.
        let bufferedEvents = self.eventBuffer
        for event in bufferedEvents {
            switch event {
            case .channelRead(let read):
                context.fireChannelRead(read)
            case .halfClose:
                context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
            case .error(let error):
                context.fireErrorCaught(error)
            }
        }

        switch self.lifecycleState {
        case .quiescingLastRequestEndReceived, .quiescingWaitingForRequestEnd:
            context.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
        case .acceptingEvents, .quiescingCompleted:
            // Either we haven't quiesced, or we succeeded in doing it.
            ()
        }

        if self.readPending {
            context.read()
        }
    }

    public func channelInactive(context: ChannelHandlerContext) {
        // Welp, this channel isn't going to work anymore. We may as well drop our pending events here, as we
        // cannot be expected to act on them any longer.
        //
        // Side note: it's important that we drop these. If we don't, handlerRemoved will deliver them all.
        // While it's fair to replay buffered events into a channel whose user chose to remove the
        // HTTPServerPipelineHandler, it's deeply unfair to do so to a user that didn't choose to do that,
        // where it happened to them only because the channel closed.
        //
        // We set keepingCapacity to avoid this reallocating a buffer, as we'll just free it shortly anyway.
        self.eventBuffer.removeAll(keepingCapacity: true)
        context.fireChannelInactive()
    }

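    // Note: a user handler closing only the output side, e.g. with
    // `context.close(mode: .output, promise: nil)`, arrives here and drives the
    // `sentCloseOutput` states above.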
    public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        var shouldRead = false

        if mode == .output {
            // We need to do special handling here. If the server is closing output they don't intend to write anymore.
            // That means we want to drop anything up to the end of the in-flight request.
            self.dropAllButInFlightRequest()
            self.state.closeOutputSent()

            // If there's a read pending, we should deliver it after we forward the close on.
            shouldRead = self.readPending
        }

        context.close(mode: mode, promise: promise)

        // Double-check readPending here in case something weird happened.
        //
        // Note that because of the state transition in closeOutputSent() above we likely won't actually
        // forward any further reads to the user, unless they belong to a request currently streaming in.
        // Any reads past that point will be dropped in channelRead().
        if shouldRead && self.readPending {
            self.readPending = false
            context.read()
        }
    }

    /// A response has been sent: we can now start passing reads through
    /// again if there are no further pending requests, and send any read()
    /// call we may have swallowed.
    private func startReading(context: ChannelHandlerContext) {
        if self.readPending && self.state != .responseEndPending {
            switch self.lifecycleState {
            case .quiescingLastRequestEndReceived, .quiescingCompleted:
                // No more reads in these states.
                ()
            case .acceptingEvents, .quiescingWaitingForRequestEnd:
                self.readPending = false
                context.read()
            }
        }
    }

    /// A response has been sent: deliver all pending requests and
    /// mark the channel ready to handle more requests.
    private func deliverPendingRequests(context: ChannelHandlerContext) {
        var deliveredRead = false

        while self.state != .responseEndPending, let event = self.eventBuffer.first {
            self.eventBuffer.removeFirst()

            switch event {
            case .channelRead(let read):
                self.deliverOneMessage(context: context, data: read)
                deliveredRead = true
            case .error(let error):
                self.deliverOneError(context: context, error: error)
            case .halfClose:
                // When we fire the half-close, we want to forget all prior reads.
                // They will just trigger further half-close notifications we don't
                // need.
                self.readPending = false
                context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
            }
        }

        if deliveredRead {
            context.fireChannelReadComplete()
        }

        // We need to quickly check whether there is an EOF waiting here, because
        // if there is we should also unbuffer it and pass it along. There is no
        // advantage in sitting on it, and it may help the later channel handlers
        // be more sensible about keep-alive logic if they can see this early.
        // This is done after `fireChannelReadComplete` to keep the same observable
        // behaviour as `SocketChannel`, which fires these events in this order.
        if case .some(.halfClose) = self.eventBuffer.first {
            self.eventBuffer.removeFirst()
            self.readPending = false
            context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
        }
    }

    private func dropAllButInFlightRequest() {
        // We're going to walk the request buffer up to the next `.head` and drop from there.
        let maybeFirstHead = self.eventBuffer.firstIndex(where: { element in
            switch element {
            case .channelRead(let read):
                switch self.unwrapInboundIn(read) {
                case .head:
                    return true
                case .body, .end:
                    return false
                }
            case .error, .halfClose:
                // Leave these where they are, if they're before the next .head we still want to deliver them.
                // If they're after the next .head, we don't care.
                return false
            }
        })

        guard let firstHead = maybeFirstHead else {
            return
        }

        self.eventBuffer.removeSubrange(firstHead...)
    }
}

@available(*, unavailable)
extension HTTPServerPipelineHandler: Sendable {}