// Source: swift-nio/Sources/NIOEmbedded/Embedded.swift
// (880 lines, 34 KiB, Swift)


//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Atomics
import NIOConcurrencyHelpers
import Dispatch
import _NIODataStructures
import NIOCore
import DequeModule
/// A unit of work queued on an `EmbeddedEventLoop`, bundled with the metadata needed to
/// order it against other queued work and to fail it without ever running it.
internal struct EmbeddedScheduledTask {
    /// Identifier of this task; equality between tasks is decided on this value alone.
    let id: UInt64

    /// The work to perform once the task is run.
    let task: () -> Void

    /// Invoked with an error when the task is failed instead of being run.
    let failFn: (Error) -> Void

    /// The point in (event loop) time at which the task becomes eligible to run.
    let readyTime: NIODeadline

    /// Submission sequence number; decides the order of tasks sharing the same `readyTime`.
    let insertOrder: UInt64

    /// Creates a scheduled task.
    ///
    /// - parameters:
    ///     - id: The identifier of the task.
    ///     - readyTime: When the task becomes eligible to run.
    ///     - insertOrder: The submission sequence number of the task.
    ///     - task: The work to perform.
    ///     - failFn: Called with the error if the task is failed rather than run.
    init(
        id: UInt64,
        readyTime: NIODeadline,
        insertOrder: UInt64,
        task: @escaping () -> Void,
        _ failFn: @escaping (Error) -> Void
    ) {
        self.id = id
        self.task = task
        self.failFn = failFn
        self.readyTime = readyTime
        self.insertOrder = insertOrder
    }

    /// Fails the task by forwarding `error` to its failure callback. The work itself is not run.
    func fail(_ error: Error) {
        self.failFn(error)
    }
}
extension EmbeddedScheduledTask: Comparable {
    /// Orders tasks by `readyTime`; tasks with the same `readyTime` are ordered by `insertOrder`.
    static func < (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
        guard lhs.readyTime == rhs.readyTime else {
            return lhs.readyTime < rhs.readyTime
        }
        // Same ready time: fall back to the order of submission.
        return lhs.insertOrder < rhs.insertOrder
    }

    /// Two tasks are considered equal when they carry the same `id`.
    ///
    /// - note: Equality looks only at `id`, whereas `<` looks at `readyTime` and `insertOrder`.
    static func == (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
        lhs.id == rhs.id
    }
}
/// An `EventLoop` that is embedded in the current running context with no external
/// control.
///
/// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `EmbeddedEventLoop`
/// has no proper eventing mechanism. Instead, reads and writes are fully controlled by the
/// entity that instantiates the `EmbeddedEventLoop`. This property makes `EmbeddedEventLoop`
/// of limited use for many application purposes, but highly valuable for testing and other
/// kinds of mocking.
///
/// Time is controllable on an `EmbeddedEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)`
/// and may be advanced by a fixed amount by using `advanceTime(by:)`, or advanced to a point in
/// time with `advanceTime(to:)`. Time never moves backwards.
///
/// - warning: Unlike `SelectableEventLoop`, `EmbeddedEventLoop` **is not thread-safe**. This
///     is because it is intended to be run in the thread that instantiated it. Users are
///     responsible for ensuring they never call into the `EmbeddedEventLoop` in an
///     unsynchronized fashion.
public final class EmbeddedEventLoop: EventLoop {
    /// The current "time" for this event loop. This is an amount in nanoseconds.
    /* private but tests */ internal var _now: NIODeadline = .uptimeNanoseconds(0)

    /// Source of the `id` handed to each scheduled task; the id is used to find the task on cancellation.
    private var scheduledTaskCounter: UInt64 = 0

    /// Tasks waiting to be run, prioritised by `EmbeddedScheduledTask`'s `Comparable` conformance.
    private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>()

    /// Keep track of where promises are allocated to ensure we can identify their source if they leak.
    private var _promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:]

    // The number of the next task to be created. We track the order so that when we execute tasks
    // scheduled at the same time, we may do so in the order in which they were submitted for
    // execution.
    private var taskNumber: UInt64 = 0

    /// Returns the current task number and increments the counter for the next caller.
    private func nextTaskNumber() -> UInt64 {
        defer {
            self.taskNumber += 1
        }
        return self.taskNumber
    }

    /// - see: `EventLoop.inEventLoop`
    public var inEventLoop: Bool {
        return true
    }

    /// Initialize a new `EmbeddedEventLoop`.
    public init() { }

    /// - see: `EventLoop.scheduleTask(deadline:_:)`
    @discardableResult
    public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
        let promise: EventLoopPromise<T> = makePromise()
        self.scheduledTaskCounter += 1
        let scheduledTask = EmbeddedScheduledTask(
            id: self.scheduledTaskCounter,
            readyTime: deadline,
            insertOrder: self.nextTaskNumber(),
            task: {
                // Route the outcome of the user's closure into the promise.
                do {
                    promise.succeed(try task())
                } catch let err {
                    promise.fail(err)
                }
            },
            promise.fail
        )

        let taskId = scheduledTask.id
        let scheduled = Scheduled(promise: promise, cancellationTask: {
            // Cancelling removes the task from the queue so that it is never run.
            self.scheduledTasks.removeFirst { $0.id == taskId }
        })
        self.scheduledTasks.push(scheduledTask)
        return scheduled
    }

    /// - see: `EventLoop.scheduleTask(in:_:)`
    @discardableResult
    public func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
        return scheduleTask(deadline: self._now + `in`, task)
    }

    /// On an `EmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that
    /// `task` will be run the next time you call `EmbeddedEventLoop.run`.
    public func execute(_ task: @escaping () -> Void) {
        self.scheduleTask(deadline: self._now, task)
    }

    /// Run all tasks that have previously been submitted to this `EmbeddedEventLoop`, either by calling `execute` or
    /// events that have been enqueued using `scheduleTask`/`scheduleRepeatedTask`/`scheduleRepeatedAsyncTask` and whose
    /// deadlines have expired.
    ///
    /// - seealso: `EmbeddedEventLoop.advanceTime`.
    public func run() {
        // Execute all tasks that are currently enqueued to be executed *now*.
        self.advanceTime(to: self._now)
    }

    /// Runs the event loop and moves "time" forward by the given amount, running any scheduled
    /// tasks that need to be run.
    public func advanceTime(by increment: TimeAmount) {
        self.advanceTime(to: self._now + increment)
    }

    /// Runs the event loop and moves "time" forward to the given point in time, running any scheduled
    /// tasks that need to be run.
    ///
    /// Tasks run in order of their ready time. While a task runs, the current time is the task's ready time,
    /// unless that lies in the past, in which case the current time is left untouched.
    ///
    /// - Note: If `deadline` is before the current time, the current time will not be advanced.
    public func advanceTime(to deadline: NIODeadline) {
        let newTime = max(deadline, self._now)

        while let nextTask = self.scheduledTasks.peek() {
            guard nextTask.readyTime <= newTime else {
                break
            }

            // Now we want to grab all tasks that are ready to execute at the same
            // time as the first.
            var tasks = [EmbeddedScheduledTask]()
            while let candidateTask = self.scheduledTasks.peek(), candidateTask.readyTime == nextTask.readyTime {
                tasks.append(candidateTask)
                self.scheduledTasks.pop()
            }

            // Set the time correctly before we call into user code, then call in for all tasks.
            // A task may have been scheduled with a deadline that is already in the past; never
            // rewind the clock for it, otherwise anything it schedules relative to "now" would
            // be computed from a stale time.
            self._now = max(self._now, nextTask.readyTime)

            for task in tasks {
                task.task()
            }
        }

        // Finally ensure we got the time right.
        self._now = newTime
    }

    /// Runs every task that is scheduled at the time of the call, regardless of its deadline, and then
    /// fails whatever is left in the queue with `EventLoopError.shutdown`.
    internal func drainScheduledTasksByRunningAllCurrentlyScheduledTasks() {
        // Work on a snapshot so that tasks enqueued while draining are not run.
        var currentlyScheduledTasks = self.scheduledTasks
        while let nextTask = currentlyScheduledTasks.pop() {
            // Move time forward to the task's deadline, but never backwards.
            self._now = max(self._now, nextTask.readyTime)
            nextTask.task()
        }

        // Just fail all the remaining scheduled tasks. Despite having run all the tasks that were
        // scheduled when we entered the method this may still contain tasks as running the tasks
        // may have enqueued more tasks.
        while let task = self.scheduledTasks.pop() {
            task.fail(EventLoopError.shutdown)
        }
    }

    /// - see: `EventLoop.close`
    func close() throws {
        // Nothing to do here
    }

    /// - see: `EventLoop.shutdownGracefully`
    public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
        // Run what is due now, then report success synchronously on the given queue.
        run()
        queue.sync {
            callback(nil)
        }
    }

    /// `EmbeddedEventLoop` always allows a wait, so this check never traps.
    public func _preconditionSafeToWait(file: StaticString, line: UInt) {
        return
    }

    /// Records where the promise identified by `futureIdentifier` was created. Only callable in debug builds.
    public func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) {
        precondition(_isDebugAssertConfiguration())
        self._promiseCreationStore[futureIdentifier] = (file: file, line: line)
    }

    /// Forgets the promise identified by `futureIdentifier` and returns its recorded creation site, if any.
    /// Only callable in debug builds.
    public func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)? {
        precondition(_isDebugAssertConfiguration())
        return self._promiseCreationStore.removeValue(forKey: futureIdentifier)
    }

    /// `EmbeddedEventLoop` always allows a sync shutdown, so this check never traps.
    public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) {
        return
    }

    deinit {
        precondition(scheduledTasks.isEmpty, "Embedded event loop freed with unexecuted scheduled tasks!")
    }
}
/// The `ChannelCore` backing an `EmbeddedChannel`.
///
/// It performs no I/O: writes are collected in buffers, reads are a no-op, and lifecycle
/// operations only flip flags and fire the matching events through the pipeline.
@usableFromInline
class EmbeddedChannelCore: ChannelCore {
    /// Whether the channel is still open. Starts out `true`; `close0` sets it to `false`.
    var isOpen: Bool {
        get { self._isOpen.load(ordering: .sequentiallyConsistent) }
        set { self._isOpen.store(newValue, ordering: .sequentiallyConsistent) }
    }

    /// Whether the channel is active. Starts out `false`.
    var isActive: Bool {
        get { self._isActive.load(ordering: .sequentiallyConsistent) }
        set { self._isActive.store(newValue, ordering: .sequentiallyConsistent) }
    }

    /// Current value of the "allow remote half closure" setting. Starts out `false`.
    var allowRemoteHalfClosure: Bool {
        get { self._allowRemoteHalfClosure.load(ordering: .sequentiallyConsistent) }
        set { self._allowRemoteHalfClosure.store(newValue, ordering: .sequentiallyConsistent) }
    }

    // Atomic storage behind the three Boolean properties above.
    private let _isOpen = ManagedAtomic(true)
    private let _isActive = ManagedAtomic(false)
    private let _allowRemoteHalfClosure = ManagedAtomic(false)

    /// The event loop all operations of this core are expected to run on.
    let eventLoop: EventLoop

    /// Completed once the channel has been closed (or when the core is deallocated).
    let closePromise: EventLoopPromise<Void>

    /// The first error delivered to `errorCaught0` that has not been cleared yet, if any.
    var error: Error?

    private let pipeline: ChannelPipeline

    /// Creates the core for the given pipeline, allocating the close promise on `eventLoop`.
    init(pipeline: ChannelPipeline, eventLoop: EventLoop) {
        self.closePromise = eventLoop.makePromise()
        self.eventLoop = eventLoop
        self.pipeline = pipeline
        self.error = nil
    }

    deinit {
        assert(!self.isOpen && !self.isActive,
               "leaked an open EmbeddedChannel, maybe forgot to call channel.finish()?")
        self.isOpen = false
        self.closePromise.succeed(())
    }

    /// Contains the flushed items that went into the `Channel` (and on a regular channel would have hit the network).
    @usableFromInline
    var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()

    /// Contains observers that want to consume the first element that would be appended to the `outboundBuffer`
    @usableFromInline
    var outboundBufferConsumer: Deque<(NIOAny) -> Void> = []

    /// Contains the unflushed items that went into the `Channel`
    @usableFromInline
    var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 16)

    /// Contains the items that travelled the `ChannelPipeline` all the way and hit the tail channel handler. On a
    /// regular `Channel` these items would be lost.
    @usableFromInline
    var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()

    /// Contains observers that want to consume the first element that would be appended to the `inboundBuffer`
    @usableFromInline
    var inboundBufferConsumer: Deque<(NIOAny) -> Void> = []

    /// The address reported by `localAddress0`, or `nil` if none has been set.
    @usableFromInline
    var localAddress: SocketAddress?

    /// The address reported by `remoteAddress0`, or `nil` if none has been set.
    @usableFromInline
    var remoteAddress: SocketAddress?

    /// Returns the stored local address.
    ///
    /// - throws: `ChannelError.operationUnsupported` if no local address has been set.
    @usableFromInline
    func localAddress0() throws -> SocketAddress {
        self.eventLoop.preconditionInEventLoop()
        guard let address = self.localAddress else {
            throw ChannelError.operationUnsupported
        }
        return address
    }

    /// Returns the stored remote address.
    ///
    /// - throws: `ChannelError.operationUnsupported` if no remote address has been set.
    @usableFromInline
    func remoteAddress0() throws -> SocketAddress {
        self.eventLoop.preconditionInEventLoop()
        guard let address = self.remoteAddress else {
            throw ChannelError.operationUnsupported
        }
        return address
    }

    /// Closes the channel: marks it closed and inactive, succeeds `promise`, fires `channelInactive` and
    /// `channelUnregistered`, and schedules handler removal and completion of the close promise.
    ///
    /// If the channel is already closed, `promise` is failed with `ChannelError.alreadyClosed` instead.
    @usableFromInline
    func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()

        if !self.isOpen {
            promise?.fail(ChannelError.alreadyClosed)
            return
        }

        self.isOpen = false
        self.isActive = false
        promise?.succeed(())

        // As we called register() in the constructor of EmbeddedChannel we also need to ensure we call unregistered here.
        let operations = self.pipeline.syncOperations
        operations.fireChannelInactive()
        self.pipeline.syncOperations.fireChannelUnregistered()

        self.eventLoop.execute {
            // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline
            self.removeHandlers(pipeline: self.pipeline)
            self.closePromise.succeed(())
        }
    }

    /// Pretends to bind: succeeds `promise` without doing anything else.
    @usableFromInline
    func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        promise?.succeed(())
    }

    /// Pretends to connect: marks the channel active, succeeds `promise` and fires `channelActive`.
    @usableFromInline
    func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        self.isActive = true
        promise?.succeed(())
        self.pipeline.syncOperations.fireChannelActive()
    }

    /// Succeeds `promise` and fires `channelRegistered`.
    @usableFromInline
    func register0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        promise?.succeed(())
        self.pipeline.syncOperations.fireChannelRegistered()
    }

    /// Marks the channel active, registers it (see `register0`) and then fires `channelActive`.
    @usableFromInline
    func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        self.isActive = true
        self.register0(promise: promise)
        self.pipeline.syncOperations.fireChannelActive()
    }

    /// Queues `data` (and its promise) as a pending, not yet flushed, write.
    @usableFromInline
    func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        self.pendingOutboundBuffer.append((data, promise))
    }

    /// Moves the pending writes up to the mark into the outbound buffer (or hands them to a waiting
    /// consumer), succeeding each write's promise after its data has been delivered.
    @usableFromInline
    func flush0() {
        self.eventLoop.preconditionInEventLoop()
        self.pendingOutboundBuffer.mark()

        while self.pendingOutboundBuffer.hasMark, let pendingWrite = self.pendingOutboundBuffer.popFirst() {
            let (payload, writePromise) = pendingWrite
            self.addToBuffer(
                buffer: &self.outboundBuffer,
                consumer: &self.outboundBufferConsumer,
                data: payload
            )
            writePromise?.succeed(())
        }
    }

    /// Reading is a no-op for the embedded channel.
    @usableFromInline
    func read0() {
        self.eventLoop.preconditionInEventLoop()
        // NOOP
    }

    /// User outbound events are not supported: `promise` is failed with `ChannelError.operationUnsupported`.
    public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        self.eventLoop.preconditionInEventLoop()
        promise?.fail(ChannelError.operationUnsupported)
    }

    /// Stores inbound `data` that reached the end of the pipeline in the inbound buffer (or hands it to a
    /// waiting consumer).
    @usableFromInline
    func channelRead0(_ data: NIOAny) {
        self.eventLoop.preconditionInEventLoop()
        self.addToBuffer(
            buffer: &self.inboundBuffer,
            consumer: &self.inboundBufferConsumer,
            data: data
        )
    }

    /// Remembers `error` unless an earlier error is still stored; only the first error is kept.
    public func errorCaught0(error: Error) {
        self.eventLoop.preconditionInEventLoop()
        guard self.error == nil else {
            return
        }
        self.error = error
    }

    /// Delivers `data` to the oldest waiting consumer if there is one, otherwise appends it to `buffer`.
    private func addToBuffer(
        buffer: inout CircularBuffer<NIOAny>,
        consumer: inout Deque<(NIOAny) -> Void>,
        data: NIOAny
    ) {
        self.eventLoop.preconditionInEventLoop()
        guard let waitingConsumer = consumer.popFirst() else {
            buffer.append(data)
            return
        }
        waitingConsumer(data)
    }
}
/// `EmbeddedChannel` is a `Channel` implementation that does neither any
/// actual IO nor has a proper eventing mechanism. The prime use-case for
/// `EmbeddedChannel` is in unit tests when you want to feed the inbound events
/// and check the outbound events manually.
///
/// Please remember to call `finish()` when you are no longer using this
/// `EmbeddedChannel`.
///
/// To feed events through an `EmbeddedChannel`'s `ChannelPipeline` use
/// `EmbeddedChannel.writeInbound` which accepts data of any type. It will then
/// forward that data through the `ChannelPipeline` and the subsequent
/// `ChannelInboundHandler` will receive it through the usual `channelRead`
/// event. The user is responsible for making sure the first
/// `ChannelInboundHandler` expects data of that type.
///
/// `EmbeddedChannel` automatically collects arriving outbound data and makes it
/// available one-by-one through `readOutbound`.
///
/// - note: `EmbeddedChannel` is currently only compatible with
///   `EmbeddedEventLoop`s and cannot be used with `SelectableEventLoop`s from
///   for example `MultiThreadedEventLoopGroup`.
/// - warning: Unlike other `Channel`s, `EmbeddedChannel` **is not thread-safe**. This
///     is because it is intended to be run in the thread that instantiated it. Users are
///     responsible for ensuring they never call into an `EmbeddedChannel` in an
///     unsynchronized fashion. `EmbeddedEventLoop`s notes also apply as
///     `EmbeddedChannel` uses an `EmbeddedEventLoop` as its `EventLoop`.
public final class EmbeddedChannel: Channel {
    /// `LeftOverState` represents any left-over inbound, outbound, and pending outbound events that hit the
    /// `EmbeddedChannel` and were not consumed when `finish` was called on the `EmbeddedChannel`.
    ///
    /// `EmbeddedChannel` is most useful in testing and usually in unit tests, you want to consume all inbound and
    /// outbound data to verify they are what you expect. Therefore, when you `finish` an `EmbeddedChannel` it will
    /// return if it's either `.clean` (no left overs) or that it has `.leftOvers`.
    public enum LeftOverState {
        /// The `EmbeddedChannel` is clean, ie. no inbound, outbound, or pending outbound data left on `finish`.
        case clean

        /// The `EmbeddedChannel` has inbound, outbound, or pending outbound data left on `finish`.
        case leftOvers(inbound: [NIOAny], outbound: [NIOAny], pendingOutbound: [NIOAny])

        /// `true` if the `EmbeddedChannel` was `clean` on `finish`, ie. there is no unconsumed inbound, outbound, or
        /// pending outbound data left on the `Channel`.
        public var isClean: Bool {
            if case .clean = self {
                return true
            } else {
                return false
            }
        }

        /// `true` if there was unconsumed inbound, outbound, or pending outbound data left on the `Channel`
        /// when it was `finish`ed.
        public var hasLeftOvers: Bool {
            return !self.isClean
        }
    }

    /// `BufferState` represents the state of either the inbound, or the outbound `EmbeddedChannel` buffer. These
    /// buffers contain data that travelled the `ChannelPipeline` all the way.
    ///
    /// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
    /// `channelRead`) sends inbound data into the end of the `EmbeddedChannel`, it will be held in the
    /// `EmbeddedChannel`'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective
    /// buffer will be returned from `writeInbound`/`writeOutbound` as a `BufferState`.
    public enum BufferState {
        /// The buffer is empty.
        case empty

        /// The buffer is non-empty.
        case full([NIOAny])

        /// Returns `true` if the buffer was empty.
        public var isEmpty: Bool {
            if case .empty = self {
                return true
            } else {
                return false
            }
        }

        /// Returns `true` if the buffer was non-empty.
        public var isFull: Bool {
            return !self.isEmpty
        }
    }

    /// `WrongTypeError` is thrown if you use `readInbound` or `readOutbound` and request a certain type but the first
    /// item in the respective buffer is of a different type.
    public struct WrongTypeError: Error, Equatable {
        /// The type you expected.
        public let expected: Any.Type

        /// The type of the actual first element.
        public let actual: Any.Type

        public init(expected: Any.Type, actual: Any.Type) {
            self.expected = expected
            self.actual = actual
        }

        public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
            return lhs.expected == rhs.expected && lhs.actual == rhs.actual
        }
    }

    /// Returns `true` if the `EmbeddedChannel` is 'active'.
    ///
    /// An active `EmbeddedChannel` can be closed by calling `close` or `finish` on the `EmbeddedChannel`.
    ///
    /// - note: An `EmbeddedChannel` starts _inactive_ and can be activated, for example by calling `connect`.
    public var isActive: Bool { return channelcore.isActive }

    /// - see: `ChannelOptions.Types.AllowRemoteHalfClosureOption`
    public var allowRemoteHalfClosure: Bool {
        get {
            return channelcore.allowRemoteHalfClosure
        }
        set {
            channelcore.allowRemoteHalfClosure = newValue
        }
    }

    /// - see: `Channel.closeFuture`
    public var closeFuture: EventLoopFuture<Void> { return channelcore.closePromise.futureResult }

    @usableFromInline
    /*private but usableFromInline */ lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)

    /// - see: `Channel._channelCore`
    public var _channelCore: ChannelCore {
        return channelcore
    }

    /// - see: `Channel.pipeline`
    public var pipeline: ChannelPipeline {
        return _pipeline
    }

    /// - see: `Channel.isWritable`
    public var isWritable: Bool = true

    /// Synchronously closes the `EmbeddedChannel`.
    ///
    /// Errors in the `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
    ///
    /// - parameters:
    ///     - acceptAlreadyClosed: Whether `finish` should throw if the `EmbeddedChannel` has been previously `close`d.
    /// - returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
    ///            consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
    ///            writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
    ///            events, the `EmbeddedChannel` will return those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
    /// - throws: The error `close` failed with (except `ChannelError.alreadyClosed` when `acceptAlreadyClosed` is
    ///           `true`), or the unconsumed error stored in the `EmbeddedChannel`, if any.
    public func finish(acceptAlreadyClosed: Bool) throws -> LeftOverState {
        do {
            try close().wait()
        } catch let error as ChannelError {
            // Only tolerate "already closed", and only if the caller asked for that.
            guard error == .alreadyClosed && acceptAlreadyClosed else {
                throw error
            }
        }
        self.embeddedEventLoop.drainScheduledTasksByRunningAllCurrentlyScheduledTasks()
        self.embeddedEventLoop.run()
        try throwIfErrorCaught()
        let c = self.channelcore
        if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty {
            return .clean
        } else {
            return .leftOvers(inbound: Array(c.inboundBuffer),
                              outbound: Array(c.outboundBuffer),
                              pendingOutbound: c.pendingOutboundBuffer.map { $0.0 })
        }
    }

    /// Synchronously closes the `EmbeddedChannel`.
    ///
    /// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
    /// `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
    ///
    /// - returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
    ///            consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
    ///            writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
    ///            events, the `EmbeddedChannel` will return those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
    public func finish() throws -> LeftOverState {
        return try self.finish(acceptAlreadyClosed: false)
    }

    private var _pipeline: ChannelPipeline!

    /// - see: `Channel.allocator`
    public var allocator: ByteBufferAllocator = ByteBufferAllocator()

    /// - see: `Channel.eventLoop`
    public var eventLoop: EventLoop {
        return self.embeddedEventLoop
    }

    /// Returns the `EmbeddedEventLoop` that this `EmbeddedChannel` uses. This will return the same instance as
    /// `EmbeddedChannel.eventLoop` but as the concrete `EmbeddedEventLoop` rather than as `EventLoop` existential.
    public var embeddedEventLoop: EmbeddedEventLoop = EmbeddedEventLoop()

    /// - see: `Channel.localAddress`
    public var localAddress: SocketAddress? {
        get {
            self.channelcore.localAddress
        }
        set {
            self.channelcore.localAddress = newValue
        }
    }

    /// - see: `Channel.remoteAddress`
    public var remoteAddress: SocketAddress? {
        get {
            self.channelcore.remoteAddress
        }
        set {
            self.channelcore.remoteAddress = newValue
        }
    }

    /// `nil` because `EmbeddedChannel`s don't have parents.
    public let parent: Channel? = nil

    /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s outbound buffer. If the
    /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
    /// are no elements in the outbound buffer, `nil` will be returned.
    ///
    /// Data hits the `EmbeddedChannel`'s outbound buffer when data was written using `write`, then `flush`ed, and
    /// then travelled the `ChannelPipeline` all the way to the front. For data to hit the outbound buffer, the very
    /// first `ChannelHandler` must have written and flushed it either explicitly (by calling
    /// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
    ///
    /// - note: Outbound events travel the `ChannelPipeline` _back to front_.
    /// - note: `EmbeddedChannel.writeOutbound` will `write` data through the `ChannelPipeline`, starting with last
    ///         `ChannelHandler`.
    @inlinable
    public func readOutbound<T>(as type: T.Type = T.self) throws -> T? {
        return try _readFromBuffer(buffer: &channelcore.outboundBuffer)
    }

    /// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s inbound buffer. If the
    /// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
    /// are no elements in the inbound buffer, `nil` will be returned.
    ///
    /// Data hits the `EmbeddedChannel`'s inbound buffer when data was sent through the pipeline using `fireChannelRead`
    /// and then travelled the `ChannelPipeline` all the way to the back. For data to hit the inbound buffer, the
    /// last `ChannelHandler` must have sent the event either explicitly (by calling
    /// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
    ///
    /// - note: `EmbeddedChannel.writeInbound` will fire data through the `ChannelPipeline` using `fireChannelRead`.
    @inlinable
    public func readInbound<T>(as type: T.Type = T.self) throws -> T? {
        return try _readFromBuffer(buffer: &channelcore.inboundBuffer)
    }

    /// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
    ///
    /// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
    /// with the data you provide.
    ///
    /// - parameters:
    ///    - data: The data to fire through the pipeline.
    /// - returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
    ///            all the way.
    /// - throws: The unconsumed error stored in the `EmbeddedChannel`, if any.
    @inlinable
    @discardableResult public func writeInbound<T>(_ data: T) throws -> BufferState {
        pipeline.fireChannelRead(NIOAny(data))
        pipeline.fireChannelReadComplete()
        try throwIfErrorCaught()
        return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer))
    }

    /// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
    ///
    /// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called
    /// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler
    /// because outbound events travel the pipeline from back to front.
    ///
    /// - parameters:
    ///    - data: The data to fire through the pipeline.
    /// - returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
    ///            all the way.
    /// - throws: The error the `writeAndFlush` failed with, if any.
    @inlinable
    @discardableResult public func writeOutbound<T>(_ data: T) throws -> BufferState {
        try writeAndFlush(NIOAny(data)).wait()
        return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.outboundBuffer))
    }

    /// This method will throw the error that is stored in the `EmbeddedChannel` if any.
    ///
    /// The `EmbeddedChannel` will store an error if some error travels the `ChannelPipeline` all the way past its
    /// end. Throwing the error also clears it, so a subsequent call will not throw the same error again.
    public func throwIfErrorCaught() throws {
        if let error = channelcore.error {
            channelcore.error = nil
            throw error
        }
    }

    /// Removes the first element from `buffer` and returns it as a `T`.
    ///
    /// - returns: The first element, or `nil` if `buffer` is empty.
    /// - throws: `WrongTypeError` if the first element is not a `T`.
    /// - note: The element is removed from `buffer` before its type is checked, so an element of the wrong
    ///         type is consumed by the failed read.
    @inlinable
    func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
        if buffer.isEmpty {
            return nil
        }
        let elem = buffer.removeFirst()
        guard let t = self._channelCore.tryUnwrapData(elem, as: T.self) else {
            throw WrongTypeError(expected: T.self, actual: type(of: self._channelCore.tryUnwrapData(elem, as: Any.self)!))
        }
        return t
    }

    /// Create a new instance.
    ///
    /// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
    ///
    /// - parameters:
    ///     - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register or `nil` if none should be added.
    ///     - loop: The `EmbeddedEventLoop` to use.
    public convenience init(handler: ChannelHandler? = nil, loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
        let handlers = handler.map { [$0] } ?? []
        self.init(handlers: handlers, loop: loop)
    }

    /// Create a new instance.
    ///
    /// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
    ///
    /// - parameters:
    ///     - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register.
    ///     - loop: The `EmbeddedEventLoop` to use.
    public init(handlers: [ChannelHandler], loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
        self.embeddedEventLoop = loop
        self._pipeline = ChannelPipeline(channel: self)

        try! self._pipeline.syncOperations.addHandlers(handlers)

        // This will never throw...
        try! register().wait()
    }

    /// - see: `Channel.setOption`
    @inlinable
    public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
        self.setOptionSync(option, value: value)
        return self.eventLoop.makeSucceededVoidFuture()
    }

    /// Sets `option` synchronously. Only `AllowRemoteHalfClosureOption` is supported; any other option traps.
    @inlinable
    internal func setOptionSync<Option: ChannelOption>(_ option: Option, value: Option.Value) {
        if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
            self.allowRemoteHalfClosure = value as! Bool
            return
        }
        // No other options supported
        fatalError("option not supported")
    }

    /// - see: `Channel.getOption`
    @inlinable
    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
        return self.eventLoop.makeSucceededFuture(self.getOptionSync(option))
    }

    /// Reads `option` synchronously. `AutoReadOption` always reports `true`, `AllowRemoteHalfClosureOption`
    /// reports the current setting; any other option traps.
    @inlinable
    internal func getOptionSync<Option: ChannelOption>(_ option: Option) -> Option.Value {
        if option is ChannelOptions.Types.AutoReadOption {
            return true as! Option.Value
        }
        if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
            return self.allowRemoteHalfClosure as! Option.Value
        }
        fatalError("option \(option) not supported")
    }

    /// Fires the (outbound) `bind` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel` which
    /// happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
    /// `EmbeddedChannel`'s `localAddress`.
    ///
    /// - parameters:
    ///     - address: The address to fake-bind to.
    ///     - promise: The `EventLoopPromise` which will be fulfilled when the fake-bind operation has been done.
    public func bind(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        // Always observe the outcome, even if the caller did not pass a promise: otherwise a
        // successful bind without a promise would never record the local address.
        let promise = promise ?? self.eventLoop.makePromise(of: Void.self)
        promise.futureResult.whenSuccess {
            self.localAddress = address
        }
        pipeline.bind(to: address, promise: promise)
    }

    /// Fires the (outbound) `connect` event through the `ChannelPipeline`. If the event hits the `EmbeddedChannel`
    /// which happens when it travels the `ChannelPipeline` all the way to the front, this will also set the
    /// `EmbeddedChannel`'s `remoteAddress`.
    ///
    /// - parameters:
    ///     - address: The address to fake-connect to.
    ///     - promise: The `EventLoopPromise` which will be fulfilled when the fake-connect operation has been done.
    public func connect(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
        // Always observe the outcome, even if the caller did not pass a promise: otherwise a
        // successful connect without a promise would never record the remote address.
        let promise = promise ?? self.eventLoop.makePromise(of: Void.self)
        promise.futureResult.whenSuccess {
            self.remoteAddress = address
        }
        pipeline.connect(to: address, promise: promise)
    }
}
extension EmbeddedChannel {
    /// Synchronous access to the options of an `EmbeddedChannel`.
    ///
    /// Both operations forward directly to the channel's own synchronous option accessors.
    public struct SynchronousOptions: NIOSynchronousChannelOptions {
        /// The channel whose options are read and written.
        @usableFromInline
        internal let channel: EmbeddedChannel

        /// Wraps `channel`. Only this file can create instances.
        fileprivate init(channel: EmbeddedChannel) {
            self.channel = channel
        }

        /// Sets `option` to `value` on the wrapped channel.
        @inlinable
        public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            self.channel.setOptionSync(option, value: value)
        }

        /// Returns the current value of `option` on the wrapped channel.
        @inlinable
        public func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            let currentValue = self.channel.getOptionSync(option)
            return currentValue
        }
    }

    /// The synchronous options view of this channel; always available for an `EmbeddedChannel`.
    public final var syncOptions: NIOSynchronousChannelOptions? {
        SynchronousOptions(channel: self)
    }
}
// `SynchronousOptions` wraps an `EmbeddedChannel`, which is documented as not thread-safe.
// Declaring the `Sendable` conformance as unavailable states explicitly that the type is
// not `Sendable`.
@available(*, unavailable)
extension EmbeddedChannel.SynchronousOptions: Sendable {}