AsyncChannelBootstrapTests bind to 0 instead (#2412)
# Motivation The `AsyncChannelBootstrapTests` were failing when running parallel since they were all using the same port. # Modification This PR uses port `0` to let the system assign a port instead. It also runs the formatter on this file. # Result No more tests failures.
This commit is contained in:
parent
fd35cd9e52
commit
e3497ffdc5
|
@ -12,20 +12,20 @@
|
|||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import NIOConcurrencyHelpers
|
||||
@_spi(AsyncChannel) import NIOCore
|
||||
@_spi(AsyncChannel) @testable import NIOPosix
|
||||
import XCTest
|
||||
@_spi(AsyncChannel) import NIOTLS
|
||||
import NIOConcurrencyHelpers
|
||||
|
||||
fileprivate final class LineDelimiterDecoder: ByteToMessageDecoder {
|
||||
private final class LineDelimiterDecoder: ByteToMessageDecoder {
|
||||
private let newLine = "\n".utf8.first!
|
||||
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = ByteBuffer
|
||||
|
||||
func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
|
||||
let readable = buffer.withUnsafeReadableBytes { $0.firstIndex(of: newLine) }
|
||||
let readable = buffer.withUnsafeReadableBytes { $0.firstIndex(of: self.newLine) }
|
||||
if let readable = readable {
|
||||
context.fireChannelRead(self.wrapInboundOut(buffer.readSlice(length: readable)!))
|
||||
buffer.moveReaderIndex(forwardBy: 1)
|
||||
|
@ -35,8 +35,7 @@ fileprivate final class LineDelimiterDecoder: ByteToMessageDecoder {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
fileprivate final class TLSUserEventHandler: ChannelInboundHandler {
|
||||
private final class TLSUserEventHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = ByteBuffer
|
||||
|
||||
|
@ -52,8 +51,7 @@ fileprivate final class TLSUserEventHandler: ChannelInboundHandler {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
fileprivate final class ByteBufferToStringHandler: ChannelInboundHandler {
|
||||
private final class ByteBufferToStringHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = String
|
||||
|
||||
|
@ -63,8 +61,7 @@ fileprivate final class ByteBufferToStringHandler: ChannelInboundHandler {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
fileprivate final class ByteBufferToByteHandler: ChannelInboundHandler {
|
||||
private final class ByteBufferToByteHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias InboundOut = UInt8
|
||||
|
||||
|
@ -103,7 +100,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
}
|
||||
.bind(
|
||||
host: "127.0.0.1",
|
||||
port: 1995,
|
||||
port: 0,
|
||||
childChannelInboundType: String.self,
|
||||
childChannelOutboundType: String.self
|
||||
)
|
||||
|
@ -113,7 +110,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
var iterator = stream.makeAsyncIterator()
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
try await withThrowingTaskGroup(of: Void.self) { _ in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
for try await value in childChannel.inboundStream {
|
||||
continuation.yield(.string(value))
|
||||
|
@ -122,7 +119,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
stringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil)
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .string("hello"))
|
||||
|
@ -135,7 +132,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
func testAsyncChannelProtocolNegotiation() throws {
|
||||
XCTAsyncTest {
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 3)
|
||||
|
||||
|
||||
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await ServerBootstrap(group: eventLoopGroup)
|
||||
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.childChannelOption(ChannelOptions.autoRead, value: true)
|
||||
|
@ -149,11 +146,11 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
port: 1995,
|
||||
protocolNegotiationHandlerType: NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult>.self
|
||||
)
|
||||
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var iterator = stream.makeAsyncIterator()
|
||||
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
|
@ -172,28 +169,28 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
stringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
stringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .string("hello"))
|
||||
|
||||
let byteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let byteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
byteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
byteChannel.write(.init(ByteBuffer(integer: UInt8(8))), promise: nil)
|
||||
byteChannel.writeAndFlush(.init(ByteBuffer(string: "\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .byte(8))
|
||||
|
||||
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
|
@ -202,7 +199,7 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
func testAsyncChannelNestedProtocolNegotiation() throws {
|
||||
XCTAsyncTest {
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 3)
|
||||
|
||||
|
||||
let channel: NIOAsyncChannel<NegotiationResult, Never> = try await ServerBootstrap(group: eventLoopGroup)
|
||||
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.childChannelOption(ChannelOptions.autoRead, value: true)
|
||||
|
@ -216,11 +213,11 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
port: 1995,
|
||||
protocolNegotiationHandlerType: NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult>.self
|
||||
)
|
||||
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var iterator = stream.makeAsyncIterator()
|
||||
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
|
@ -239,61 +236,61 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
let stringStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let stringStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
stringStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil)
|
||||
|
||||
|
||||
// This is for negotiating the nested protocol
|
||||
stringStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
stringStringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .string("hello"))
|
||||
|
||||
let byteByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let byteByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
byteByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil)
|
||||
|
||||
|
||||
// This is for negotiating the nested protocol
|
||||
byteByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
byteByteChannel.write(.init(ByteBuffer(integer: UInt8(8))), promise: nil)
|
||||
byteByteChannel.writeAndFlush(.init(ByteBuffer(string: "\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .byte(8))
|
||||
|
||||
let stringByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let stringByteChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
stringByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil)
|
||||
|
||||
|
||||
// This is for negotiating the nested protocol
|
||||
stringByteChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
stringByteChannel.write(.init(ByteBuffer(integer: UInt8(8))), promise: nil)
|
||||
stringByteChannel.writeAndFlush(.init(ByteBuffer(string: "\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .byte(8))
|
||||
|
||||
let byteStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let byteStringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
byteStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:byte\n")), promise: nil)
|
||||
|
||||
|
||||
// This is for negotiating the nested protocol
|
||||
byteStringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
byteStringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .string("hello"))
|
||||
|
||||
|
||||
group.cancelAll()
|
||||
}
|
||||
}
|
||||
|
@ -339,11 +336,11 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
port: 1995,
|
||||
protocolNegotiationHandlerType: NIOTypedApplicationProtocolNegotiationHandler<NegotiationResult>.self
|
||||
)
|
||||
|
||||
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
let (stream, continuation) = AsyncStream<StringOrByte>.makeStream()
|
||||
var iterator = stream.makeAsyncIterator()
|
||||
|
||||
|
||||
group.addTask {
|
||||
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||
for try await childChannel in channel.inboundStream {
|
||||
|
@ -362,23 +359,23 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
let channel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
|
||||
let unknownChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
channel.writeAndFlush(.init(ByteBuffer(string: "alpn:unknown\n")), promise: nil)
|
||||
|
||||
unknownChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:unknown\n")), promise: nil)
|
||||
|
||||
// Checking that we can still create new connections afterwards
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup)
|
||||
|
||||
let stringChannel = try await self.makeClientChannel(eventLoopGroup: eventLoopGroup, port: channel.channel.localAddress!.port!)
|
||||
|
||||
// This is for negotiating the protocol
|
||||
stringChannel.writeAndFlush(.init(ByteBuffer(string: "alpn:string\n")), promise: nil)
|
||||
|
||||
|
||||
// This is the actual content
|
||||
stringChannel.writeAndFlush(.init(ByteBuffer(string: "hello\n")), promise: nil)
|
||||
|
||||
|
||||
await XCTAsyncAssertEqual(await iterator.next(), .string("hello"))
|
||||
|
||||
|
||||
let failedInboundChannel = channels.withLockedValue { channels -> Channel in
|
||||
XCTAssertEqual(channels.count, 2)
|
||||
return channels[0]
|
||||
|
@ -394,14 +391,14 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
|
||||
// MARK: - Test Helpers
|
||||
|
||||
private func makeClientChannel(eventLoopGroup: EventLoopGroup) async throws -> Channel {
|
||||
private func makeClientChannel(eventLoopGroup: EventLoopGroup, port: Int) async throws -> Channel {
|
||||
return try await ClientBootstrap(group: eventLoopGroup)
|
||||
.channelInitializer { channel in
|
||||
channel.eventLoop.makeCompletedFuture {
|
||||
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(LineDelimiterDecoder()))
|
||||
}
|
||||
}
|
||||
.connect(to: .init(ipAddress: "127.0.0.1", port: 1995))
|
||||
.connect(to: .init(ipAddress: "127.0.0.1", port: port))
|
||||
.get()
|
||||
}
|
||||
|
||||
|
@ -486,18 +483,18 @@ final class AsyncChannelBootstrapTests: XCTestCase {
|
|||
}
|
||||
|
||||
extension AsyncStream {
|
||||
fileprivate static func makeStream(
|
||||
of elementType: Element.Type = Element.self,
|
||||
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
|
||||
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
|
||||
var continuation: AsyncStream<Element>.Continuation!
|
||||
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
|
||||
return (stream: stream, continuation: continuation!)
|
||||
}
|
||||
fileprivate static func makeStream(
|
||||
of elementType: Element.Type = Element.self,
|
||||
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
|
||||
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
|
||||
var continuation: AsyncStream<Element>.Continuation!
|
||||
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
|
||||
return (stream: stream, continuation: continuation!)
|
||||
}
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
|
||||
fileprivate func XCTAsyncAssertEqual<Element: Equatable>(_ lhs: @autoclosure () async throws -> Element, _ rhs: @autoclosure () async throws -> Element, file: StaticString = #filePath, line: UInt = #line) async rethrows {
|
||||
private func XCTAsyncAssertEqual<Element: Equatable>(_ lhs: @autoclosure () async throws -> Element, _ rhs: @autoclosure () async throws -> Element, file: StaticString = #filePath, line: UInt = #line) async rethrows {
|
||||
let lhsResult = try await lhs()
|
||||
let rhsResult = try await rhs()
|
||||
XCTAssertEqual(lhsResult, rhsResult, file: file, line: line)
|
||||
|
|
Loading…
Reference in New Issue