diff --git a/Sources/NIOCore/NIOSendable.swift b/Sources/NIOCore/NIOSendable.swift index ba07fa03..4a6e96f2 100644 --- a/Sources/NIOCore/NIOSendable.swift +++ b/Sources/NIOCore/NIOSendable.swift @@ -24,12 +24,11 @@ public typealias NIOSendable = Any public protocol NIOPreconcurrencySendable {} #endif -#if swift(>=5.5) && canImport(_Concurrency) /// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`. /// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler. /// It can be used similar to `@unsafe Sendable` but for values instead of types. @usableFromInline -struct UnsafeTransfer: @unchecked Sendable { +struct UnsafeTransfer { @usableFromInline var wrappedValue: Wrapped @@ -39,25 +38,13 @@ struct UnsafeTransfer: @unchecked Sendable { } } +#if swift(>=5.5) && canImport(_Concurrency) +extension UnsafeTransfer: @unchecked Sendable {} +#endif + extension UnsafeTransfer: Equatable where Wrapped: Equatable {} extension UnsafeTransfer: Hashable where Wrapped: Hashable {} -#endif -#if swift(>=5.5) && canImport(_Concurrency) -/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable. -/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure. -/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation. -@usableFromInline -final class UnsafeMutableTransferBox: @unchecked Sendable { - @usableFromInline - var wrappedValue: Wrapped - - @inlinable - init(_ wrappedValue: Wrapped) { - self.wrappedValue = wrappedValue - } -} -#else /// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable. /// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure. /// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation. @@ -71,4 +58,6 @@ final class UnsafeMutableTransferBox { self.wrappedValue = wrappedValue } } +#if swift(>=5.5) && canImport(_Concurrency) +extension UnsafeMutableTransferBox: @unchecked Sendable {} #endif diff --git a/Sources/NIOHTTP1/HTTPPipelineSetup.swift b/Sources/NIOHTTP1/HTTPPipelineSetup.swift index cf488630..54e5e97f 100644 --- a/Sources/NIOHTTP1/HTTPPipelineSetup.swift +++ b/Sources/NIOHTTP1/HTTPPipelineSetup.swift @@ -14,11 +14,19 @@ import NIOCore +#if swift(>=5.7) +/// Configuration required to configure a HTTP client pipeline for upgrade. +/// +/// See the documentation for `HTTPClientUpgradeHandler` for details on these +/// properties. +public typealias NIOHTTPClientUpgradeConfiguration = (upgraders: [NIOHTTPClientProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void) +#else /// Configuration required to configure a HTTP client pipeline for upgrade. /// /// See the documentation for `HTTPClientUpgradeHandler` for details on these /// properties. public typealias NIOHTTPClientUpgradeConfiguration = (upgraders: [NIOHTTPClientProtocolUpgrader], completionHandler: (ChannelHandlerContext) -> Void) +#endif /// Configuration required to configure a HTTP server pipeline for upgrade. /// @@ -27,7 +35,11 @@ public typealias NIOHTTPClientUpgradeConfiguration = (upgraders: [NIOHTTPClientP @available(*, deprecated, renamed: "NIOHTTPServerUpgradeConfiguration") public typealias HTTPUpgradeConfiguration = NIOHTTPServerUpgradeConfiguration +#if swift(>=5.7) +public typealias NIOHTTPServerUpgradeConfiguration = (upgraders: [HTTPServerProtocolUpgrader], completionHandler: @Sendable (ChannelHandlerContext) -> Void) +#else public typealias NIOHTTPServerUpgradeConfiguration = (upgraders: [HTTPServerProtocolUpgrader], completionHandler: (ChannelHandlerContext) -> Void) +#endif extension ChannelPipeline { /// Configure a `ChannelPipeline` for use as a HTTP client. @@ -44,6 +56,29 @@ extension ChannelPipeline { withClientUpgrade: nil) } + #if swift(>=5.7) + /// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration. + /// + /// - parameters: + /// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`. + /// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder` + /// from the pipeline. + /// - upgrade: Add a `HTTPClientUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Should be a tuple of an array of `HTTPClientProtocolUpgrader` and + /// the upgrade completion handler. See the documentation on `HTTPClientUpgradeHandler` + /// for more details. + /// - returns: An `EventLoopFuture` that will fire when the pipeline is configured. + @preconcurrency + public func addHTTPClientHandlers(position: Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?) -> EventLoopFuture { + self._addHTTPClientHandlers( + position: position, + leftOverBytesStrategy: leftOverBytesStrategy, + withClientUpgrade: upgrade + ) + } + #else /// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration. /// /// - parameters: @@ -58,6 +93,17 @@ extension ChannelPipeline { public func addHTTPClientHandlers(position: Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?) -> EventLoopFuture { + self._addHTTPClientHandlers( + position: position, + leftOverBytesStrategy: leftOverBytesStrategy, + withClientUpgrade: upgrade + ) + } + #endif + + private func _addHTTPClientHandlers(position: Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration?) -> EventLoopFuture { let future: EventLoopFuture if self.eventLoop.inEventLoop { @@ -77,7 +123,46 @@ extension ChannelPipeline { return future } - + + #if swift(>=5.7) + /// Configure a `ChannelPipeline` for use as a HTTP server. + /// + /// This function knows how to set up all first-party HTTP channel handlers appropriately + /// for server use. It supports the following features: + /// + /// 1. Providing assistance handling clients that pipeline HTTP requests, using the + /// `HTTPServerPipelineHandler`. + /// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`. + /// + /// This method will likely be extended in future with more support for other first-party + /// features. + /// + /// - parameters: + /// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`. + /// - pipelining: Whether to provide assistance handling HTTP clients that pipeline + /// their requests. Defaults to `true`. If `false`, users will need to handle + /// clients that pipeline themselves. + /// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If + /// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade + /// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more + /// details. + /// - errorHandling: Whether to provide assistance handling protocol errors (e.g. + /// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`. + /// - returns: An `EventLoopFuture` that will fire when the pipeline is configured. + @preconcurrency + public func configureHTTPServerPipeline(position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true) -> EventLoopFuture { + self._configureHTTPServerPipeline( + position: position, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling + ) + } + #else /// Configure a `ChannelPipeline` for use as a HTTP server. /// /// This function knows how to set up all first-party HTTP channel handlers appropriately @@ -107,6 +192,19 @@ extension ChannelPipeline { withPipeliningAssistance pipelining: Bool = true, withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, withErrorHandling errorHandling: Bool = true) -> EventLoopFuture { + self._configureHTTPServerPipeline( + position: position, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling + ) + } + #endif + + private func _configureHTTPServerPipeline(position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true) -> EventLoopFuture { let future: EventLoopFuture if self.eventLoop.inEventLoop { @@ -131,6 +229,30 @@ extension ChannelPipeline { } extension ChannelPipeline.SynchronousOperations { + #if swift(>=5.7) + /// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - parameters: + /// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`. + /// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder` + /// from the pipeline. + /// - upgrade: Add a `HTTPClientUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Should be a tuple of an array of `HTTPClientProtocolUpgrader` and + /// the upgrade completion handler. See the documentation on `HTTPClientUpgradeHandler` + /// for more details. + /// - throws: If the pipeline could not be configured. + @preconcurrency + public func addHTTPClientHandlers(position: ChannelPipeline.Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil) throws { + try self._addHTTPClientHandlers( + position: position, + leftOverBytesStrategy: leftOverBytesStrategy, + withClientUpgrade: upgrade + ) + } + #else /// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration. /// /// - important: This **must** be called on the Channel's event loop. @@ -146,6 +268,17 @@ extension ChannelPipeline.SynchronousOperations { public func addHTTPClientHandlers(position: ChannelPipeline.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil) throws { + try self._addHTTPClientHandlers( + position: position, + leftOverBytesStrategy: leftOverBytesStrategy, + withClientUpgrade: upgrade + ) + } + #endif + + private func _addHTTPClientHandlers(position: ChannelPipeline.Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil) throws { // Why two separate functions? When creating the array of handlers to add to the pipeline, when we don't have // an upgrade handler -- i.e. just an array literal -- the compiler is able to promote the array to the stack // which saves an allocation. That's not the case when the upgrade handler is present. @@ -183,7 +316,46 @@ extension ChannelPipeline.SynchronousOperations { try self.addHandlers(handlers, position: position) } - + #if swift(>=5.7) + /// Configure a `ChannelPipeline` for use as a HTTP server. + /// + /// This function knows how to set up all first-party HTTP channel handlers appropriately + /// for server use. It supports the following features: + /// + /// 1. Providing assistance handling clients that pipeline HTTP requests, using the + /// `HTTPServerPipelineHandler`. + /// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`. + /// + /// This method will likely be extended in future with more support for other first-party + /// features. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - parameters: + /// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`. + /// - pipelining: Whether to provide assistance handling HTTP clients that pipeline + /// their requests. Defaults to `true`. If `false`, users will need to handle + /// clients that pipeline themselves. + /// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If + /// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade + /// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more + /// details. + /// - errorHandling: Whether to provide assistance handling protocol errors (e.g. + /// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`. + /// - throws: If the pipeline could not be configured. + @preconcurrency + public func configureHTTPServerPipeline(position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true) throws { + try self._configureHTTPServerPipeline( + position: position, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling + ) + } + #else /// Configure a `ChannelPipeline` for use as a HTTP server. /// /// This function knows how to set up all first-party HTTP channel handlers appropriately @@ -214,6 +386,19 @@ extension ChannelPipeline.SynchronousOperations { withPipeliningAssistance pipelining: Bool = true, withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, withErrorHandling errorHandling: Bool = true) throws { + try self._configureHTTPServerPipeline( + position: position, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling + ) + } + #endif + + private func _configureHTTPServerPipeline(position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true) throws { self.eventLoop.assertInEventLoop() let responseEncoder = HTTPResponseEncoder() diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 964f9620..6648f18b 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -83,11 +83,17 @@ extension EmbeddedChannel { } } +#if swift(>=5.7) +private typealias UpgradeCompletionHandler = @Sendable (ChannelHandlerContext) -> Void +#else +private typealias UpgradeCompletionHandler = (ChannelHandlerContext) -> Void +#endif + private func serverHTTPChannelWithAutoremoval(group: EventLoopGroup, pipelining: Bool, upgraders: [HTTPServerProtocolUpgrader], extraHandlers: [ChannelHandler], - _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void) throws -> (Channel, EventLoopFuture) { + _ upgradeCompletionHandler: @escaping UpgradeCompletionHandler) throws -> (Channel, EventLoopFuture) { let p = group.next().makePromise(of: Channel.self) let c = try ServerBootstrap(group: group) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) @@ -138,7 +144,7 @@ private func connectedClientChannel(group: EventLoopGroup, serverAddress: Socket private func setUpTestWithAutoremoval(pipelining: Bool = false, upgraders: [HTTPServerProtocolUpgrader], extraHandlers: [ChannelHandler], - _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void) throws -> (EventLoopGroup, Channel, Channel, Channel) { + _ upgradeCompletionHandler: @escaping UpgradeCompletionHandler) throws -> (EventLoopGroup, Channel, Channel, Channel) { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) let (serverChannel, connectedServerChannelFuture) = try serverHTTPChannelWithAutoremoval(group: group, pipelining: pipelining, @@ -427,21 +433,21 @@ class HTTPServerUpgradeTestCase: XCTestCase { } func testSimpleUpgradeSucceeds() throws { - var upgradeRequest: HTTPRequestHead? = nil - var upgradeHandlerCbFired = false - var upgraderCbFired = false + let upgradeRequest = UnsafeMutableTransferBox(nil) + let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) + let upgraderCbFired = UnsafeMutableTransferBox(false) let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest = req - XCTAssert(upgradeHandlerCbFired) - upgraderCbFired = true + upgradeRequest.wrappedValue = req + XCTAssert(upgradeHandlerCbFired.wrappedValue) + upgraderCbFired.wrappedValue = true } let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], extraHandlers: []) { (context) in // This is called before the upgrader gets called. - XCTAssertNil(upgradeRequest) - upgradeHandlerCbFired = true + XCTAssertNil(upgradeRequest.wrappedValue) + upgradeHandlerCbFired.wrappedValue = true // We're closing the connection now. context.close(promise: nil) @@ -469,8 +475,8 @@ class HTTPServerUpgradeTestCase: XCTestCase { // At this time we want to assert that everything got called. Their own callbacks assert // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired) - XCTAssert(upgraderCbFired) + XCTAssert(upgradeHandlerCbFired.wrappedValue) + XCTAssert(upgraderCbFired.wrappedValue) // We also want to confirm that the upgrade handler is no longer in the pipeline. try connectedServer.pipeline.assertDoesNotContainUpgrader() @@ -532,22 +538,22 @@ class HTTPServerUpgradeTestCase: XCTestCase { } func testUpgradeRespectsClientPreference() throws { - var upgradeRequest: HTTPRequestHead? = nil - var upgradeHandlerCbFired = false - var upgraderCbFired = false + let upgradeRequest = UnsafeMutableTransferBox(nil) + let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) + let upgraderCbFired = UnsafeMutableTransferBox(false) let explodingUpgrader = ExplodingUpgrader(forProtocol: "exploder") let successfulUpgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest = req - XCTAssert(upgradeHandlerCbFired) - upgraderCbFired = true + upgradeRequest.wrappedValue = req + XCTAssert(upgradeHandlerCbFired.wrappedValue) + upgraderCbFired.wrappedValue = true } let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [explodingUpgrader, successfulUpgrader], extraHandlers: []) { context in // This is called before the upgrader gets called. - XCTAssertNil(upgradeRequest) - upgradeHandlerCbFired = true + XCTAssertNil(upgradeRequest.wrappedValue) + upgradeHandlerCbFired.wrappedValue = true // We're closing the connection now. context.close(promise: nil) @@ -575,8 +581,8 @@ class HTTPServerUpgradeTestCase: XCTestCase { // At this time we want to assert that everything got called. Their own callbacks assert // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired) - XCTAssert(upgraderCbFired) + XCTAssert(upgradeHandlerCbFired.wrappedValue) + XCTAssert(upgraderCbFired.wrappedValue) // We also want to confirm that the upgrade handler is no longer in the pipeline. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -585,15 +591,15 @@ class HTTPServerUpgradeTestCase: XCTestCase { func testUpgradeFiresUserEvent() throws { // The user event is fired last, so we don't see it until both other callbacks // have fired. - let eventSaver = UserEventSaver() + let eventSaver = UnsafeTransfer(UserEventSaver()) let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: []) { req in - XCTAssertEqual(eventSaver.events.count, 0) + XCTAssertEqual(eventSaver.wrappedValue.events.count, 0) } let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], - extraHandlers: [eventSaver]) { context in - XCTAssertEqual(eventSaver.events.count, 0) + extraHandlers: [eventSaver.wrappedValue]) { context in + XCTAssertEqual(eventSaver.wrappedValue.events.count, 0) context.close(promise: nil) } defer { @@ -620,14 +626,14 @@ class HTTPServerUpgradeTestCase: XCTestCase { // At this time we should have received one user event. We schedule this onto the // event loop to guarantee thread safety. XCTAssertNoThrow(try connectedServer.eventLoop.scheduleTask(deadline: .now()) { - XCTAssertEqual(eventSaver.events.count, 1) - if case .upgradeComplete(let proto, let req) = eventSaver.events[0] { + XCTAssertEqual(eventSaver.wrappedValue.events.count, 1) + if case .upgradeComplete(let proto, let req) = eventSaver.wrappedValue.events[0] { XCTAssertEqual(proto, "myproto") XCTAssertEqual(req.method, .OPTIONS) XCTAssertEqual(req.uri, "*") XCTAssertEqual(req.version, .http1_1) } else { - XCTFail("Unexpected event: \(eventSaver.events[0])") + XCTFail("Unexpected event: \(eventSaver.wrappedValue.events[0])") } }.futureResult.wait()) @@ -636,23 +642,23 @@ class HTTPServerUpgradeTestCase: XCTestCase { } func testUpgraderCanRejectUpgradeForPersonalReasons() throws { - var upgradeRequest: HTTPRequestHead? = nil - var upgradeHandlerCbFired = false - var upgraderCbFired = false + let upgradeRequest = UnsafeMutableTransferBox(nil) + let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) + let upgraderCbFired = UnsafeMutableTransferBox(false) let explodingUpgrader = UpgraderSaysNo(forProtocol: "noproto") let successfulUpgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest = req - XCTAssert(upgradeHandlerCbFired) - upgraderCbFired = true + upgradeRequest.wrappedValue = req + XCTAssert(upgradeHandlerCbFired.wrappedValue) + upgraderCbFired.wrappedValue = true } let errorCatcher = ErrorSaver() let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [explodingUpgrader, successfulUpgrader], extraHandlers: [errorCatcher]) { context in // This is called before the upgrader gets called. - XCTAssertNil(upgradeRequest) - upgradeHandlerCbFired = true + XCTAssertNil(upgradeRequest.wrappedValue) + upgradeHandlerCbFired.wrappedValue = true // We're closing the connection now. context.close(promise: nil) @@ -680,8 +686,8 @@ class HTTPServerUpgradeTestCase: XCTestCase { // At this time we want to assert that everything got called. Their own callbacks assert // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired) - XCTAssert(upgraderCbFired) + XCTAssert(upgradeHandlerCbFired.wrappedValue) + XCTAssert(upgraderCbFired.wrappedValue) // We also want to confirm that the upgrade handler is no longer in the pipeline. try connectedServer.pipeline.waitForUpgraderToBeRemoved() @@ -1089,9 +1095,9 @@ class HTTPServerUpgradeTestCase: XCTestCase { func testUpgradeWithUpgradePayloadInlineWithRequestWorks() throws { enum ReceivedTheWrongThingError: Error { case error } - var upgradeRequest: HTTPRequestHead? = nil - var upgradeHandlerCbFired = false - var upgraderCbFired = false + let upgradeRequest = UnsafeMutableTransferBox(nil) + let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) + let upgraderCbFired = UnsafeMutableTransferBox(false) class CheckWeReadInlineAndExtraData: ChannelDuplexHandler { typealias InboundIn = ByteBuffer @@ -1161,9 +1167,9 @@ class HTTPServerUpgradeTestCase: XCTestCase { } let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in - upgradeRequest = req - XCTAssert(upgradeHandlerCbFired) - upgraderCbFired = true + upgradeRequest.wrappedValue = req + XCTAssert(upgradeHandlerCbFired.wrappedValue) + upgraderCbFired.wrappedValue = true } let promiseGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) @@ -1176,8 +1182,8 @@ class HTTPServerUpgradeTestCase: XCTestCase { let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], extraHandlers: []) { (context) in // This is called before the upgrader gets called. - XCTAssertNil(upgradeRequest) - upgradeHandlerCbFired = true + XCTAssertNil(upgradeRequest.wrappedValue) + upgradeHandlerCbFired.wrappedValue = true _ = context.channel.pipeline.addHandler(CheckWeReadInlineAndExtraData(firstByteDonePromise: firstByteDonePromise, secondByteDonePromise: secondByteDonePromise, @@ -1215,8 +1221,8 @@ class HTTPServerUpgradeTestCase: XCTestCase { // At this time we want to assert that everything got called. Their own callbacks assert // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired) - XCTAssert(upgraderCbFired) + XCTAssert(upgradeHandlerCbFired.wrappedValue) + XCTAssert(upgraderCbFired.wrappedValue) // We also want to confirm that the upgrade handler is no longer in the pipeline. try connectedServer.pipeline.assertDoesNotContainUpgrader() @@ -1324,9 +1330,9 @@ class HTTPServerUpgradeTestCase: XCTestCase { } func testWeTolerateUpgradeFuturesFromWrongEventLoops() throws { - var upgradeRequest: HTTPRequestHead? = nil - var upgradeHandlerCbFired = false - var upgraderCbFired = false + let upgradeRequest = UnsafeMutableTransferBox(nil) + let upgradeHandlerCbFired = UnsafeMutableTransferBox(false) + let upgraderCbFired = UnsafeMutableTransferBox(false) let otherELG = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try otherELG.syncShutdownGracefully()) @@ -1338,16 +1344,16 @@ class HTTPServerUpgradeTestCase: XCTestCase { // this is the wrong EL otherELG.next().makeSucceededFuture($1) }) { req in - upgradeRequest = req - XCTAssert(upgradeHandlerCbFired) - upgraderCbFired = true + upgradeRequest.wrappedValue = req + XCTAssert(upgradeHandlerCbFired.wrappedValue) + upgraderCbFired.wrappedValue = true } let (group, _, client, connectedServer) = try setUpTestWithAutoremoval(upgraders: [upgrader], extraHandlers: []) { (context) in // This is called before the upgrader gets called. - XCTAssertNil(upgradeRequest) - upgradeHandlerCbFired = true + XCTAssertNil(upgradeRequest.wrappedValue) + upgradeHandlerCbFired.wrappedValue = true // We're closing the connection now. context.close(promise: nil) @@ -1375,8 +1381,8 @@ class HTTPServerUpgradeTestCase: XCTestCase { // At this time we want to assert that everything got called. Their own callbacks assert // that the ordering was correct. - XCTAssert(upgradeHandlerCbFired) - XCTAssert(upgraderCbFired) + XCTAssert(upgradeHandlerCbFired.wrappedValue) + XCTAssert(upgraderCbFired.wrappedValue) // We also want to confirm that the upgrade handler is no longer in the pipeline. try connectedServer.pipeline.assertDoesNotContainUpgrader() diff --git a/Tests/NIOHTTP1Tests/UnsafeTransfer.swift b/Tests/NIOHTTP1Tests/UnsafeTransfer.swift new file mode 100644 index 00000000..5070296e --- /dev/null +++ b/Tests/NIOHTTP1Tests/UnsafeTransfer.swift @@ -0,0 +1,51 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2021-2022 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`. +/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler. +/// It can be used similar to `@unsafe Sendable` but for values instead of types. +@usableFromInline +struct UnsafeTransfer { + @usableFromInline + var wrappedValue: Wrapped + + @inlinable + init(_ wrappedValue: Wrapped) { + self.wrappedValue = wrappedValue + } +} + +#if swift(>=5.5) && canImport(_Concurrency) +extension UnsafeTransfer: @unchecked Sendable {} +#endif + +extension UnsafeTransfer: Equatable where Wrapped: Equatable {} +extension UnsafeTransfer: Hashable where Wrapped: Hashable {} + +/// ``UnsafeMutableTransferBox`` can be used to make non-`Sendable` values `Sendable` and mutable. +/// It can be used to capture local mutable values in a `@Sendable` closure and mutate them from within the closure. +/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler and does not add any synchronisation. +@usableFromInline +final class UnsafeMutableTransferBox { + @usableFromInline + var wrappedValue: Wrapped + + @inlinable + init(_ wrappedValue: Wrapped) { + self.wrappedValue = wrappedValue + } +} +#if swift(>=5.5) && canImport(_Concurrency) +extension UnsafeMutableTransferBox: @unchecked Sendable {} +#endif diff --git a/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift index 99e66962..a5ec8e6d 100644 --- a/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift @@ -116,8 +116,9 @@ class WebSocketServerEndToEndTests: XCTestCase { private func createTestFixtures(upgraders: [NIOWebSocketServerUpgrader]) -> (loop: EmbeddedEventLoop, serverChannel: EmbeddedChannel, clientChannel: EmbeddedChannel) { let loop = EmbeddedEventLoop() let serverChannel = EmbeddedChannel(loop: loop) - let upgradeConfig = (upgraders: upgraders as [HTTPServerProtocolUpgrader], completionHandler: { (context: ChannelHandlerContext) in } ) - XCTAssertNoThrow(try serverChannel.pipeline.configureHTTPServerPipeline(withServerUpgrade: upgradeConfig).wait()) + XCTAssertNoThrow(try serverChannel.pipeline.configureHTTPServerPipeline( + withServerUpgrade: (upgraders: upgraders as [HTTPServerProtocolUpgrader], completionHandler: { (context: ChannelHandlerContext) in } ) + ).wait()) let clientChannel = EmbeddedChannel(loop: loop) return (loop: loop, serverChannel: serverChannel, clientChannel: clientChannel) }