Functions passed to non-`Sendable` `ChannelHandler`s do *not* need to be `Sendable` (#2249)

This commit is contained in:
David Nadoba 2022-08-26 17:59:34 +02:00 committed by GitHub
parent 9d6a041b12
commit 1100054107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 10 additions and 217 deletions

View File

@ -62,15 +62,9 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
public typealias InboundIn = HTTPServerRequestPart public typealias InboundIn = HTTPServerRequestPart
public typealias InboundOut = HTTPServerRequestPart public typealias InboundOut = HTTPServerRequestPart
public typealias OutboundOut = HTTPServerResponsePart public typealias OutboundOut = HTTPServerResponsePart
#if swift(>=5.7)
private typealias UpgradeCompletionHandler = @Sendable (ChannelHandlerContext) -> Void
#else
private typealias UpgradeCompletionHandler = (ChannelHandlerContext) -> Void
#endif
private let upgraders: [String: HTTPServerProtocolUpgrader] private let upgraders: [String: HTTPServerProtocolUpgrader]
private let upgradeCompletionHandler: UpgradeCompletionHandler private let upgradeCompletionHandler: (ChannelHandlerContext) -> Void
private let httpEncoder: HTTPResponseEncoder? private let httpEncoder: HTTPResponseEncoder?
private let extraHTTPHandlers: [RemovableChannelHandler] private let extraHTTPHandlers: [RemovableChannelHandler]
@ -82,7 +76,6 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
private var upgradeState: UpgradeState = .idle private var upgradeState: UpgradeState = .idle
private var receivedMessages: CircularBuffer<NIOAny> = CircularBuffer() private var receivedMessages: CircularBuffer<NIOAny> = CircularBuffer()
#if swift(>=5.7)
/// Create a `HTTPServerUpgradeHandler`. /// Create a `HTTPServerUpgradeHandler`.
/// ///
/// - Parameter upgraders: All `HTTPServerProtocolUpgrader` objects that this pipeline will be able /// - Parameter upgraders: All `HTTPServerProtocolUpgrader` objects that this pipeline will be able
@ -94,42 +87,11 @@ public final class HTTPServerUpgradeHandler: ChannelInboundHandler, RemovableCha
/// this should include the `HTTPDecoder`, but should also include any other handler that cannot tolerate /// this should include the `HTTPDecoder`, but should also include any other handler that cannot tolerate
/// receiving non-HTTP data. /// receiving non-HTTP data.
/// - Parameter upgradeCompletionHandler: A block that will be fired when HTTP upgrade is complete. /// - Parameter upgradeCompletionHandler: A block that will be fired when HTTP upgrade is complete.
@preconcurrency public init(
public convenience init(
upgraders: [HTTPServerProtocolUpgrader],
httpEncoder: HTTPResponseEncoder,
extraHTTPHandlers: [RemovableChannelHandler],
upgradeCompletionHandler: @escaping @Sendable (ChannelHandlerContext) -> Void
) {
self.init(_upgraders: upgraders, httpEncoder: httpEncoder, extraHTTPHandlers: extraHTTPHandlers, upgradeCompletionHandler: upgradeCompletionHandler)
}
#else
/// Create a `HTTPServerUpgradeHandler`.
///
/// - Parameter upgraders: All `HTTPServerProtocolUpgrader` objects that this pipeline will be able
/// to use to handle HTTP upgrade.
/// - Parameter httpEncoder: The `HTTPResponseEncoder` encoding responses from this handler and which will
/// be removed from the pipeline once the upgrade response is sent. This is used to ensure
/// that the pipeline will be in a clean state after upgrade.
/// - Parameter extraHTTPHandlers: Any other handlers that are directly related to handling HTTP. At the very least
/// this should include the `HTTPDecoder`, but should also include any other handler that cannot tolerate
/// receiving non-HTTP data.
/// - Parameter upgradeCompletionHandler: A block that will be fired when HTTP upgrade is complete.
public convenience init(
upgraders: [HTTPServerProtocolUpgrader], upgraders: [HTTPServerProtocolUpgrader],
httpEncoder: HTTPResponseEncoder, httpEncoder: HTTPResponseEncoder,
extraHTTPHandlers: [RemovableChannelHandler], extraHTTPHandlers: [RemovableChannelHandler],
upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void
) {
self.init(_upgraders: upgraders, httpEncoder: httpEncoder, extraHTTPHandlers: extraHTTPHandlers, upgradeCompletionHandler: upgradeCompletionHandler)
}
#endif
private init(
_upgraders upgraders: [HTTPServerProtocolUpgrader],
httpEncoder: HTTPResponseEncoder,
extraHTTPHandlers: [RemovableChannelHandler],
upgradeCompletionHandler: @escaping UpgradeCompletionHandler
) { ) {
var upgraderMap = [String: HTTPServerProtocolUpgrader]() var upgraderMap = [String: HTTPServerProtocolUpgrader]()
for upgrader in upgraders { for upgrader in upgraders {

View File

@ -92,15 +92,9 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC
public typealias InboundIn = HTTPClientResponsePart public typealias InboundIn = HTTPClientResponsePart
public typealias InboundOut = HTTPClientResponsePart public typealias InboundOut = HTTPClientResponsePart
#if swift(>=5.7)
private typealias UpgradeCompletionHandler = @Sendable (ChannelHandlerContext) -> Void
#else
private typealias UpgradeCompletionHandler = (ChannelHandlerContext) -> Void
#endif
private var upgraders: [NIOHTTPClientProtocolUpgrader] private var upgraders: [NIOHTTPClientProtocolUpgrader]
private let httpHandlers: [RemovableChannelHandler] private let httpHandlers: [RemovableChannelHandler]
private let upgradeCompletionHandler: UpgradeCompletionHandler private let upgradeCompletionHandler: (ChannelHandlerContext) -> Void
/// Whether we've already seen the first response from our initial upgrade request. /// Whether we've already seen the first response from our initial upgrade request.
private var seenFirstResponse = false private var seenFirstResponse = false
@ -109,27 +103,6 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC
private var receivedMessages: CircularBuffer<NIOAny> = CircularBuffer() private var receivedMessages: CircularBuffer<NIOAny> = CircularBuffer()
#if swift(>=5.7)
/// Create a `HTTPClientUpgradeHandler`.
///
/// - Parameter upgraders: All `HTTPClientProtocolUpgrader` objects that will add their upgrade request
/// headers and handle the upgrade if there is a response for their protocol. They should be placed in
/// order of the preference for the upgrade.
/// - Parameter httpHandlers: All `RemovableChannelHandler` objects which will be removed from the pipeline
/// once the upgrade response is sent. This is used to ensure that the pipeline will be in a clean state
/// after the upgrade. It should include any handlers that are directly related to handling HTTP.
/// At the very least this should include the `HTTPEncoder` and `HTTPDecoder`, but should also include
/// any other handler that cannot tolerate receiving non-HTTP data.
/// - Parameter upgradeCompletionHandler: A closure that will be fired when HTTP upgrade is complete.
@preconcurrency
public convenience init(
upgraders: [NIOHTTPClientProtocolUpgrader],
httpHandlers: [RemovableChannelHandler],
upgradeCompletionHandler: @escaping @Sendable (ChannelHandlerContext) -> Void
) {
self.init(_upgraders: upgraders, httpHandlers: httpHandlers, upgradeCompletionHandler: upgradeCompletionHandler)
}
#else
/// Create a `HTTPClientUpgradeHandler`. /// Create a `HTTPClientUpgradeHandler`.
/// ///
/// - Parameter upgraders: All `HTTPClientProtocolUpgrader` objects that will add their upgrade request /// - Parameter upgraders: All `HTTPClientProtocolUpgrader` objects that will add their upgrade request
@ -148,12 +121,11 @@ public final class NIOHTTPClientUpgradeHandler: ChannelDuplexHandler, RemovableC
) { ) {
self.init(_upgraders: upgraders, httpHandlers: httpHandlers, upgradeCompletionHandler: upgradeCompletionHandler) self.init(_upgraders: upgraders, httpHandlers: httpHandlers, upgradeCompletionHandler: upgradeCompletionHandler)
} }
#endif
private init( private init(
_upgraders upgraders: [NIOHTTPClientProtocolUpgrader], _upgraders upgraders: [NIOHTTPClientProtocolUpgrader],
httpHandlers: [RemovableChannelHandler], httpHandlers: [RemovableChannelHandler],
upgradeCompletionHandler: @escaping UpgradeCompletionHandler upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void
) { ) {
precondition(upgraders.count > 0, "A minimum of one protocol upgrader must be specified.") precondition(upgraders.count > 0, "A minimum of one protocol upgrader must be specified.")

View File

@ -60,26 +60,10 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler,
public typealias InboundIn = Any public typealias InboundIn = Any
public typealias InboundOut = Any public typealias InboundOut = Any
#if swift(>=5.7)
private let completionHandler: @Sendable (ALPNResult, Channel) -> EventLoopFuture<Void>
#else
private let completionHandler: (ALPNResult, Channel) -> EventLoopFuture<Void> private let completionHandler: (ALPNResult, Channel) -> EventLoopFuture<Void>
#endif
private var waitingForUser: Bool private var waitingForUser: Bool
private var eventBuffer: [NIOAny] private var eventBuffer: [NIOAny]
#if swift(>=5.7)
/// Create an `ApplicationProtocolNegotiationHandler` with the given completion
/// callback.
///
/// - Parameter alpnCompleteHandler: The closure that will fire when ALPN
/// negotiation has completed.
@preconcurrency public init(alpnCompleteHandler: @Sendable @escaping (ALPNResult, Channel) -> EventLoopFuture<Void>) {
self.completionHandler = alpnCompleteHandler
self.waitingForUser = false
self.eventBuffer = []
}
#else
/// Create an `ApplicationProtocolNegotiationHandler` with the given completion /// Create an `ApplicationProtocolNegotiationHandler` with the given completion
/// callback. /// callback.
/// ///
@ -90,20 +74,7 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler,
self.waitingForUser = false self.waitingForUser = false
self.eventBuffer = [] self.eventBuffer = []
} }
#endif
#if swift(>=5.7)
/// Create an `ApplicationProtocolNegotiationHandler` with the given completion
/// callback.
///
/// - Parameter alpnCompleteHandler: The closure that will fire when ALPN
/// negotiation has completed.
@preconcurrency public convenience init(alpnCompleteHandler: @Sendable @escaping (ALPNResult) -> EventLoopFuture<Void>) {
self.init { result, _ in
alpnCompleteHandler(result)
}
}
#else
/// Create an `ApplicationProtocolNegotiationHandler` with the given completion /// Create an `ApplicationProtocolNegotiationHandler` with the given completion
/// callback. /// callback.
/// ///
@ -114,7 +85,6 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler,
alpnCompleteHandler(result) alpnCompleteHandler(result)
} }
} }
#endif
public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
guard let tlsEvent = event as? TLSUserEvent else { guard let tlsEvent = event as? TLSUserEvent else {

View File

@ -98,26 +98,15 @@ public final class SNIHandler: ByteToMessageDecoder {
public var cumulationBuffer: Optional<ByteBuffer> public var cumulationBuffer: Optional<ByteBuffer>
public typealias InboundIn = ByteBuffer public typealias InboundIn = ByteBuffer
public typealias InboundOut = ByteBuffer public typealias InboundOut = ByteBuffer
#if swift(>=5.7)
private let completionHandler: @Sendable (SNIResult) -> EventLoopFuture<Void>
#else
private let completionHandler: (SNIResult) -> EventLoopFuture<Void> private let completionHandler: (SNIResult) -> EventLoopFuture<Void>
#endif
private var waitingForUser: Bool private var waitingForUser: Bool
#if swift(>=5.7)
@preconcurrency public init(sniCompleteHandler: @Sendable @escaping (SNIResult) -> EventLoopFuture<Void>) {
self.cumulationBuffer = nil
self.completionHandler = sniCompleteHandler
self.waitingForUser = false
}
#else
public init(sniCompleteHandler: @escaping (SNIResult) -> EventLoopFuture<Void>) { public init(sniCompleteHandler: @escaping (SNIResult) -> EventLoopFuture<Void>) {
self.cumulationBuffer = nil self.cumulationBuffer = nil
self.completionHandler = sniCompleteHandler self.completionHandler = sniCompleteHandler
self.waitingForUser = false self.waitingForUser = false
} }
#endif
public func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState { public func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
context.fireChannelRead(NIOAny(buffer)) context.fireChannelRead(NIOAny(buffer))

View File

@ -25,12 +25,6 @@ public typealias NIOWebClientSocketUpgrader = NIOWebSocketClientUpgrader
/// pipeline to remove the HTTP `ChannelHandler`s. /// pipeline to remove the HTTP `ChannelHandler`s.
public final class NIOWebSocketClientUpgrader: NIOHTTPClientProtocolUpgrader { public final class NIOWebSocketClientUpgrader: NIOHTTPClientProtocolUpgrader {
#if swift(>=5.7)
private typealias UpgradePipelineHandler = @Sendable (Channel, HTTPResponseHead) -> EventLoopFuture<Void>
#else
private typealias UpgradePipelineHandler = (Channel, HTTPResponseHead) -> EventLoopFuture<Void>
#endif
/// RFC 6455 specs this as the required entry in the Upgrade header. /// RFC 6455 specs this as the required entry in the Upgrade header.
public let supportedProtocol: String = "websocket" public let supportedProtocol: String = "websocket"
/// None of the websocket headers are actually defined as 'required'. /// None of the websocket headers are actually defined as 'required'.
@ -39,54 +33,18 @@ public final class NIOWebSocketClientUpgrader: NIOHTTPClientProtocolUpgrader {
private let requestKey: String private let requestKey: String
private let maxFrameSize: Int private let maxFrameSize: Int
private let automaticErrorHandling: Bool private let automaticErrorHandling: Bool
private let upgradePipelineHandler: UpgradePipelineHandler private let upgradePipelineHandler: (Channel, HTTPResponseHead) -> EventLoopFuture<Void>
#if swift(>=5.7)
/// - Parameters: /// - Parameters:
/// - requestKey: sent to the server in the `Sec-WebSocket-Key` HTTP header. Default is random request key. /// - requestKey: sent to the server in the `Sec-WebSocket-Key` HTTP header. Default is random request key.
/// - maxFrameSize: largest incoming `WebSocketFrame` size in bytes. Default is 16,384 bytes. /// - maxFrameSize: largest incoming `WebSocketFrame` size in bytes. Default is 16,384 bytes.
/// - automaticErrorHandling: If true, adds `WebSocketProtocolErrorHandler` to the channel pipeline to catch and respond to WebSocket protocol errors. Default is true. /// - automaticErrorHandling: If true, adds `WebSocketProtocolErrorHandler` to the channel pipeline to catch and respond to WebSocket protocol errors. Default is true.
/// - upgradePipelineHandler: called once the upgrade was successful /// - upgradePipelineHandler: called once the upgrade was successful
@preconcurrency public init(
public convenience init(
requestKey: String = randomRequestKey(),
maxFrameSize: Int = 1 << 14,
automaticErrorHandling: Bool = true,
upgradePipelineHandler: @escaping @Sendable (Channel, HTTPResponseHead) -> EventLoopFuture<Void>
) {
self.init(
_requestKey: requestKey,
maxFrameSize: maxFrameSize,
automaticErrorHandling: automaticErrorHandling,
upgradePipelineHandler: upgradePipelineHandler
)
}
#else
/// - Parameters:
/// - requestKey: sent to the server in the `Sec-WebSocket-Key` HTTP header. Default is random request key.
/// - maxFrameSize: largest incoming `WebSocketFrame` size in bytes. Default is 16,384 bytes.
/// - automaticErrorHandling: If true, adds `WebSocketProtocolErrorHandler` to the channel pipeline to catch and respond to WebSocket protocol errors. Default is true.
/// - upgradePipelineHandler: called once the upgrade was successful
public convenience init(
requestKey: String = randomRequestKey(), requestKey: String = randomRequestKey(),
maxFrameSize: Int = 1 << 14, maxFrameSize: Int = 1 << 14,
automaticErrorHandling: Bool = true, automaticErrorHandling: Bool = true,
upgradePipelineHandler: @escaping (Channel, HTTPResponseHead) -> EventLoopFuture<Void> upgradePipelineHandler: @escaping (Channel, HTTPResponseHead) -> EventLoopFuture<Void>
) {
self.init(
_requestKey: requestKey,
maxFrameSize: maxFrameSize,
automaticErrorHandling: automaticErrorHandling,
upgradePipelineHandler: upgradePipelineHandler
)
}
#endif
private init(
_requestKey requestKey: String,
maxFrameSize: Int,
automaticErrorHandling: Bool,
upgradePipelineHandler: @escaping UpgradePipelineHandler
) { ) {
precondition(requestKey != "", "The request key must contain a valid Sec-WebSocket-Key") precondition(requestKey != "", "The request key must contain a valid Sec-WebSocket-Key")
precondition(maxFrameSize <= UInt32.max, "invalid overlarge max frame size") precondition(maxFrameSize <= UInt32.max, "invalid overlarge max frame size")

View File

@ -62,13 +62,6 @@ fileprivate extension HTTPHeaders {
/// This upgrader assumes that the `HTTPServerUpgradeHandler` will appropriately mutate the pipeline to /// This upgrader assumes that the `HTTPServerUpgradeHandler` will appropriately mutate the pipeline to
/// remove the HTTP `ChannelHandler`s. /// remove the HTTP `ChannelHandler`s.
public final class NIOWebSocketServerUpgrader: HTTPServerProtocolUpgrader { public final class NIOWebSocketServerUpgrader: HTTPServerProtocolUpgrader {
#if swift(>=5.7)
private typealias ShouldUpgrade = @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture<HTTPHeaders?>
private typealias UpgradePipelineHandler = @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture<Void>
#else
private typealias ShouldUpgrade = (Channel, HTTPRequestHead) -> EventLoopFuture<HTTPHeaders?>
private typealias UpgradePipelineHandler = (Channel, HTTPRequestHead) -> EventLoopFuture<Void>
#endif
/// RFC 6455 specs this as the required entry in the Upgrade header. /// RFC 6455 specs this as the required entry in the Upgrade header.
public let supportedProtocol: String = "websocket" public let supportedProtocol: String = "websocket"
@ -77,8 +70,8 @@ public final class NIOWebSocketServerUpgrader: HTTPServerProtocolUpgrader {
/// which NIO requires. We check for these manually. /// which NIO requires. We check for these manually.
public let requiredUpgradeHeaders: [String] = [] public let requiredUpgradeHeaders: [String] = []
private let shouldUpgrade: ShouldUpgrade private let shouldUpgrade: (Channel, HTTPRequestHead) -> EventLoopFuture<HTTPHeaders?>
private let upgradePipelineHandler: UpgradePipelineHandler private let upgradePipelineHandler: (Channel, HTTPRequestHead) -> EventLoopFuture<Void>
private let maxFrameSize: Int private let maxFrameSize: Int
private let automaticErrorHandling: Bool private let automaticErrorHandling: Bool
@ -135,7 +128,6 @@ public final class NIOWebSocketServerUpgrader: HTTPServerProtocolUpgrader {
} }
#endif #endif
#if swift(>=5.7)
/// Create a new `NIOWebSocketServerUpgrader`. /// Create a new `NIOWebSocketServerUpgrader`.
/// ///
/// - parameters: /// - parameters:
@ -156,61 +148,11 @@ public final class NIOWebSocketServerUpgrader: HTTPServerProtocolUpgrader {
/// websocket protocol. This only needs to add the user handlers: the /// websocket protocol. This only needs to add the user handlers: the
/// `WebSocketFrameEncoder` and `WebSocketFrameDecoder` will have been added to the /// `WebSocketFrameEncoder` and `WebSocketFrameDecoder` will have been added to the
/// pipeline automatically. /// pipeline automatically.
@preconcurrency public init(
public convenience init(
maxFrameSize: Int,
automaticErrorHandling: Bool = true,
shouldUpgrade: @escaping @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture<HTTPHeaders?>,
upgradePipelineHandler: @escaping @Sendable (Channel, HTTPRequestHead) -> EventLoopFuture<Void>
) {
self.init(
_maxFrameSize: maxFrameSize,
automaticErrorHandling: automaticErrorHandling,
shouldUpgrade: shouldUpgrade,
upgradePipelineHandler: upgradePipelineHandler
)
}
#else
/// Create a new `NIOWebSocketServerUpgrader`.
///
/// - parameters:
/// - maxFrameSize: The maximum frame size the decoder is willing to tolerate from the
/// remote peer. WebSockets in principle allows frame sizes up to `2**64` bytes, but
/// this is an objectively unreasonable maximum value (on AMD64 systems it is not
/// possible to even. Users may set this to any value up to `UInt32.max`.
/// - automaticErrorHandling: Whether the pipeline should automatically handle protocol
/// errors by sending error responses and closing the connection. Defaults to `true`,
/// may be set to `false` if the user wishes to handle their own errors.
/// - shouldUpgrade: A callback that determines whether the websocket request should be
/// upgraded. This callback is responsible for creating a `HTTPHeaders` object with
/// any headers that it needs on the response *except for* the `Upgrade`, `Connection`,
/// and `Sec-WebSocket-Accept` headers, which this upgrader will handle. Should return
/// an `EventLoopFuture` containing `nil` if the upgrade should be refused.
/// - upgradePipelineHandler: A function that will be called once the upgrade response is
/// flushed, and that is expected to mutate the `Channel` appropriately to handle the
/// websocket protocol. This only needs to add the user handlers: the
/// `WebSocketFrameEncoder` and `WebSocketFrameDecoder` will have been added to the
/// pipeline automatically.
public convenience init(
maxFrameSize: Int, maxFrameSize: Int,
automaticErrorHandling: Bool = true, automaticErrorHandling: Bool = true,
shouldUpgrade: @escaping (Channel, HTTPRequestHead) -> EventLoopFuture<HTTPHeaders?>, shouldUpgrade: @escaping (Channel, HTTPRequestHead) -> EventLoopFuture<HTTPHeaders?>,
upgradePipelineHandler: @escaping (Channel, HTTPRequestHead) -> EventLoopFuture<Void> upgradePipelineHandler: @escaping (Channel, HTTPRequestHead) -> EventLoopFuture<Void>
) {
self.init(
_maxFrameSize: maxFrameSize,
automaticErrorHandling: automaticErrorHandling,
shouldUpgrade: shouldUpgrade,
upgradePipelineHandler: upgradePipelineHandler
)
}
#endif
private init(
_maxFrameSize maxFrameSize: Int,
automaticErrorHandling: Bool,
shouldUpgrade: @escaping ShouldUpgrade,
upgradePipelineHandler: @escaping UpgradePipelineHandler
) { ) {
precondition(maxFrameSize <= UInt32.max, "invalid overlarge max frame size") precondition(maxFrameSize <= UInt32.max, "invalid overlarge max frame size")
self.shouldUpgrade = shouldUpgrade self.shouldUpgrade = shouldUpgrade