Add `NIOAsyncTestingChannel.waitForOut/InboundWrite()` (#2307)

This commit is contained in:
David Nadoba 2022-11-08 12:45:24 +01:00 committed by GitHub
parent af41276062
commit 6e404d1614
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 185 additions and 9 deletions

View File

@ -26,7 +26,8 @@ var targets: [PackageDescription.Target] = [
dependencies: ["NIOCore",
"NIOConcurrencyHelpers",
"_NIODataStructures",
swiftAtomics]),
swiftAtomics,
swiftCollections]),
.target(name: "NIOPosix",
dependencies: ["CNIOLinux",
"CNIODarwin",

View File

@ -363,6 +363,39 @@ public final class NIOAsyncTestingChannel: Channel {
}
}
/// This method is similar to ``NIOAsyncTestingChannel/readOutbound(as:)`` but will wait if the outbound buffer is empty.
/// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s outbound buffer. If the
/// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
/// are no elements in the outbound buffer, `nil` will be returned.
///
/// Data hits the ``NIOAsyncTestingChannel``'s outbound buffer when data was written using `write`, then `flush`ed, and
/// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
/// first `ChannelHandler` must have written and flushed it either explicitly (by calling
/// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
///
/// - note: Outbound events travel the `ChannelPipeline` _back to front_.
/// - note: ``NIOAsyncTestingChannel/writeOutbound(_:)`` will `write` data through the `ChannelPipeline`, starting with last
/// `ChannelHandler`.
public func waitForOutboundWrite<T: Sendable>(as type: T.Type = T.self) async throws -> T {
try await withCheckedThrowingContinuation { continuation in
self.testingEventLoop.execute {
do {
if let element: T = try self._readFromBuffer(buffer: &self.channelcore.outboundBuffer) {
continuation.resume(returning: element)
return
}
self.channelcore.outboundBufferConsumer.append { element in
continuation.resume(with: Result {
try self._cast(element)
})
}
} catch {
continuation.resume(throwing: error)
}
}
}
}
/// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s inbound buffer. If the
/// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
/// are no elements in the outbound buffer, `nil` will be returned.
@ -380,6 +413,37 @@ public final class NIOAsyncTestingChannel: Channel {
}
}
/// This method is similar to ``NIOAsyncTestingChannel/readInbound(as:)`` but will wait if the inbound buffer is empty.
/// If available, this method reads one element of type `T` out of the ``NIOAsyncTestingChannel``'s inbound buffer. If the
/// first element was of a different type than requested, ``WrongTypeError`` will be thrown, if there
/// are no elements in the outbound buffer, this method will wait until an element is in the inbound buffer.
///
/// Data hits the ``NIOAsyncTestingChannel``'s inbound buffer when data was send through the pipeline using `fireChannelRead`
/// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
/// last `ChannelHandler` must have send the event either explicitly (by calling
/// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
///
/// - note: ``NIOAsyncTestingChannel/writeInbound(_:)`` will fire data through the `ChannelPipeline` using `fireChannelRead`.
public func waitForInboundWrite<T: Sendable>(as type: T.Type = T.self) async throws -> T {
try await withCheckedThrowingContinuation { continuation in
self.testingEventLoop.execute {
do {
if let element: T = try self._readFromBuffer(buffer: &self.channelcore.inboundBuffer) {
continuation.resume(returning: element)
return
}
self.channelcore.inboundBufferConsumer.append { element in
continuation.resume(with: Result {
try self._cast(element)
})
}
} catch {
continuation.resume(throwing: error)
}
}
}
}
/// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
///
/// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
@ -436,6 +500,7 @@ public final class NIOAsyncTestingChannel: Channel {
}
}
@inlinable
func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
self.testingEventLoop.preconditionInEventLoop()
@ -443,9 +508,13 @@ public final class NIOAsyncTestingChannel: Channel {
if buffer.isEmpty {
return nil
}
let elem = buffer.removeFirst()
guard let t = self._channelCore.tryUnwrapData(elem, as: T.self) else {
throw WrongTypeError(expected: T.self, actual: type(of: self._channelCore.tryUnwrapData(elem, as: Any.self)!))
return try self._cast(buffer.removeFirst(), to: T.self)
}
@inlinable
func _cast<T>(_ element: NIOAny, to: T.Type = T.self) throws -> T {
guard let t = self._channelCore.tryUnwrapData(element, as: T.self) else {
throw WrongTypeError(expected: T.self, actual: type(of: self._channelCore.tryUnwrapData(element, as: Any.self)!))
}
return t
}

View File

@ -17,7 +17,7 @@ import NIOConcurrencyHelpers
import Dispatch
import _NIODataStructures
import NIOCore
import DequeModule
internal struct EmbeddedScheduledTask {
let id: UInt64
@ -282,6 +282,10 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()
/// Contains observers that want to consume the first element that would be appended to the `outboundBuffer`
@usableFromInline
var outboundBufferConsumer: Deque<(NIOAny) -> Void> = []
/// Contains the unflushed items that went into the `Channel`
@usableFromInline
var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 16)
@ -291,6 +295,10 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()
/// Contains observers that want to consume the first element that would be appended to the `inboundBuffer`
@usableFromInline
var inboundBufferConsumer: Deque<(NIOAny) -> Void> = []
@usableFromInline
func localAddress0() throws -> SocketAddress {
self.eventLoop.preconditionInEventLoop()
@ -366,7 +374,11 @@ class EmbeddedChannelCore: ChannelCore {
self.pendingOutboundBuffer.mark()
while self.pendingOutboundBuffer.hasMark, let dataAndPromise = self.pendingOutboundBuffer.popFirst() {
self.addToBuffer(buffer: &self.outboundBuffer, data: dataAndPromise.0)
self.addToBuffer(
buffer: &self.outboundBuffer,
consumer: &self.outboundBufferConsumer,
data: dataAndPromise.0
)
dataAndPromise.1?.succeed(())
}
}
@ -385,7 +397,11 @@ class EmbeddedChannelCore: ChannelCore {
@usableFromInline
func channelRead0(_ data: NIOAny) {
self.eventLoop.preconditionInEventLoop()
addToBuffer(buffer: &inboundBuffer, data: data)
self.addToBuffer(
buffer: &self.inboundBuffer,
consumer: &self.inboundBufferConsumer,
data: data
)
}
public func errorCaught0(error: Error) {
@ -395,10 +411,18 @@ class EmbeddedChannelCore: ChannelCore {
}
}
private func addToBuffer<T>(buffer: inout CircularBuffer<T>, data: T) {
private func addToBuffer(
buffer: inout CircularBuffer<NIOAny>,
consumer: inout Deque<(NIOAny) -> Void>,
data: NIOAny
) {
self.eventLoop.preconditionInEventLoop()
if let consume = consumer.popFirst() {
consume(data)
} else {
buffer.append(data)
}
}
}
/// `EmbeddedChannel` is a `Channel` implementation that does neither any

View File

@ -30,6 +30,10 @@ extension AsyncTestingChannelTests {
("testSingleHandlerInit", testSingleHandlerInit),
("testEmptyInit", testEmptyInit),
("testMultipleHandlerInit", testMultipleHandlerInit),
("testWaitForInboundWrite", testWaitForInboundWrite),
("testWaitForMultipleInboundWritesInParallel", testWaitForMultipleInboundWritesInParallel),
("testWaitForOutboundWrite", testWaitForOutboundWrite),
("testWaitForMultipleOutboundWritesInParallel", testWaitForMultipleOutboundWritesInParallel),
("testWriteOutboundByteBuffer", testWriteOutboundByteBuffer),
("testWriteOutboundByteBufferMultipleTimes", testWriteOutboundByteBufferMultipleTimes),
("testWriteInboundByteBuffer", testWriteInboundByteBuffer),

View File

@ -70,6 +70,84 @@ class AsyncTestingChannelTests: XCTestCase {
}
}
func testWaitForInboundWrite() throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let task = Task {
try await XCTAsyncAssertEqual(try await channel.waitForInboundWrite(), 1)
try await XCTAsyncAssertEqual(try await channel.waitForInboundWrite(), 2)
try await XCTAsyncAssertEqual(try await channel.waitForInboundWrite(), 3)
}
try await channel.writeInbound(1)
try await channel.writeInbound(2)
try await channel.writeInbound(3)
try await task.value
}
}
func testWaitForMultipleInboundWritesInParallel() throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let task = Task {
let task1 = Task { try await channel.waitForInboundWrite(as: Int.self) }
let task2 = Task { try await channel.waitForInboundWrite(as: Int.self) }
let task3 = Task { try await channel.waitForInboundWrite(as: Int.self) }
try await XCTAsyncAssertEqual(Set([
try await task1.value,
try await task2.value,
try await task3.value,
]), [1, 2, 3])
}
try await channel.writeInbound(1)
try await channel.writeInbound(2)
try await channel.writeInbound(3)
try await task.value
}
}
func testWaitForOutboundWrite() throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let task = Task {
try await XCTAsyncAssertEqual(try await channel.waitForOutboundWrite(), 1)
try await XCTAsyncAssertEqual(try await channel.waitForOutboundWrite(), 2)
try await XCTAsyncAssertEqual(try await channel.waitForOutboundWrite(), 3)
}
try await channel.writeOutbound(1)
try await channel.writeOutbound(2)
try await channel.writeOutbound(3)
try await task.value
}
}
func testWaitForMultipleOutboundWritesInParallel() throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {
let channel = NIOAsyncTestingChannel()
let task = Task {
let task1 = Task { try await channel.waitForOutboundWrite(as: Int.self) }
let task2 = Task { try await channel.waitForOutboundWrite(as: Int.self) }
let task3 = Task { try await channel.waitForOutboundWrite(as: Int.self) }
try await XCTAsyncAssertEqual(Set([
try await task1.value,
try await task2.value,
try await task3.value,
]), [1, 2, 3])
}
try await channel.writeOutbound(1)
try await channel.writeOutbound(2)
try await channel.writeOutbound(3)
try await task.value
}
}
func testWriteOutboundByteBuffer() throws {
guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() }
XCTAsyncTest {