Adopt `Sendable` for closures in `HTTPPipelineSetup.swift` (#2214)

* Adopt `Sendable` for closures in `HTTPPipelineSetup.swift`

* make `UnsafeTransfer` and `UnsafeMutableTransferBox` available in Swift 5.4 too

* fix code duplication

* Copy `UnsafeTransfer` to `NIOHTTP1Tests` to be able to remove `@testable` from `NIOCore` import
This commit is contained in:
David Nadoba 2022-07-05 11:01:04 +02:00 committed by GitHub
parent 0abf7eb929
commit 8c922223db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 313 additions and 81 deletions

View File

@ -24,12 +24,11 @@ public typealias NIOSendable = Any
public protocol NIOPreconcurrencySendable {}
#endif
#if swift(>=5.5) && canImport(_Concurrency)
/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler.
/// It can be used similar to `@unsafe Sendable` but for values instead of types.
@usableFromInline
struct UnsafeTransfer<Wrapped>: @unchecked Sendable {
struct UnsafeTransfer<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped
@ -39,25 +38,13 @@ struct UnsafeTransfer<Wrapped>: @unchecked Sendable {
}
}
#if swift(>=5.5) && canImport(_Concurrency)
extension UnsafeTransfer: @unchecked Sendable {}
#endif
extension UnsafeTransfer: Equatable where Wrapped: Equatable {}
extension UnsafeTransfer: Hashable where Wrapped: Hashable {}
#endif
#if swift(>=5.5) && canImport(_Concurrency)
/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable.
/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation.
@usableFromInline
final class UnsafeMutableTransferBox<Wrapped>: @unchecked Sendable {
@usableFromInline
var wrappedValue: Wrapped
@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
#else
/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable.
/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation.
@ -71,4 +58,6 @@ final class UnsafeMutableTransferBox<Wrapped> {
self.wrappedValue = wrappedValue
}
}
#if swift(>=5.5) && canImport(_Concurrency)
extension UnsafeMutableTransferBox: @unchecked Sendable {}
#endif

View File

@ -14,11 +14,19 @@
import NIOCore
#if swift(>=5.7)
/// Configuration required to configure a HTTP client pipeline for upgrade.
///
/// See the documentation for `HTTPClientUpgradeHandler` for details on these
/// properties.
public typealias NIOHTTPClientUpgradeConfiguration = (upgraders: [NIOHTTPClientProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void)
#else
/// Configuration required to configure a HTTP client pipeline for upgrade.
///
/// See the documentation for `HTTPClientUpgradeHandler` for details on these
/// properties.
public typealias NIOHTTPClientUpgradeConfiguration = (upgraders: [NIOHTTPClientProtocolUpgrader], completionHandler: (ChannelHandlerContext) -> Void)
#endif
/// Configuration required to configure a HTTP server pipeline for upgrade.
///
@ -27,7 +35,11 @@ public typealias NIOHTTPClientUpgradeConfiguration = (upgraders: [NIOHTTPClientP
@available(*, deprecated, renamed: "NIOHTTPServerUpgradeConfiguration")
public typealias HTTPUpgradeConfiguration = NIOHTTPServerUpgradeConfiguration
#if swift(>=5.7)
public typealias NIOHTTPServerUpgradeConfiguration = (upgraders: [HTTPServerProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void)
#else
public typealias NIOHTTPServerUpgradeConfiguration = (upgraders: [HTTPServerProtocolUpgrader], completionHandler: (ChannelHandlerContext) -> Void)
#endif
extension ChannelPipeline {
/// Configure a `ChannelPipeline` for use as a HTTP client.
@ -44,6 +56,29 @@ extension ChannelPipeline {
withClientUpgrade: nil)
}
#if swift(>=5.7)
/// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration.
///
/// - parameters:
/// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`.
/// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder`
/// from the pipeline.
/// - upgrade: Add a `HTTPClientUpgradeHandler` to the pipeline, configured for
/// HTTP upgrade. Should be a tuple of an array of `HTTPClientProtocolUpgrader` and
/// the upgrade completion handler. See the documentation on `HTTPClientUpgradeHandler`
/// for more details.
/// - returns: An `EventLoopFuture` that will fire when the pipeline is configured.
@preconcurrency
public func addHTTPClientHandlers(position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?) -> EventLoopFuture<Void> {
self._addHTTPClientHandlers(
position: position,
leftOverBytesStrategy: leftOverBytesStrategy,
withClientUpgrade: upgrade
)
}
#else
/// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration.
///
/// - parameters:
@ -58,6 +93,17 @@ extension ChannelPipeline {
public func addHTTPClientHandlers(position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?) -> EventLoopFuture<Void> {
self._addHTTPClientHandlers(
position: position,
leftOverBytesStrategy: leftOverBytesStrategy,
withClientUpgrade: upgrade
)
}
#endif
private func _addHTTPClientHandlers(position: Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>
if self.eventLoop.inEventLoop {
@ -77,7 +123,46 @@ extension ChannelPipeline {
return future
}
#if swift(>=5.7)
/// Configure a `ChannelPipeline` for use as a HTTP server.
///
/// This function knows how to set up all first-party HTTP channel handlers appropriately
/// for server use. It supports the following features:
///
/// 1. Providing assistance handling clients that pipeline HTTP requests, using the
/// `HTTPServerPipelineHandler`.
/// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`.
///
/// This method will likely be extended in future with more support for other first-party
/// features.
///
/// - parameters:
/// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`.
/// - pipelining: Whether to provide assistance handling HTTP clients that pipeline
/// their requests. Defaults to `true`. If `false`, users will need to handle
/// clients that pipeline themselves.
/// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for
/// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If
/// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade
/// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more
/// details.
/// - errorHandling: Whether to provide assistance handling protocol errors (e.g.
/// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`.
/// - returns: An `EventLoopFuture` that will fire when the pipeline is configured.
@preconcurrency
public func configureHTTPServerPipeline(position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withErrorHandling errorHandling: Bool = true) -> EventLoopFuture<Void> {
self._configureHTTPServerPipeline(
position: position,
withPipeliningAssistance: pipelining,
withServerUpgrade: upgrade,
withErrorHandling: errorHandling
)
}
#else
/// Configure a `ChannelPipeline` for use as a HTTP server.
///
/// This function knows how to set up all first-party HTTP channel handlers appropriately
@ -107,6 +192,19 @@ extension ChannelPipeline {
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withErrorHandling errorHandling: Bool = true) -> EventLoopFuture<Void> {
self._configureHTTPServerPipeline(
position: position,
withPipeliningAssistance: pipelining,
withServerUpgrade: upgrade,
withErrorHandling: errorHandling
)
}
#endif
private func _configureHTTPServerPipeline(position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withErrorHandling errorHandling: Bool = true) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>
if self.eventLoop.inEventLoop {
@ -131,6 +229,30 @@ extension ChannelPipeline {
}
extension ChannelPipeline.SynchronousOperations {
#if swift(>=5.7)
/// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration.
///
/// - important: This **must** be called on the Channel's event loop.
/// - parameters:
/// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`.
/// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder`
/// from the pipeline.
/// - upgrade: Add a `HTTPClientUpgradeHandler` to the pipeline, configured for
/// HTTP upgrade. Should be a tuple of an array of `HTTPClientProtocolUpgrader` and
/// the upgrade completion handler. See the documentation on `HTTPClientUpgradeHandler`
/// for more details.
/// - throws: If the pipeline could not be configured.
@preconcurrency
public func addHTTPClientHandlers(position: ChannelPipeline.Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil) throws {
try self._addHTTPClientHandlers(
position: position,
leftOverBytesStrategy: leftOverBytesStrategy,
withClientUpgrade: upgrade
)
}
#else
/// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration.
///
/// - important: This **must** be called on the Channel's event loop.
@ -146,6 +268,17 @@ extension ChannelPipeline.SynchronousOperations {
public func addHTTPClientHandlers(position: ChannelPipeline.Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil) throws {
try self._addHTTPClientHandlers(
position: position,
leftOverBytesStrategy: leftOverBytesStrategy,
withClientUpgrade: upgrade
)
}
#endif
private func _addHTTPClientHandlers(position: ChannelPipeline.Position = .last,
leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes,
withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil) throws {
// Why two separate functions? When creating the array of handlers to add to the pipeline, when we don't have
// an upgrade handler -- i.e. just an array literal -- the compiler is able to promote the array to the stack
// which saves an allocation. That's not the case when the upgrade handler is present.
@ -183,7 +316,46 @@ extension ChannelPipeline.SynchronousOperations {
try self.addHandlers(handlers, position: position)
}
#if swift(>=5.7)
/// Configure a `ChannelPipeline` for use as a HTTP server.
///
/// This function knows how to set up all first-party HTTP channel handlers appropriately
/// for server use. It supports the following features:
///
/// 1. Providing assistance handling clients that pipeline HTTP requests, using the
/// `HTTPServerPipelineHandler`.
/// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`.
///
/// This method will likely be extended in future with more support for other first-party
/// features.
///
/// - important: This **must** be called on the Channel's event loop.
/// - parameters:
/// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`.
/// - pipelining: Whether to provide assistance handling HTTP clients that pipeline
/// their requests. Defaults to `true`. If `false`, users will need to handle
/// clients that pipeline themselves.
/// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for
/// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If
/// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade
/// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more
/// details.
/// - errorHandling: Whether to provide assistance handling protocol errors (e.g.
/// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`.
/// - throws: If the pipeline could not be configured.
@preconcurrency
public func configureHTTPServerPipeline(position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withErrorHandling errorHandling: Bool = true) throws {
try self._configureHTTPServerPipeline(
position: position,
withPipeliningAssistance: pipelining,
withServerUpgrade: upgrade,
withErrorHandling: errorHandling
)
}
#else
/// Configure a `ChannelPipeline` for use as a HTTP server.
///
/// This function knows how to set up all first-party HTTP channel handlers appropriately
@ -214,6 +386,19 @@ extension ChannelPipeline.SynchronousOperations {
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withErrorHandling errorHandling: Bool = true) throws {
try self._configureHTTPServerPipeline(
position: position,
withPipeliningAssistance: pipelining,
withServerUpgrade: upgrade,
withErrorHandling: errorHandling
)
}
#endif
private func _configureHTTPServerPipeline(position: ChannelPipeline.Position = .last,
withPipeliningAssistance pipelining: Bool = true,
withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil,
withErrorHandling errorHandling: Bool = true) throws {
self.eventLoop.assertInEventLoop()
let responseEncoder = HTTPResponseEncoder()

View File

@ -83,11 +83,17 @@ extension EmbeddedChannel {
}
}
#if swift(>=5.7)
private typealias UpgradeCompletionHandler = @Sendable (ChannelHandlerContext) -> Void
#else
private typealias UpgradeCompletionHandler = (ChannelHandlerContext) -> Void
#endif
private func serverHTTPChannelWithAutoremoval(group: EventLoopGroup,
pipelining: Bool,
upgraders: [HTTPServerProtocolUpgrader],
extraHandlers: [ChannelHandler],
_ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void) throws -> (Channel, EventLoopFuture<Channel>) {
_ upgradeCompletionHandler: @escaping UpgradeCompletionHandler) throws -> (Channel, EventLoopFuture<Channel>) {
let p = group.next().makePromise(of: Channel.self)
let c = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
@ -138,7 +144,7 @@ private func connectedClientChannel(group: EventLoopGroup, serverAddress: Socket
private func setUpTestWithAutoremoval(pipelining: Bool = false,
upgraders: [HTTPServerProtocolUpgrader],
extraHandlers: [ChannelHandler],
_ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void) throws -> (EventLoopGroup, Channel, Channel, Channel) {
_ upgradeCompletionHandler: @escaping UpgradeCompletionHandler) throws -> (EventLoopGroup, Channel, Channel, Channel) {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let (serverChannel, connectedServerChannelFuture) = try serverHTTPChannelWithAutoremoval(group: group,
pipelining: pipelining,
@ -427,21 +433,21 @@ class HTTPServerUpgradeTestCase: XCTestCase {
}
func testSimpleUpgradeSucceeds() throws {
var upgradeRequest: HTTPRequestHead? = nil
var upgradeHandlerCbFired = false
var upgraderCbFired = false
let upgradeRequest = UnsafeMutableTransferBox<HTTPRequestHead?>(nil)
let upgradeHandlerCbFired = UnsafeMutableTransferBox(false)
let upgraderCbFired = UnsafeMutableTransferBox(false)
let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in
upgradeRequest = req
XCTAssert(upgradeHandlerCbFired)
upgraderCbFired = true
upgradeRequest.wrappedValue = req
XCTAssert(upgradeHandlerCbFired.wrappedValue)
upgraderCbFired.wrappedValue = true
}
let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader],
extraHandlers: []) { (context) in
// This is called before the upgrader gets called.
XCTAssertNil(upgradeRequest)
upgradeHandlerCbFired = true
XCTAssertNil(upgradeRequest.wrappedValue)
upgradeHandlerCbFired.wrappedValue = true
// We're closing the connection now.
context.close(promise: nil)
@ -469,8 +475,8 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// At this time we want to assert that everything got called. Their own callbacks assert
// that the ordering was correct.
XCTAssert(upgradeHandlerCbFired)
XCTAssert(upgraderCbFired)
XCTAssert(upgradeHandlerCbFired.wrappedValue)
XCTAssert(upgraderCbFired.wrappedValue)
// We also want to confirm that the upgrade handler is no longer in the pipeline.
try connectedServer.pipeline.assertDoesNotContainUpgrader()
@ -532,22 +538,22 @@ class HTTPServerUpgradeTestCase: XCTestCase {
}
func testUpgradeRespectsClientPreference() throws {
var upgradeRequest: HTTPRequestHead? = nil
var upgradeHandlerCbFired = false
var upgraderCbFired = false
let upgradeRequest = UnsafeMutableTransferBox<HTTPRequestHead?>(nil)
let upgradeHandlerCbFired = UnsafeMutableTransferBox(false)
let upgraderCbFired = UnsafeMutableTransferBox(false)
let explodingUpgrader = ExplodingUpgrader(forProtocol: "exploder")
let successfulUpgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in
upgradeRequest = req
XCTAssert(upgradeHandlerCbFired)
upgraderCbFired = true
upgradeRequest.wrappedValue = req
XCTAssert(upgradeHandlerCbFired.wrappedValue)
upgraderCbFired.wrappedValue = true
}
let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [explodingUpgrader, successfulUpgrader],
extraHandlers: []) { context in
// This is called before the upgrader gets called.
XCTAssertNil(upgradeRequest)
upgradeHandlerCbFired = true
XCTAssertNil(upgradeRequest.wrappedValue)
upgradeHandlerCbFired.wrappedValue = true
// We're closing the connection now.
context.close(promise: nil)
@ -575,8 +581,8 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// At this time we want to assert that everything got called. Their own callbacks assert
// that the ordering was correct.
XCTAssert(upgradeHandlerCbFired)
XCTAssert(upgraderCbFired)
XCTAssert(upgradeHandlerCbFired.wrappedValue)
XCTAssert(upgraderCbFired.wrappedValue)
// We also want to confirm that the upgrade handler is no longer in the pipeline.
try connectedServer.pipeline.waitForUpgraderToBeRemoved()
@ -585,15 +591,15 @@ class HTTPServerUpgradeTestCase: XCTestCase {
func testUpgradeFiresUserEvent() throws {
// The user event is fired last, so we don't see it until both other callbacks
// have fired.
let eventSaver = UserEventSaver<HTTPServerUpgradeEvents>()
let eventSaver = UnsafeTransfer(UserEventSaver<HTTPServerUpgradeEvents>())
let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: []) { req in
XCTAssertEqual(eventSaver.events.count, 0)
XCTAssertEqual(eventSaver.wrappedValue.events.count, 0)
}
let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader],
extraHandlers: [eventSaver]) { context in
XCTAssertEqual(eventSaver.events.count, 0)
extraHandlers: [eventSaver.wrappedValue]) { context in
XCTAssertEqual(eventSaver.wrappedValue.events.count, 0)
context.close(promise: nil)
}
defer {
@ -620,14 +626,14 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// At this time we should have received one user event. We schedule this onto the
// event loop to guarantee thread safety.
XCTAssertNoThrow(try connectedServer.eventLoop.scheduleTask(deadline: .now()) {
XCTAssertEqual(eventSaver.events.count, 1)
if case .upgradeComplete(let proto, let req) = eventSaver.events[0] {
XCTAssertEqual(eventSaver.wrappedValue.events.count, 1)
if case .upgradeComplete(let proto, let req) = eventSaver.wrappedValue.events[0] {
XCTAssertEqual(proto, "myproto")
XCTAssertEqual(req.method, .OPTIONS)
XCTAssertEqual(req.uri, "*")
XCTAssertEqual(req.version, .http1_1)
} else {
XCTFail("Unexpected event: \(eventSaver.events[0])")
XCTFail("Unexpected event: \(eventSaver.wrappedValue.events[0])")
}
}.futureResult.wait())
@ -636,23 +642,23 @@ class HTTPServerUpgradeTestCase: XCTestCase {
}
func testUpgraderCanRejectUpgradeForPersonalReasons() throws {
var upgradeRequest: HTTPRequestHead? = nil
var upgradeHandlerCbFired = false
var upgraderCbFired = false
let upgradeRequest = UnsafeMutableTransferBox<HTTPRequestHead?>(nil)
let upgradeHandlerCbFired = UnsafeMutableTransferBox(false)
let upgraderCbFired = UnsafeMutableTransferBox(false)
let explodingUpgrader = UpgraderSaysNo(forProtocol: "noproto")
let successfulUpgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in
upgradeRequest = req
XCTAssert(upgradeHandlerCbFired)
upgraderCbFired = true
upgradeRequest.wrappedValue = req
XCTAssert(upgradeHandlerCbFired.wrappedValue)
upgraderCbFired.wrappedValue = true
}
let errorCatcher = ErrorSaver()
let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [explodingUpgrader, successfulUpgrader],
extraHandlers: [errorCatcher]) { context in
// This is called before the upgrader gets called.
XCTAssertNil(upgradeRequest)
upgradeHandlerCbFired = true
XCTAssertNil(upgradeRequest.wrappedValue)
upgradeHandlerCbFired.wrappedValue = true
// We're closing the connection now.
context.close(promise: nil)
@ -680,8 +686,8 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// At this time we want to assert that everything got called. Their own callbacks assert
// that the ordering was correct.
XCTAssert(upgradeHandlerCbFired)
XCTAssert(upgraderCbFired)
XCTAssert(upgradeHandlerCbFired.wrappedValue)
XCTAssert(upgraderCbFired.wrappedValue)
// We also want to confirm that the upgrade handler is no longer in the pipeline.
try connectedServer.pipeline.waitForUpgraderToBeRemoved()
@ -1089,9 +1095,9 @@ class HTTPServerUpgradeTestCase: XCTestCase {
func testUpgradeWithUpgradePayloadInlineWithRequestWorks() throws {
enum ReceivedTheWrongThingError: Error { case error }
var upgradeRequest: HTTPRequestHead? = nil
var upgradeHandlerCbFired = false
var upgraderCbFired = false
let upgradeRequest = UnsafeMutableTransferBox<HTTPRequestHead?>(nil)
let upgradeHandlerCbFired = UnsafeMutableTransferBox(false)
let upgraderCbFired = UnsafeMutableTransferBox(false)
class CheckWeReadInlineAndExtraData: ChannelDuplexHandler {
typealias InboundIn = ByteBuffer
@ -1161,9 +1167,9 @@ class HTTPServerUpgradeTestCase: XCTestCase {
}
let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in
upgradeRequest = req
XCTAssert(upgradeHandlerCbFired)
upgraderCbFired = true
upgradeRequest.wrappedValue = req
XCTAssert(upgradeHandlerCbFired.wrappedValue)
upgraderCbFired.wrappedValue = true
}
let promiseGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@ -1176,8 +1182,8 @@ class HTTPServerUpgradeTestCase: XCTestCase {
let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader],
extraHandlers: []) { (context) in
// This is called before the upgrader gets called.
XCTAssertNil(upgradeRequest)
upgradeHandlerCbFired = true
XCTAssertNil(upgradeRequest.wrappedValue)
upgradeHandlerCbFired.wrappedValue = true
_ = context.channel.pipeline.addHandler(CheckWeReadInlineAndExtraData(firstByteDonePromise: firstByteDonePromise,
secondByteDonePromise: secondByteDonePromise,
@ -1215,8 +1221,8 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// At this time we want to assert that everything got called. Their own callbacks assert
// that the ordering was correct.
XCTAssert(upgradeHandlerCbFired)
XCTAssert(upgraderCbFired)
XCTAssert(upgradeHandlerCbFired.wrappedValue)
XCTAssert(upgraderCbFired.wrappedValue)
// We also want to confirm that the upgrade handler is no longer in the pipeline.
try connectedServer.pipeline.assertDoesNotContainUpgrader()
@ -1324,9 +1330,9 @@ class HTTPServerUpgradeTestCase: XCTestCase {
}
func testWeTolerateUpgradeFuturesFromWrongEventLoops() throws {
var upgradeRequest: HTTPRequestHead? = nil
var upgradeHandlerCbFired = false
var upgraderCbFired = false
let upgradeRequest = UnsafeMutableTransferBox<HTTPRequestHead?>(nil)
let upgradeHandlerCbFired = UnsafeMutableTransferBox(false)
let upgraderCbFired = UnsafeMutableTransferBox(false)
let otherELG = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try otherELG.syncShutdownGracefully())
@ -1338,16 +1344,16 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// this is the wrong EL
otherELG.next().makeSucceededFuture($1)
}) { req in
upgradeRequest = req
XCTAssert(upgradeHandlerCbFired)
upgraderCbFired = true
upgradeRequest.wrappedValue = req
XCTAssert(upgradeHandlerCbFired.wrappedValue)
upgraderCbFired.wrappedValue = true
}
let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader],
extraHandlers: []) { (context) in
// This is called before the upgrader gets called.
XCTAssertNil(upgradeRequest)
upgradeHandlerCbFired = true
XCTAssertNil(upgradeRequest.wrappedValue)
upgradeHandlerCbFired.wrappedValue = true
// We're closing the connection now.
context.close(promise: nil)
@ -1375,8 +1381,8 @@ class HTTPServerUpgradeTestCase: XCTestCase {
// At this time we want to assert that everything got called. Their own callbacks assert
// that the ordering was correct.
XCTAssert(upgradeHandlerCbFired)
XCTAssert(upgraderCbFired)
XCTAssert(upgradeHandlerCbFired.wrappedValue)
XCTAssert(upgraderCbFired.wrappedValue)
// We also want to confirm that the upgrade handler is no longer in the pipeline.
try connectedServer.pipeline.assertDoesNotContainUpgrader()

View File

@ -0,0 +1,51 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2021-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler.
/// It can be used similar to `@unsafe Sendable` but for values instead of types.
@usableFromInline
struct UnsafeTransfer<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped
@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
#if swift(>=5.5) && canImport(_Concurrency)
extension UnsafeTransfer: @unchecked Sendable {}
#endif
extension UnsafeTransfer: Equatable where Wrapped: Equatable {}
extension UnsafeTransfer: Hashable where Wrapped: Hashable {}
/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable.
/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation.
@usableFromInline
final class UnsafeMutableTransferBox<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped
@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
#if swift(>=5.5) && canImport(_Concurrency)
extension UnsafeMutableTransferBox: @unchecked Sendable {}
#endif

View File

@ -116,8 +116,9 @@ class WebSocketServerEndToEndTests: XCTestCase {
private func createTestFixtures(upgraders: [NIOWebSocketServerUpgrader]) -> (loop: EmbeddedEventLoop, serverChannel: EmbeddedChannel, clientChannel: EmbeddedChannel) {
let loop = EmbeddedEventLoop()
let serverChannel = EmbeddedChannel(loop: loop)
let upgradeConfig = (upgraders: upgraders as [HTTPServerProtocolUpgrader], completionHandler: { (context: ChannelHandlerContext) in } )
XCTAssertNoThrow(try serverChannel.pipeline.configureHTTPServerPipeline(withServerUpgrade: upgradeConfig).wait())
XCTAssertNoThrow(try serverChannel.pipeline.configureHTTPServerPipeline(
withServerUpgrade: (upgraders: upgraders as [HTTPServerProtocolUpgrader], completionHandler: { (context: ChannelHandlerContext) in } )
).wait())
let clientChannel = EmbeddedChannel(loop: loop)
return (loop: loop, serverChannel: serverChannel, clientChannel: clientChannel)
}