Provide NIOAsyncTestingChannel (#2238)

Motivation

Testing versions of NIO code that involve interfacing with Swift
Concurrency is currently a difficult business. In particular,
EmbeddedChannel is not available in Swift concurrency, making it
difficult to write tests where you fully control the I/O.

To that end, we should provide a variation of EmbeddedChannel that makes
testing these things possible.

Modifications

Provide an implementation of NIOAsyncTestingChannel.

Results

Users can write tests confidently with async/await.
This commit is contained in:
Cory Benfield 2022-08-10 11:09:56 +01:00 committed by GitHub
parent ff19f496bd
commit f5448fbbc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1340 additions and 20 deletions

View File

@ -0,0 +1,567 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if compiler(>=5.5.2) && canImport(_Concurrency)
import NIOConcurrencyHelpers
import NIOCore
/// A `Channel` with fine-grained control for testing.
///
/// ``NIOAsyncTestingChannel`` is a `Channel` implementation that does no
/// actual IO but that does have proper eventing mechanism, albeit one that users can
/// control. The prime use-case for ``NIOAsyncTestingChannel`` is in unit tests when you
/// want to feed the inbound events and check the outbound events manually.
///
/// Please remember to call ``finish()`` when you are no longer using this
/// ``NIOAsyncTestingChannel``.
///
/// To feed events through an ``NIOAsyncTestingChannel``'s `ChannelPipeline` use
/// ``NIOAsyncTestingChannel/writeInbound(_:)`` which accepts data of any type. It will then
/// forward that data through the `ChannelPipeline` and the subsequent
/// `ChannelInboundHandler` will receive it through the usual `channelRead`
/// event. The user is responsible for making sure the first
/// `ChannelInboundHandler` expects data of that type.
///
/// Unlike in a regular `ChannelPipeline`, it is expected that the test code will act
/// as the "network layer", using ``readOutbound(as:)`` to observe the data that the
/// `Channel` has "written" to the network, and using ``writeInbound(_:)`` to simulate
/// receiving data from the network. There are also facilities to make it a bit easier
/// to handle the logic for `write` and `flush` (using ``writeOutbound(_:)``), and to
/// extract data that passed the whole way along the channel in `channelRead` (using
/// ``readOutbound(as:)``. Below is a diagram showing the layout of a `ChannelPipeline`
/// inside a ``NIOAsyncTestingChannel``, including the functions that can be used to
/// inject and extract data at each end.
///
/// ```
///
/// Extract data Inject data
/// using readInbound() using writeOutbound()
/// |
/// +---------------+-----------------------------------+---------------+
/// | | ChannelPipeline | |
/// | | TAIL |
/// | +---------------------+ +-----------+----------+ |
/// | | Inbound Handler N | | Outbound Handler 1 | |
/// | +----------+----------+ +-----------+----------+ |
/// | | |
/// | | |
/// | +----------+----------+ +-----------+----------+ |
/// | | Inbound Handler N-1 | | Outbound Handler 2 | |
/// | +----------+----------+ +-----------+----------+ |
/// | . |
/// | . . |
/// | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
/// | [ method call] [method call] |
/// | . . |
/// | . |
/// | +----------+----------+ +-----------+----------+ |
/// | | Inbound Handler 2 | | Outbound Handler M-1 | |
/// | +----------+----------+ +-----------+----------+ |
/// | | |
/// | | |
/// | +----------+----------+ +-----------+----------+ |
/// | | Inbound Handler 1 | | Outbound Handler M | |
/// | +----------+----------+ +-----------+----------+ |
/// | HEAD | |
/// +---------------+-----------------------------------+---------------+
/// |
/// Inject data Extract data
/// using writeInbound() using readOutbound()
/// ```
///
/// - note: ``NIOAsyncTestingChannel`` is currently only compatible with
/// ``NIOAsyncTestingEventLoop``s and cannot be used with `SelectableEventLoop`s from
/// for example `MultiThreadedEventLoopGroup`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public final class NIOAsyncTestingChannel: Channel {
/// ``LeftOverState`` represents any left-over inbound, outbound, and pending outbound events that hit the
/// ``NIOAsyncTestingChannel`` and were not consumed when ``finish()`` was called on the ``NIOAsyncTestingChannel``.
///
/// ``NIOAsyncTestingChannel`` is most useful in testing and usually in unit tests, you want to consume all inbound and
/// outbound data to verify they are what you expect. Therefore, when you ``finish()`` a ``NIOAsyncTestingChannel`` it will
/// return if it's either ``LeftOverState/clean`` (no left overs) or that it has ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
public enum LeftOverState {
/// The ``NIOAsyncTestingChannel`` is clean, ie. no inbound, outbound, or pending outbound data left on ``NIOAsyncTestingChannel/finish()``.
case clean
/// The ``NIOAsyncTestingChannel`` has inbound, outbound, or pending outbound data left on ``NIOAsyncTestingChannel/finish()``.
case leftOvers(inbound: CircularBuffer<NIOAny>, outbound: CircularBuffer<NIOAny>, pendingOutbound: [NIOAny])
/// `true` if the ``NIOAsyncTestingChannel`` was `clean` on ``NIOAsyncTestingChannel/finish()``, ie. there is no unconsumed inbound, outbound, or
/// pending outbound data left on the `Channel`.
public var isClean: Bool {
if case .clean = self {
return true
} else {
return false
}
}
/// `true` if the ``NIOAsyncTestingChannel`` if there was unconsumed inbound, outbound, or pending outbound data left
/// on the `Channel` when it was `finish`ed.
public var hasLeftOvers: Bool {
return !self.isClean
}
}
/// ``BufferState`` represents the state of either the inbound, or the outbound ``NIOAsyncTestingChannel`` buffer.
///
/// These buffers contain data that travelled the `ChannelPipeline` all the way through..
///
/// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
/// `channelRead`) sends inbound data into the end of the ``NIOAsyncTestingChannel``, it will be held in the
/// ``NIOAsyncTestingChannel``'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective
/// buffer will be returned from ``writeInbound(_:)``/``writeOutbound(_:)`` as a ``BufferState``.
public enum BufferState {
/// The buffer is empty.
case empty
/// The buffer is non-empty.
case full(CircularBuffer<NIOAny>)
/// Returns `true` is the buffer was empty.
public var isEmpty: Bool {
if case .empty = self {
return true
} else {
return false
}
}
/// Returns `true` if the buffer was non-empty.
public var isFull: Bool {
return !self.isEmpty
}
}
/// ``WrongTypeError`` is thrown if you use ``readInbound(as:)`` or ``readOutbound(as:)`` and request a certain type but the first
/// item in the respective buffer is of a different type.
public struct WrongTypeError: Error, Equatable {
/// The type you expected.
public let expected: Any.Type
/// The type of the actual first element.
public let actual: Any.Type
public init(expected: Any.Type, actual: Any.Type) {
self.expected = expected
self.actual = actual
}
public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
return lhs.expected == rhs.expected && lhs.actual == rhs.actual
}
}
/// Returns `true` if the ``NIOAsyncTestingChannel`` is 'active'.
///
/// An active ``NIOAsyncTestingChannel`` can be closed by calling `close` or ``finish()`` on the ``NIOAsyncTestingChannel``.
///
/// - note: An ``NIOAsyncTestingChannel`` starts _inactive_ and can be activated, for example by calling `connect`.
public var isActive: Bool { return channelcore.isActive }
/// - see: `Channel.closeFuture`
public var closeFuture: EventLoopFuture<Void> { return channelcore.closePromise.futureResult }
/// - see: `Channel.allocator`
public let allocator: ByteBufferAllocator = ByteBufferAllocator()
/// - see: `Channel.eventLoop`
public var eventLoop: EventLoop {
return self.testingEventLoop
}
/// Returns the ``NIOAsyncTestingEventLoop`` that this ``NIOAsyncTestingChannel`` uses. This will return the same instance as
/// ``NIOAsyncTestingChannel/eventLoop`` but as the concrete ``NIOAsyncTestingEventLoop`` rather than as `EventLoop` existential.
public let testingEventLoop: NIOAsyncTestingEventLoop
/// `nil` because ``NIOAsyncTestingChannel``s don't have parents.
public let parent: Channel? = nil
// This is only written once, from a single thread, and never written again, so it's _technically_ thread-safe. Most methods cannot safely
// be used from multiple threads, but `isActive`, `isOpen`, `eventLoop`, and `closeFuture` can all safely be used from any thread. Just.
@usableFromInline
/*private but usableFromInline */ var channelcore: EmbeddedChannelCore!
/// Guards any of the getters/setters that can be accessed from any thread.
private let stateLock: Lock = Lock()
// Guarded by `stateLock`
private var _isWritable: Bool = true
// Guarded by `stateLock`
private var _localAddress: SocketAddress? = nil
// Guarded by `stateLock`
private var _remoteAddress: SocketAddress? = nil
private var _pipeline: ChannelPipeline!
/// - see: `Channel._channelCore`
public var _channelCore: ChannelCore {
return channelcore
}
/// - see: `Channel.pipeline`
public var pipeline: ChannelPipeline {
return _pipeline
}
/// - see: `Channel.isWritable`
public var isWritable: Bool {
get {
return self.stateLock.withLock { self._isWritable }
}
set {
self.stateLock.withLockVoid {
self._isWritable = newValue
}
}
}
/// - see: `Channel.localAddress`
public var localAddress: SocketAddress? {
get {
return self.stateLock.withLock { self._localAddress }
}
set {
self.stateLock.withLockVoid {
self._localAddress = newValue
}
}
}
/// - see: `Channel.remoteAddress`
public var remoteAddress: SocketAddress? {
get {
return self.stateLock.withLock { self._remoteAddress }
}
set {
self.stateLock.withLockVoid {
self._remoteAddress = newValue
}
}
}
/// Create a new instance.
///
/// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
///
/// - parameters:
/// - loop: The ``NIOAsyncTestingEventLoop`` to use.
public init(loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) {
self.testingEventLoop = loop
self._pipeline = ChannelPipeline(channel: self)
self.channelcore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)
}
/// Create a new instance.
///
/// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
///
/// - parameters:
/// - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register.
/// - loop: The ``NIOAsyncTestingEventLoop`` to use.
public convenience init(handler: ChannelHandler, loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) async {
await self.init(handlers: [handler], loop: loop)
}
/// Create a new instance.
///
/// During creation it will automatically also register itself on the ``NIOAsyncTestingEventLoop``.
///
/// - parameters:
/// - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register.
/// - loop: The ``NIOAsyncTestingEventLoop`` to use.
public convenience init(handlers: [ChannelHandler], loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) async {
self.init(loop: loop)
try! await self._pipeline.addHandlers(handlers)
// This will never throw...
try! await self.register()
}
/// Asynchronously closes the ``NIOAsyncTestingChannel``.
///
/// Errors in the ``NIOAsyncTestingChannel`` can be consumed using ``throwIfErrorCaught()``.
///
/// - parameters:
/// - acceptAlreadyClosed: Whether ``finish()`` should throw if the ``NIOAsyncTestingChannel`` has been previously `close`d.
/// - returns: The ``LeftOverState`` of the ``NIOAsyncTestingChannel``. If all the inbound and outbound events have been
/// consumed (using ``readInbound(as:)`` / ``readOutbound(as:)``) and there are no pending outbound events (unflushed
/// writes) this will be ``LeftOverState/clean``. If there are any unconsumed inbound, outbound, or pending outbound
/// events, the ``NIOAsyncTestingChannel`` will returns those as ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
public func finish(acceptAlreadyClosed: Bool) async throws -> LeftOverState {
do {
try await self.close().get()
} catch let error as ChannelError {
guard error == .alreadyClosed && acceptAlreadyClosed else {
throw error
}
}
// This can never actually throw.
try! await self.testingEventLoop.executeInContext {
self.testingEventLoop.drainScheduledTasksByRunningAllCurrentlyScheduledTasks()
}
await self.testingEventLoop.run()
try await throwIfErrorCaught()
// This can never actually throw.
return try! await self.testingEventLoop.executeInContext {
let c = self.channelcore!
if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty {
return .clean
} else {
return .leftOvers(inbound: c.inboundBuffer,
outbound: c.outboundBuffer,
pendingOutbound: c.pendingOutboundBuffer.map { $0.0 })
}
}
}
/// Asynchronously closes the ``NIOAsyncTestingChannel``.
///
/// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
/// ``NIOAsyncTestingChannel`` can be consumed using ``throwIfErrorCaught()``.
///
/// - returns: The ``LeftOverState`` of the ``NIOAsyncTestingChannel``. If all the inbound and outbound events have been
/// consumed (using ``readInbound(as:)`` / ``readOutbound(as:)``) and there are no pending outbound events (unflushed
/// writes) this will be ``LeftOverState/clean``. If there are any unconsumed inbound, outbound, or pending outbound
/// events, the ``NIOAsyncTestingChannel`` will returns those as ``LeftOverState/leftOvers(inbound:outbound:pendingOutbound:)``.
public func finish() async throws -> LeftOverState {
return try await self.finish(acceptAlreadyClosed: false)
}
/// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s outbound buffer. If the
/// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
/// are no elements in the outbound buffer, `nil` will be returned.
///
/// Data hits the ``NIOAsyncTestingChannel``'s outbound buffer when data was written using `write`, then `flush`ed, and
/// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
/// first `ChannelHandler` must have written and flushed it either explicitly (by calling
/// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
///
/// - note: Outbound events travel the `ChannelPipeline` _back to front_.
/// - note: ``NIOAsyncTestingChannel/writeOutbound(_:)`` will `write` data through the `ChannelPipeline`, starting with last
/// `ChannelHandler`.
@inlinable
public func readOutbound<T: Sendable>(as type: T.Type = T.self) async throws -> T? {
try await self.testingEventLoop.executeInContext {
try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer)
}
}
/// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s inbound buffer. If the
/// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
/// are no elements in the outbound buffer, `nil` will be returned.
///
/// Data hits the ``NIOAsyncTestingChannel``'s inbound buffer when data was send through the pipeline using `fireChannelRead`
/// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
/// last `ChannelHandler` must have send the event either explicitly (by calling
/// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
///
/// - note: ``NIOAsyncTestingChannel/writeInbound(_:)`` will fire data through the `ChannelPipeline` using `fireChannelRead`.
@inlinable
public func readInbound<T: Sendable>(as type: T.Type = T.self) async throws -> T? {
try await self.testingEventLoop.executeInContext {
try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer)
}
}
/// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
///
/// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
/// with the data you provide.
///
/// - parameters:
/// - data: The data to fire through the pipeline.
/// - returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
// all the way.
@inlinable
@discardableResult public func writeInbound<T: Sendable>(_ data: T) async throws -> BufferState {
try await self.testingEventLoop.executeInContext {
self.pipeline.fireChannelRead(NIOAny(data))
self.pipeline.fireChannelReadComplete()
try self._throwIfErrorCaught()
return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(self.channelcore.inboundBuffer)
}
}
/// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
///
/// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called
/// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler
/// because outbound events travel the pipeline from back to front.
///
/// - parameters:
/// - data: The data to fire through the pipeline.
/// - returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
// all the way.
@inlinable
@discardableResult public func writeOutbound<T: Sendable>(_ data: T) async throws -> BufferState {
try await self.writeAndFlush(NIOAny(data))
return try await self.testingEventLoop.executeInContext {
return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(self.channelcore.outboundBuffer)
}
}
/// This method will throw the error that is stored in the ``NIOAsyncTestingChannel`` if any.
///
/// The ``NIOAsyncTestingChannel`` will store an error if some error travels the `ChannelPipeline` all the way past its end.
public func throwIfErrorCaught() async throws {
try await self.testingEventLoop.executeInContext {
try self._throwIfErrorCaught()
}
}
@usableFromInline
func _throwIfErrorCaught() throws {
self.testingEventLoop.preconditionInEventLoop()
if let error = self.channelcore.error {
self.channelcore.error = nil
throw error
}
}
@inlinable
func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
self.testingEventLoop.preconditionInEventLoop()
if buffer.isEmpty {
return nil
}
let elem = buffer.removeFirst()
guard let t = self._channelCore.tryUnwrapData(elem, as: T.self) else {
throw WrongTypeError(expected: T.self, actual: type(of: self._channelCore.tryUnwrapData(elem, as: Any.self)!))
}
return t
}
/// - see: `Channel.setOption`
@inlinable
public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
if self.eventLoop.inEventLoop {
self.setOptionSync(option, value: value)
return self.eventLoop.makeSucceededVoidFuture()
} else {
return self.eventLoop.submit { self.setOptionSync(option, value: value) }
}
}
@inlinable
internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) {
// No options supported
fatalError("no options supported")
}
/// - see: `Channel.getOption`
@inlinable
public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
if self.eventLoop.inEventLoop {
return self.eventLoop.makeSucceededFuture(self.getOptionSync(option))
} else {
return self.eventLoop.submit { self.getOptionSync(option) }
}
}
@inlinable
internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value {
if option is ChannelOptions.Types.AutoReadOption {
return true as! Option.Value
}
fatalError("option \(option) not supported")
}
/// Fires the (outbound) `bind` event through the `ChannelPipeline`. If the event hits the ``NIOAsyncTestingChannel`` which
/// happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
/// ``NIOAsyncTestingChannel``'s ``localAddress``.
///
/// - parameters:
/// - address: The address to fake-bind to.
/// - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
promise?.futureResult.whenSuccess {
self.localAddress = address
}
if self.eventLoop.inEventLoop {
self.pipeline.bind(to: address, promise: promise)
} else {
self.eventLoop.execute {
self.pipeline.bind(to: address, promise: promise)
}
}
}
/// Fires the (outbound) `connect` event through the `ChannelPipeline`. If the event hits the ``NIOAsyncTestingChannel``
/// which happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
/// ``NIOAsyncTestingChannel``'s ``remoteAddress``.
///
/// - parameters:
/// - address: The address to fake-bind to.
/// - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
promise?.futureResult.whenSuccess {
self.remoteAddress = address
}
if self.eventLoop.inEventLoop {
self.pipeline.connect(to: address, promise: promise)
} else {
self.eventLoop.execute {
self.pipeline.connect(to: address, promise: promise)
}
}
}
public struct SynchronousOptions: NIOSynchronousChannelOptions {
@usableFromInline
internal let channel: NIOAsyncTestingChannel
fileprivate init(channel: NIOAsyncTestingChannel) {
self.channel = channel
}
@inlinable
public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
self.channel.eventLoop.preconditionInEventLoop()
self.channel.setOptionSync(option, value: value)
}
@inlinable
public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
self.channel.eventLoop.preconditionInEventLoop()
return self.channel.getOptionSync(option)
}
}
public final var syncOptions: NIOSynchronousChannelOptions? {
return SynchronousOptions(channel: self)
}
}
// MARK: Unchecked sendable
//
// Both of these types are unchecked Sendable because strictly, they aren't. This is
// because they contain NIOAny, a non-Sendable type. In this instance, we tolerate the moving
// of this object across threads because in the overwhelming majority of cases the data types
// in a channel pipeline _are_ `Sendable`, and because these objects only carry NIOAnys in cases
// where the `Channel` itself no longer holds a reference to these objects.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOAsyncTestingChannel.LeftOverState: @unchecked Sendable { }
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOAsyncTestingChannel.BufferState: @unchecked Sendable { }
#endif

View File

@ -2,7 +2,7 @@
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
@ -12,6 +12,8 @@
//
//===----------------------------------------------------------------------===//
import Atomics
import NIOConcurrencyHelpers
import Dispatch
import _NIODataStructures
import NIOCore
@ -235,11 +237,29 @@ public final class EmbeddedEventLoop: EventLoop {
@usableFromInline
class EmbeddedChannelCore: ChannelCore {
var isOpen: Bool = true
var isActive: Bool = false
var isOpen: Bool {
get {
return self._isOpen.load(ordering: .sequentiallyConsistent)
}
set {
self._isOpen.store(newValue, ordering: .sequentiallyConsistent)
}
}
var eventLoop: EventLoop
var closePromise: EventLoopPromise<Void>
var isActive: Bool {
get {
return self._isActive.load(ordering: .sequentiallyConsistent)
}
set {
self._isActive.store(newValue, ordering: .sequentiallyConsistent)
}
}
private let _isOpen = ManagedAtomic(true)
private let _isActive = ManagedAtomic(false)
let eventLoop: EventLoop
let closePromise: EventLoopPromise<Void>
var error: Optional<Error>
private let pipeline: ChannelPipeline
@ -273,16 +293,19 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
func localAddress0() throws -> SocketAddress {
self.eventLoop.preconditionInEventLoop()
throw ChannelError.operationUnsupported
}
@usableFromInline
func remoteAddress0() throws -> SocketAddress {
self.eventLoop.preconditionInEventLoop()
throw ChannelError.operationUnsupported
}
@usableFromInline
func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
guard self.isOpen else {
promise?.fail(ChannelError.alreadyClosed)
return
@ -304,11 +327,13 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
promise?.succeed(())
}
@usableFromInline
func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
isActive = true
promise?.succeed(())
self.pipeline.syncOperations.fireChannelActive()
@ -316,12 +341,14 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
func register0(promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
promise?.succeed(())
self.pipeline.syncOperations.fireChannelRegistered()
}
@usableFromInline
func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
isActive = true
register0(promise: promise)
self.pipeline.syncOperations.fireChannelActive()
@ -329,11 +356,13 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
self.pendingOutboundBuffer.append((data, promise))
}
@usableFromInline
func flush0() {
self.eventLoop.preconditionInEventLoop()
self.pendingOutboundBuffer.mark()
while self.pendingOutboundBuffer.hasMark, let dataAndPromise = self.pendingOutboundBuffer.popFirst() {
@ -344,25 +373,30 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
func read0() {
self.eventLoop.preconditionInEventLoop()
// NOOP
}
public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
promise?.fail(ChannelError.operationUnsupported)
}
@usableFromInline
func channelRead0(_ data: NIOAny) {
self.eventLoop.preconditionInEventLoop()
addToBuffer(buffer: &inboundBuffer, data: data)
}
public func errorCaught0(error: Error) {
self.eventLoop.preconditionInEventLoop()
if self.error == nil {
self.error = error
}
}
private func addToBuffer<T>(buffer: inout CircularBuffer<T>, data: T) {
self.eventLoop.preconditionInEventLoop()
buffer.append(data)
}
}

View File

@ -50,6 +50,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner {
testCase(AddressedEnvelopeTests.allTests),
testCase(ApplicationProtocolNegotiationHandlerTests.allTests),
testCase(AsyncSequenceCollectTests.allTests),
testCase(AsyncTestingChannelTests.allTests),
testCase(Base64Test.allTests),
testCase(BaseObjectTest.allTests),
testCase(BlockingIOThreadPoolTest.allTests),

View File

@ -0,0 +1,59 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// AsyncTestingChannelTests+XCTest.swift
//
import XCTest
///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension AsyncTestingChannelTests {
@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
static var allTests : [(String, (AsyncTestingChannelTests) -> () throws -> Void)] {
return [
("testSingleHandlerInit", testSingleHandlerInit),
("testEmptyInit", testEmptyInit),
("testMultipleHandlerInit", testMultipleHandlerInit),
("testWriteOutboundByteBuffer", testWriteOutboundByteBuffer),
("testWriteOutboundByteBufferMultipleTimes", testWriteOutboundByteBufferMultipleTimes),
("testWriteInboundByteBuffer", testWriteInboundByteBuffer),
("testWriteInboundByteBufferMultipleTimes", testWriteInboundByteBufferMultipleTimes),
("testWriteInboundByteBufferReThrow", testWriteInboundByteBufferReThrow),
("testWriteOutboundByteBufferReThrow", testWriteOutboundByteBufferReThrow),
("testReadOutboundWrongTypeThrows", testReadOutboundWrongTypeThrows),
("testReadInboundWrongTypeThrows", testReadInboundWrongTypeThrows),
("testWrongTypesWithFastpathTypes", testWrongTypesWithFastpathTypes),
("testCloseMultipleTimesThrows", testCloseMultipleTimesThrows),
("testCloseOnInactiveIsOk", testCloseOnInactiveIsOk),
("testEmbeddedLifecycle", testEmbeddedLifecycle),
("testEmbeddedChannelAndPipelineAndChannelCoreShareTheEventLoop", testEmbeddedChannelAndPipelineAndChannelCoreShareTheEventLoop),
("testSendingAnythingOnEmbeddedChannel", testSendingAnythingOnEmbeddedChannel),
("testActiveWhenConnectPromiseFiresAndInactiveWhenClosePromiseFires", testActiveWhenConnectPromiseFiresAndInactiveWhenClosePromiseFires),
("testWriteWithoutFlushDoesNotWrite", testWriteWithoutFlushDoesNotWrite),
("testSetLocalAddressAfterSuccessfulBind", testSetLocalAddressAfterSuccessfulBind),
("testSetRemoteAddressAfterSuccessfulConnect", testSetRemoteAddressAfterSuccessfulConnect),
("testUnprocessedOutboundUserEventFailsOnEmbeddedChannel", testUnprocessedOutboundUserEventFailsOnEmbeddedChannel),
("testEmbeddedChannelWritabilityIsWritable", testEmbeddedChannelWritabilityIsWritable),
("testFinishWithRecursivelyScheduledTasks", testFinishWithRecursivelyScheduledTasks),
("testSyncOptionsAreSupported", testSyncOptionsAreSupported),
("testSecondFinishThrows", testSecondFinishThrows),
]
}
}

View File

@ -0,0 +1,659 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import XCTest
import Atomics
import NIOCore
@testable import NIOEmbedded
class AsyncTestingChannelTests: XCTestCase {
func testSingleHandlerInit() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
class Handler: ChannelInboundHandler {
typealias InboundIn = Never
}
let channel = await NIOAsyncTestingChannel(handler: Handler())
XCTAssertNoThrow(try channel.pipeline.handler(type: Handler.self).wait())
}
#else
throw XCTSkip()
#endif
}
func testEmptyInit() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
class Handler: ChannelInboundHandler {
typealias InboundIn = Never
}
let channel = NIOAsyncTestingChannel()
XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).wait()) { e in
XCTAssertEqual(e as? ChannelPipelineError, .notFound)
}
#else
throw XCTSkip()
#endif
}
func testMultipleHandlerInit() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
class Handler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = Never
let identifier: String
init(identifier: String) {
self.identifier = identifier
}
}
let channel = await NIOAsyncTestingChannel(
handlers: [Handler(identifier: "0"), Handler(identifier: "1"), Handler(identifier: "2")]
)
XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "0"))
XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler0").wait())
XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "1"))
XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler1").wait())
XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "2"))
XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler2").wait())
}
#else
throw XCTSkip()
#endif
}
func testWriteOutboundByteBuffer() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 1024)
buf.writeString("hello")
let isFull = try await channel.writeOutbound(buf).isFull
XCTAssertTrue(isFull)
let hasLeftovers = try await channel.finish().hasLeftOvers
XCTAssertTrue(hasLeftovers)
let read = try await channel.readOutbound(as: ByteBuffer.self)
XCTAssertNoThrow(XCTAssertEqual(buf, read))
let nextOutboundRead = try await channel.readOutbound(as: ByteBuffer.self)
let nextInboundRead = try await channel.readInbound(as: ByteBuffer.self)
XCTAssertNoThrow(XCTAssertNil(nextOutboundRead))
XCTAssertNoThrow(XCTAssertNil(nextInboundRead))
}
#else
throw XCTSkip()
#endif
}
func testWriteOutboundByteBufferMultipleTimes() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 1024)
buf.writeString("hello")
try await XCTAsyncAssertTrue(await channel.writeOutbound(buf).isFull)
try await XCTAsyncAssertEqual(buf, await channel.readOutbound())
try await XCTAsyncAssertNil(await channel.readOutbound(as: ByteBuffer.self))
try await XCTAsyncAssertNil(await channel.readInbound(as: ByteBuffer.self))
var bufB = channel.allocator.buffer(capacity: 1024)
bufB.writeString("again")
try await XCTAsyncAssertTrue(await channel.writeOutbound(bufB).isFull)
try await XCTAsyncAssertTrue(await channel.finish().hasLeftOvers)
try await XCTAsyncAssertEqual(bufB, await channel.readOutbound())
try await XCTAsyncAssertNil(await channel.readOutbound(as: ByteBuffer.self))
try await XCTAsyncAssertNil(await channel.readInbound(as: ByteBuffer.self))
}
#else
throw XCTSkip()
#endif
}
func testWriteInboundByteBuffer() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 1024)
buf.writeString("hello")
try await XCTAsyncAssertTrue(await channel.writeInbound(buf).isFull)
try await XCTAsyncAssertTrue(await channel.finish().hasLeftOvers)
try await XCTAsyncAssertEqual(buf, await channel.readInbound())
try await XCTAsyncAssertNil(await channel.readInbound(as: ByteBuffer.self))
try await XCTAsyncAssertNil(await channel.readOutbound(as: ByteBuffer.self))
}
#else
throw XCTSkip()
#endif
}
func testWriteInboundByteBufferMultipleTimes() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 1024)
buf.writeString("hello")
try await XCTAsyncAssertTrue(await channel.writeInbound(buf).isFull)
try await XCTAsyncAssertEqual(buf, await channel.readInbound())
try await XCTAsyncAssertNil(await channel.readInbound(as: ByteBuffer.self))
try await XCTAsyncAssertNil(await channel.readOutbound(as: ByteBuffer.self))
var bufB = channel.allocator.buffer(capacity: 1024)
bufB.writeString("again")
try await XCTAsyncAssertTrue(await channel.writeInbound(bufB).isFull)
try await XCTAsyncAssertTrue(await channel.finish().hasLeftOvers)
try await XCTAsyncAssertEqual(bufB, await channel.readInbound())
try await XCTAsyncAssertNil(await channel.readInbound(as: ByteBuffer.self))
try await XCTAsyncAssertNil(await channel.readOutbound(as: ByteBuffer.self))
}
#else
throw XCTSkip()
#endif
}
func testWriteInboundByteBufferReThrow() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
XCTAssertNoThrow(try channel.pipeline.addHandler(ExceptionThrowingInboundHandler()).wait())
await XCTAsyncAssertThrowsError(try await channel.writeInbound("msg")) { error in
XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError)
}
try await XCTAsyncAssertTrue(await channel.finish().isClean)
}
#else
throw XCTSkip()
#endif
}
func testWriteOutboundByteBufferReThrow() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
XCTAssertNoThrow(try channel.pipeline.addHandler(ExceptionThrowingOutboundHandler()).wait())
await XCTAsyncAssertThrowsError(try await channel.writeOutbound("msg")) { error in
XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError)
}
try await XCTAsyncAssertTrue(await channel.finish().isClean)
}
#else
throw XCTSkip()
#endif
}
func testReadOutboundWrongTypeThrows() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
try await XCTAsyncAssertTrue(await channel.writeOutbound("hello").isFull)
do {
_ = try await channel.readOutbound(as: Int.self)
XCTFail()
} catch let error as NIOAsyncTestingChannel.WrongTypeError {
let expectedError = NIOAsyncTestingChannel.WrongTypeError(expected: Int.self, actual: String.self)
XCTAssertEqual(error, expectedError)
} catch {
XCTFail()
}
}
#else
throw XCTSkip()
#endif
}
func testReadInboundWrongTypeThrows() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
try await XCTAsyncAssertTrue(await channel.writeInbound("hello").isFull)
do {
_ = try await channel.readInbound(as: Int.self)
XCTFail()
} catch let error as NIOAsyncTestingChannel.WrongTypeError {
let expectedError = NIOAsyncTestingChannel.WrongTypeError(expected: Int.self, actual: String.self)
XCTAssertEqual(error, expectedError)
} catch {
XCTFail()
}
}
#else
throw XCTSkip()
#endif
}
func testWrongTypesWithFastpathTypes() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let buffer = channel.allocator.buffer(capacity: 0)
try await XCTAsyncAssertTrue(await channel.writeOutbound(buffer).isFull)
try await XCTAsyncAssertTrue(await channel.writeOutbound(
AddressedEnvelope<ByteBuffer>(remoteAddress: SocketAddress(ipAddress: "1.2.3.4", port: 5678),
data: buffer)).isFull)
try await XCTAsyncAssertTrue(await channel.writeOutbound(buffer).isFull)
try await XCTAsyncAssertTrue(await channel.writeInbound(buffer).isFull)
try await XCTAsyncAssertTrue(await channel.writeInbound(
AddressedEnvelope<ByteBuffer>(remoteAddress: SocketAddress(ipAddress: "1.2.3.4", port: 5678),
data: buffer)).isFull)
try await XCTAsyncAssertTrue(await channel.writeInbound(buffer).isFull)
func check<Expected: Sendable, Actual>(expected: Expected.Type,
actual: Actual.Type,
file: StaticString = #file,
line: UInt = #line) async {
do {
_ = try await channel.readOutbound(as: Expected.self)
XCTFail("this should have failed", file: (file), line: line)
} catch let error as NIOAsyncTestingChannel.WrongTypeError {
let expectedError = NIOAsyncTestingChannel.WrongTypeError(expected: Expected.self, actual: Actual.self)
XCTAssertEqual(error, expectedError, file: (file), line: line)
} catch {
XCTFail("unexpected error: \(error)", file: (file), line: line)
}
do {
_ = try await channel.readInbound(as: Expected.self)
XCTFail("this should have failed", file: (file), line: line)
} catch let error as NIOAsyncTestingChannel.WrongTypeError {
let expectedError = NIOAsyncTestingChannel.WrongTypeError(expected: Expected.self, actual: Actual.self)
XCTAssertEqual(error, expectedError, file: (file), line: line)
} catch {
XCTFail("unexpected error: \(error)", file: (file), line: line)
}
}
await check(expected: Never.self, actual: IOData.self)
await check(expected: ByteBuffer.self, actual: AddressedEnvelope<ByteBuffer>.self)
await check(expected: AddressedEnvelope<ByteBuffer>.self, actual: IOData.self)
}
#else
throw XCTSkip()
#endif
}
func testCloseMultipleTimesThrows() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
try await XCTAsyncAssertTrue(await channel.finish().isClean)
// Close a second time. This must fail.
do {
try await channel.close()
XCTFail("Second close succeeded")
} catch ChannelError.alreadyClosed {
// Nothing to do here.
}
}
#else
throw XCTSkip()
#endif
}
func testCloseOnInactiveIsOk() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let inactiveHandler = CloseInChannelInactiveHandler()
XCTAssertNoThrow(try channel.pipeline.addHandler(inactiveHandler).wait())
try await XCTAsyncAssertTrue(await channel.finish().isClean)
// channelInactive should fire only once.
XCTAssertEqual(inactiveHandler.inactiveNotifications, 1)
}
#else
throw XCTSkip()
#endif
}
func testEmbeddedLifecycle() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let handler = ChannelLifecycleHandler()
XCTAssertEqual(handler.currentState, .unregistered)
let channel = await NIOAsyncTestingChannel(handler: handler)
XCTAssertEqual(handler.currentState, .registered)
XCTAssertFalse(channel.isActive)
XCTAssertNoThrow(try channel.connect(to: try SocketAddress(unixDomainSocketPath: "/fake")).wait())
XCTAssertEqual(handler.currentState, .active)
XCTAssertTrue(channel.isActive)
try await XCTAsyncAssertTrue(await channel.finish().isClean)
XCTAssertEqual(handler.currentState, .unregistered)
XCTAssertFalse(channel.isActive)
}
#else
throw XCTSkip()
#endif
}
private final class ExceptionThrowingInboundHandler : ChannelInboundHandler {
typealias InboundIn = String
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
context.fireErrorCaught(ChannelError.operationUnsupported)
}
}
private final class ExceptionThrowingOutboundHandler : ChannelOutboundHandler {
typealias OutboundIn = String
typealias OutboundOut = Never
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
promise!.fail(ChannelError.operationUnsupported)
}
}
private final class CloseInChannelInactiveHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
public var inactiveNotifications = 0
public func channelInactive(context: ChannelHandlerContext) {
inactiveNotifications += 1
context.close(promise: nil)
}
}
func testEmbeddedChannelAndPipelineAndChannelCoreShareTheEventLoop() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let pipelineEventLoop = channel.pipeline.eventLoop
XCTAssert(pipelineEventLoop === channel.eventLoop)
XCTAssert(pipelineEventLoop === (channel._channelCore as! EmbeddedChannelCore).eventLoop)
try await XCTAsyncAssertTrue(await channel.finish().isClean)
}
#else
throw XCTSkip()
#endif
}
func testSendingAnythingOnEmbeddedChannel() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let buffer = ByteBufferAllocator().buffer(capacity: 5)
let socketAddress = try SocketAddress(unixDomainSocketPath: "path")
let handle = NIOFileHandle(descriptor: 1)
let fileRegion = FileRegion(fileHandle: handle, readerIndex: 1, endIndex: 2)
defer {
// fake descriptor, so shouldn't be closed.
XCTAssertNoThrow(try handle.takeDescriptorOwnership())
}
try await channel.writeAndFlush(1)
try await channel.writeAndFlush("1")
try await channel.writeAndFlush(buffer)
try await channel.writeAndFlush(IOData.byteBuffer(buffer))
try await channel.writeAndFlush(IOData.fileRegion(fileRegion))
try await channel.writeAndFlush(AddressedEnvelope(remoteAddress: socketAddress, data: buffer))
}
#else
throw XCTSkip()
#endif
}
func testActiveWhenConnectPromiseFiresAndInactiveWhenClosePromiseFires() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
XCTAssertFalse(channel.isActive)
let connectPromise = channel.eventLoop.makePromise(of: Void.self)
connectPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
XCTAssertTrue(channel.isActive)
}
channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 0), promise: connectPromise)
try await connectPromise.futureResult.get()
let closePromise = channel.eventLoop.makePromise(of: Void.self)
closePromise.futureResult.whenComplete { (_: Result<Void, Error>) in
XCTAssertFalse(channel.isActive)
}
channel.close(promise: closePromise)
try await closePromise.futureResult.get()
}
#else
throw XCTSkip()
#endif
}
func testWriteWithoutFlushDoesNotWrite() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let buf = ByteBuffer(bytes: [1])
let writeFuture = channel.write(buf)
try await XCTAsyncAssertNil(await channel.readOutbound(as: ByteBuffer.self))
XCTAssertFalse(writeFuture.isFulfilled)
channel.flush()
try await XCTAsyncAssertNotNil(await channel.readOutbound(as: ByteBuffer.self))
XCTAssertTrue(writeFuture.isFulfilled)
try await XCTAsyncAssertTrue(await channel.finish().isClean)
}
#else
throw XCTSkip()
#endif
}
func testSetLocalAddressAfterSuccessfulBind() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let channel = NIOAsyncTestingChannel()
let bindPromise = channel.eventLoop.makePromise(of: Void.self)
let socketAddress = try SocketAddress(ipAddress: "127.0.0.1", port: 0)
channel.bind(to: socketAddress, promise: bindPromise)
bindPromise.futureResult.whenComplete { _ in
XCTAssertEqual(channel.localAddress, socketAddress)
}
try bindPromise.futureResult.wait()
#else
throw XCTSkip()
#endif
}
func testSetRemoteAddressAfterSuccessfulConnect() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let channel = NIOAsyncTestingChannel()
let connectPromise = channel.eventLoop.makePromise(of: Void.self)
let socketAddress = try SocketAddress(ipAddress: "127.0.0.1", port: 0)
channel.connect(to: socketAddress, promise: connectPromise)
connectPromise.futureResult.whenComplete { _ in
XCTAssertEqual(channel.remoteAddress, socketAddress)
}
try connectPromise.futureResult.wait()
#else
throw XCTSkip()
#endif
}
func testUnprocessedOutboundUserEventFailsOnEmbeddedChannel() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let channel = NIOAsyncTestingChannel()
XCTAssertThrowsError(try channel.triggerUserOutboundEvent("event").wait()) { (error: Error) in
if let error = error as? ChannelError {
XCTAssertEqual(ChannelError.operationUnsupported, error)
} else {
XCTFail("unexpected error: \(error)")
}
}
#else
throw XCTSkip()
#endif
}
func testEmbeddedChannelWritabilityIsWritable() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let channel = NIOAsyncTestingChannel()
let opaqueChannel: Channel = channel
XCTAssertTrue(channel.isWritable)
XCTAssertTrue(opaqueChannel.isWritable)
channel.isWritable = false
XCTAssertFalse(channel.isWritable)
XCTAssertFalse(opaqueChannel.isWritable)
#else
throw XCTSkip()
#endif
}
func testFinishWithRecursivelyScheduledTasks() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let invocations = AtomicCounter()
@Sendable func recursivelyScheduleAndIncrement() {
channel.pipeline.eventLoop.scheduleTask(deadline: .distantFuture) {
invocations.increment()
recursivelyScheduleAndIncrement()
}
}
recursivelyScheduleAndIncrement()
_ = try await channel.finish()
XCTAssertEqual(invocations.load(), 1)
}
#else
throw XCTSkip()
#endif
}
func testSyncOptionsAreSupported() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
let channel = NIOAsyncTestingChannel()
try channel.testingEventLoop.submit {
let options = channel.syncOptions
XCTAssertNotNil(options)
// Unconditionally returns true.
XCTAssertEqual(try options?.getOption(ChannelOptions.autoRead), true)
// (Setting options isn't supported.)
}.wait()
#else
throw XCTSkip()
#endif
}
func testSecondFinishThrows() throws {
#if compiler(>=5.5.2) && canImport(_Concurrency)
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
_ = try await channel.finish()
await XCTAsyncAssertThrowsError(try await channel.finish())
}
#else
throw XCTSkip()
#endif
}
}
#if compiler(>=5.5.2) && canImport(_Concurrency)
fileprivate func XCTAsyncAssertTrue(_ predicate: @autoclosure () async throws -> Bool, file: StaticString = #file, line: UInt = #line) async rethrows {
let result = try await predicate()
XCTAssertTrue(result, file: file, line: line)
}
fileprivate func XCTAsyncAssertEqual<Element: Equatable>(_ lhs: @autoclosure () async throws -> Element, _ rhs: @autoclosure () async throws -> Element, file: StaticString = #file, line: UInt = #line) async rethrows {
let lhsResult = try await lhs()
let rhsResult = try await rhs()
XCTAssertEqual(lhsResult, rhsResult, file: file, line: line)
}
fileprivate func XCTAsyncAssertThrowsError<ResultType>(_ expression: @autoclosure () async throws -> ResultType, file: StaticString = #file, line: UInt = #line, _ callback: Optional<(Error) -> Void> = nil) async {
do {
let _ = try await expression()
XCTFail("Did not throw", file: file, line: line)
} catch {
callback?(error)
}
}
fileprivate func XCTAsyncAssertNil(_ expression: @autoclosure () async throws -> Any?, file: StaticString = #file, line: UInt = #line) async rethrows {
let result = try await expression()
XCTAssertNil(result, file: file, line: line)
}
fileprivate func XCTAsyncAssertNotNil(_ expression: @autoclosure () async throws -> Any?, file: StaticString = #file, line: UInt = #line) async rethrows {
let result = try await expression()
XCTAssertNotNil(result, file: file, line: line)
}
/// A simple atomic counter.
final class AtomicCounter: @unchecked Sendable {
// This class has to be `@unchecked Sendable` because ManagedAtomic
// is not sendable.
private let baseCounter = ManagedAtomic(0)
func increment() {
self.baseCounter.wrappingIncrement(ordering: .relaxed)
}
func load() -> Int {
self.baseCounter.load(ordering: .relaxed)
}
}
#endif

View File

@ -18,8 +18,8 @@ services:
test:
image: swift-nio:18.04-5.4
environment:
- MAX_ALLOCS_ALLOWED_1000_addHandlers=45050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=38050
- MAX_ALLOCS_ALLOWED_1000_addHandlers=47050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=40050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlercontext=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlername=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlertype=9050
@ -28,7 +28,7 @@ services:
- MAX_ALLOCS_ALLOWED_1000_copying_bytebufferview_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_copying_circularbuffer_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_getHandlers=9050
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=35
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=37
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30400
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=158050

View File

@ -18,8 +18,8 @@ services:
test:
image: swift-nio:20.04-5.5
environment:
- MAX_ALLOCS_ALLOWED_1000_addHandlers=45050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=38050
- MAX_ALLOCS_ALLOWED_1000_addHandlers=47050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=40050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlercontext=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlername=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlertype=9050
@ -28,7 +28,7 @@ services:
- MAX_ALLOCS_ALLOWED_1000_copying_bytebufferview_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_copying_circularbuffer_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_getHandlers=9050
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=35
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=37
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30400
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=158050

View File

@ -18,8 +18,8 @@ services:
test:
image: swift-nio:20.04-5.6
environment:
- MAX_ALLOCS_ALLOWED_1000_addHandlers=45050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=38050
- MAX_ALLOCS_ALLOWED_1000_addHandlers=47050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=40050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlercontext=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlername=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlertype=9050
@ -28,7 +28,7 @@ services:
- MAX_ALLOCS_ALLOWED_1000_copying_bytebufferview_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_copying_circularbuffer_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_getHandlers=9050
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=35
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=37
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30400
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=154050

View File

@ -17,8 +17,8 @@ services:
test:
image: swift-nio:20.04-5.7
environment:
- MAX_ALLOCS_ALLOWED_1000_addHandlers=45050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=38050
- MAX_ALLOCS_ALLOWED_1000_addHandlers=47050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=40050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlercontext=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlername=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlertype=9050
@ -27,7 +27,7 @@ services:
- MAX_ALLOCS_ALLOWED_1000_copying_bytebufferview_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_copying_circularbuffer_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_getHandlers=9050
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=35
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=37
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30400
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=153050

View File

@ -17,8 +17,8 @@ services:
test:
image: swift-nio:20.04-main
environment:
- MAX_ALLOCS_ALLOWED_1000_addHandlers=45050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=37050
- MAX_ALLOCS_ALLOWED_1000_addHandlers=47050
- MAX_ALLOCS_ALLOWED_1000_addHandlers_sync=39050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlercontext=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlername=9050
- MAX_ALLOCS_ALLOWED_1000_addRemoveHandlers_handlertype=9050
@ -27,7 +27,7 @@ services:
- MAX_ALLOCS_ALLOWED_1000_copying_bytebufferview_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_copying_circularbuffer_to_array=1050
- MAX_ALLOCS_ALLOWED_1000_getHandlers=9050
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=35
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=37
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30400
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=153050