diff --git a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift index be7a6721..0f978210 100644 --- a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift +++ b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift @@ -12,20 +12,20 @@ // //===----------------------------------------------------------------------===// +import NIOConcurrencyHelpers @_spi(AsyncChannel) import NIOCore @_spi(AsyncChannel) @testable import NIOPosix import XCTest @_spi(AsyncChannel) import NIOTLS -import NIOConcurrencyHelpers -fileprivate final class LineDelimiterDecoder: ByteToMessageDecoder { +private final class LineDelimiterDecoder: ByteToMessageDecoder { private let newLine = "\n".utf8.first! typealias InboundIn = ByteBuffer typealias InboundOut = ByteBuffer func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState { - let readable = buffer.withUnsafeReadableBytes { $0.firstIndex(of: newLine) } + let readable = buffer.withUnsafeReadableBytes { $0.firstIndex(of: self.newLine) } if let readable = readable { context.fireChannelRead(self.wrapInboundOut(buffer.readSlice(length: readable)!)) buffer.moveReaderIndex(forwardBy: 1) @@ -35,8 +35,7 @@ fileprivate final class LineDelimiterDecoder: ByteToMessageDecoder { } } - -fileprivate final class TLSUserEventHandler: ChannelInboundHandler { +private final class TLSUserEventHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer typealias InboundOut = ByteBuffer @@ -52,8 +51,7 @@ fileprivate final class TLSUserEventHandler: ChannelInboundHandler { } } - -fileprivate final class ByteBufferToStringHandler: ChannelInboundHandler { +private final class ByteBufferToStringHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer typealias InboundOut = String @@ -63,8 +61,7 @@ fileprivate final class ByteBufferToStringHandler: ChannelInboundHandler { } } - -fileprivate final class ByteBufferToByteHandler: ChannelInboundHandler { +private final class ByteBufferToByteHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer typealias InboundOut = UInt8 @@ -103,7 +100,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } .bind( host: "127.0.0.1", - port: 1995, + port: 0, childChannelInboundType: String.self, childChannelOutboundType: String.self ) @@ -113,7 +110,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { var iterator = stream.makeAsyncIterator() group.addTask { - try await withThrowingTaskGroup(of: Void.self) { group in + try await withThrowingTaskGroup(of: Void.self) { _ in for try await childChannel in channel.inboundStream { for try await value in childChannel.inboundStream { continuation.yield(.string(value)) @@ -122,7 +119,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } - let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) + let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) stringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil) await XCTAsyncAssertEqual(await iterator.next(), .string("hello")) @@ -135,7 +132,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { func testAsyncChannelProtocolNegotiation() throws { XCTAsyncTest { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 3) - + let channel: NIOAsyncChannel = try await ServerBootstrap(group: eventLoopGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelOption(ChannelOptions.autoRead, value: true) @@ -149,11 +146,11 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 1995, protocolNegotiationHandlerType: NIOTypedApplicationProtocolNegotiationHandler.self ) - + try await withThrowingTaskGroup(of: Void.self) { group in let (stream, continuation) = AsyncStream.makeStream() var iterator = stream.makeAsyncIterator() - + group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in for try await childChannel in channel.inboundStream { @@ -172,28 +169,28 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } } - - let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol stringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil) - + // This is the actual content stringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .string("hello")) - - let byteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let byteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol byteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil) - + // This is the actual content byteChannel.write(.init(ByteBuffer(integer: UInt8(8))), promise: nil) byteChannel.writeAndFlush(.init(ByteBuffer(string: "\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .byte(8)) - + group.cancelAll() } } @@ -202,7 +199,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { func testAsyncChannelNestedProtocolNegotiation() throws { XCTAsyncTest { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 3) - + let channel: NIOAsyncChannel = try await ServerBootstrap(group: eventLoopGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelOption(ChannelOptions.autoRead, value: true) @@ -216,11 +213,11 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 1995, protocolNegotiationHandlerType: NIOTypedApplicationProtocolNegotiationHandler.self ) - + try await withThrowingTaskGroup(of: Void.self) { group in let (stream, continuation) = AsyncStream.makeStream() var iterator = stream.makeAsyncIterator() - + group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in for try await childChannel in channel.inboundStream { @@ -239,61 +236,61 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } } - - let stringStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let stringStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol stringStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil) - + // This is for negotiating the nested protocol stringStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil) - + // This is the actual content stringStringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .string("hello")) - - let byteByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let byteByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol byteByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil) - + // This is for negotiating the nested protocol byteByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil) - + // This is the actual content byteByteChannel.write(.init(ByteBuffer(integer: UInt8(8))), promise: nil) byteByteChannel.writeAndFlush(.init(ByteBuffer(string: "\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .byte(8)) - - let stringByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let stringByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol stringByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil) - + // This is for negotiating the nested protocol stringByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil) - + // This is the actual content stringByteChannel.write(.init(ByteBuffer(integer: UInt8(8))), promise: nil) stringByteChannel.writeAndFlush(.init(ByteBuffer(string: "\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .byte(8)) - - let byteStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let byteStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol byteStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil) - + // This is for negotiating the nested protocol byteStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil) - + // This is the actual content byteStringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .string("hello")) - + group.cancelAll() } } @@ -339,11 +336,11 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 1995, protocolNegotiationHandlerType: NIOTypedApplicationProtocolNegotiationHandler.self ) - + try await withThrowingTaskGroup(of: Void.self) { group in let (stream, continuation) = AsyncStream.makeStream() var iterator = stream.makeAsyncIterator() - + group.addTask { try await withThrowingTaskGroup(of: Void.self) { group in for try await childChannel in channel.inboundStream { @@ -362,23 +359,23 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } } - - let channel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + + let unknownChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol - channel.writeAndFlush(.init(ByteBuffer(string: "alpn:unknown\n")), promise: nil) - + unknownChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:unknown\n")), promise: nil) + // Checking that we can still create new connections afterwards - let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup) - + let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!) + // This is for negotiating the protocol stringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil) - + // This is the actual content stringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil) - + await XCTAsyncAssertEqual(await iterator.next(), .string("hello")) - + let failedInboundChannel = channels.withLockedValue { channels -> Channel in XCTAssertEqual(channels.count, 2) return channels[0] @@ -394,14 +391,14 @@ final class AsyncChannelBootstrapTests: XCTestCase { // MARK: - Test Helpers - private func makeClientChannel(eventLoopGroup: EventLoopGroup) async throws -> Channel { + private func makeClientChannel(eventLoopGroup: EventLoopGroup, port: Int) async throws -> Channel { return try await ClientBootstrap(group: eventLoopGroup) .channelInitializer { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterDecoder())) } } - .connect(to: .init(ipAddress: "127.0.0.1", port: 1995)) + .connect(to: .init(ipAddress: "127.0.0.1", port: port)) .get() } @@ -486,18 +483,18 @@ final class AsyncChannelBootstrapTests: XCTestCase { } extension AsyncStream { - fileprivate static func makeStream( - of elementType: Element.Type = Element.self, - bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded - ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { - var continuation: AsyncStream.Continuation! - let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 } - return (stream: stream, continuation: continuation!) - } + fileprivate static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -fileprivate func XCTAsyncAssertEqual(_ lhs: @autoclosure () async throws -> Element, _ rhs: @autoclosure () async throws -> Element, file: StaticString = #filePath, line: UInt = #line) async rethrows { +private func XCTAsyncAssertEqual(_ lhs: @autoclosure () async throws -> Element, _ rhs: @autoclosure () async throws -> Element, file: StaticString = #filePath, line: UInt = #line) async rethrows { let lhsResult = try await lhs() let rhsResult = try await rhs() XCTAssertEqual(lhsResult, rhsResult, file: file, line: line)