swift-nio/Sources/NIOCore/Codec.swift

825 lines
35 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// State of the current decoding process.
public enum DecodingState: Sendable {
/// Continue decoding.
case `continue`
/// Stop decoding until more data is ready to be processed.
case needMoreData
}
/// Common errors thrown by `ByteToMessageDecoder`s.
public enum ByteToMessageDecoderError: Error {
/// More data has been received by a `ByteToMessageHandler` despite the fact that an error has previously been
/// emitted. The associated `Error` is the error previously emitted and the `ByteBuffer` is the extra data that has
/// been received. The common cause for this error to be emitted is the user not having torn down the `Channel`
/// after previously an `Error` has been sent through the pipeline using `fireErrorCaught`.
case dataReceivedInErrorState(Error, ByteBuffer)
/// This error can be thrown by `ByteToMessageDecoder`s if there was unexpectedly some left-over data when the
/// `ByteToMessageDecoder` was removed from the pipeline or the `Channel` was closed.
case leftoverDataWhenDone(ByteBuffer)
}
extension ByteToMessageDecoderError {
// TODO: For NIO 3, make this an enum case (or whatever best way for Errors we have come up with).
/// This error can be thrown by `ByteToMessageDecoder`s if the incoming payload is larger than the max specified.
public struct PayloadTooLargeError: Error {
public init() {}
}
}
/// `ByteToMessageDecoder`s decode bytes in a stream-like fashion from `ByteBuffer` to another message type.
///
/// ### Purpose
///
/// A `ByteToMessageDecoder` provides a simplified API for handling streams of incoming data that can be broken
/// up into messages. This API boils down to two methods: `decode`, and `decodeLast`. These two methods, when
/// implemented, will be used by a `ByteToMessageHandler` paired with a `ByteToMessageDecoder` to decode the
/// incoming byte stream into a sequence of messages.
///
/// The reason this helper exists is to smooth away some of the boilerplate and edge case handling code that
/// is often necessary when implementing parsers in a SwiftNIO `ChannelPipeline`. A `ByteToMessageDecoder`
/// never needs to worry about how inbound bytes will be buffered, as `ByteToMessageHandler` deals with that
/// automatically. A `ByteToMessageDecoder` also never needs to worry about memory exclusivity violations
/// that can occur when re-entrant `ChannelPipeline` operations occur, as `ByteToMessageHandler` will deal with
/// those as well.
///
/// ### Implementing ByteToMessageDecoder
///
/// A type that implements `ByteToMessageDecoder` may implement two methods: decode and decodeLast. Implementations
/// must implement decode: if they do not implement decodeLast, a default implementation will be used that
/// simply calls decode.
///
/// `decode` is the main decoding method, and is the one that will be called most often. `decode` is invoked
/// whenever data is received by the wrapping `ByteToMessageHandler`. It is invoked with a `ByteBuffer` containing
/// all the received data (including any data previously buffered), as well as a `ChannelHandlerContext` that can be
/// used in the `decode` function.
///
/// `decode` is called in a loop by the `ByteToMessageHandler`. This loop continues until one of two cases occurs:
///
/// 1. The input `ByteBuffer` has no more readable bytes (i.e. `.readableBytes == 0`); OR
/// 2. The `decode` method returns `.needMoreData`.
///
/// The reason this method is invoked in a loop is to ensure that the stream-like properties of inbound data are
/// respected. It is entirely possible for `ByteToMessageDecoder` to receive either fewer bytes than a single message,
/// or multiple messages in one go. Rather than have the `ByteToMessageDecoder` handle all of the complexity of this,
/// the logic can be boiled down to a single choice: has the `ByteToMessageDecoder` been able to move the state forward
/// or not? If it has, rather than containing an internal loop it may simply return `.continue` in order to request that
/// `decode` be invoked again immediately. If it has not, it can return `.needMoreData` to ask to be left alone until more
/// data has been returned from the network.
///
/// Essentially, if the next parsing step could not be taken because there wasn't enough data available, return `.needMoreData`.
/// Otherwise, return `.continue`. This will allow a `ByteToMessageDecoder` implementation to ignore the awkward way data
/// arrives from the network, and to just treat it as a series of `decode` calls.
///
/// `decodeLast` is a cousin of `decode`. It is also called in a loop, but unlike with `decode` this loop will only ever
/// occur once: when the `ChannelHandlerContext` belonging to this `ByteToMessageDecoder` is about to become invalidated.
/// This invalidation happens in two situations: when EOF is received from the network, or when the `ByteToMessageDecoder`
/// is being removed from the `ChannelPipeline`. The distinction between these two states is captured by the value of
/// `seenEOF`.
///
/// In this condition, the `ByteToMessageDecoder` must now produce any final messages it can with the bytes it has
/// available. In protocols where EOF is used as a message delimiter, having `decodeLast` called with `seenEOF == true`
/// may produce further messages. In other cases, `decodeLast` may choose to deliver any buffered bytes as "leftovers",
/// either in error messages or via `channelRead`. This can occur if, for example, a protocol upgrade is occurring.
///
/// As with `decode`, `decodeLast` is invoked in a loop. This allows the same simplification as `decode` allows: when
/// a message is completely parsed, the `decodeLast` function can return `.continue` and be re-invoked from the top,
/// rather than containing an internal loop.
///
/// Note that the value of `seenEOF` may change between calls to `decodeLast` in some rare situations.
///
/// ### Implementers Notes
///
/// /// `ByteToMessageHandler` will turn your `ByteToMessageDecoder` into a `ChannelInboundHandler`. `ByteToMessageHandler`
/// also solves a couple of tricky issues for you. Most importantly, in a `ByteToMessageDecoder` you do _not_ need to
/// worry about re-entrancy. Your code owns the passed-in `ByteBuffer` for the duration of the `decode`/`decodeLast` call and
/// can modify it at will.
///
/// If a custom frame decoder is required, then one needs to be careful when implementing
/// one with `ByteToMessageDecoder`. Ensure there are enough bytes in the buffer for a
/// complete frame by checking `buffer.readableBytes`. If there are not enough bytes
/// for a complete frame, return without modifying the reader index to allow more bytes to arrive.
///
/// To check for complete frames without modifying the reader index, use methods like `buffer.getInteger`.
/// You _MUST_ use the reader index when using methods like `buffer.getInteger`.
/// For example calling `buffer.getInteger(at: 0)` is assuming the frame starts at the beginning of the buffer, which
/// is not always the case. Use `buffer.getInteger(at: buffer.readerIndex)` instead.
///
/// If you move the reader index forward, either manually or by using one of `buffer.read*` methods, you must ensure
/// that you no longer need to see those bytes again as they will not be returned to you the next time `decode` is
/// called. If you still need those bytes to come back, consider taking a local copy of buffer inside the function to
/// perform your read operations on.
///
/// The `ByteBuffer` passed in as `buffer` is a slice of a larger buffer owned by the `ByteToMessageDecoder`
/// implementation. Some aspects of this buffer are preserved across calls to `decode`, meaning that any changes to
/// those properties you make in your `decode` method will be reflected in the next call to decode. In particular,
/// moving the reader index forward persists across calls. When your method returns, if the reader index has advanced,
/// those bytes are considered "consumed" and will not be available in future calls to `decode`.
/// Please note, however, that the numerical value of the `readerIndex` itself is not preserved, and may not be the same
/// from one call to the next. Please do not rely on this numerical value: if you need
/// to recall where a byte is relative to the `readerIndex`, use an offset rather than an absolute value.
///
/// ### Using ByteToMessageDecoder
///
/// To add a `ByteToMessageDecoder` to the `ChannelPipeline` use
///
/// channel.pipeline.addHandler(ByteToMessageHandler(MyByteToMessageDecoder()))
///
public protocol ByteToMessageDecoder {
/// The type of the messages this `ByteToMessageDecoder` decodes to.
associatedtype InboundOut
/// Decode from a `ByteBuffer`.
///
/// This method will be called in a loop until either the input `ByteBuffer` has nothing to read left or
/// `DecodingState.needMoreData` is returned. If `DecodingState.continue` is returned and the `ByteBuffer`
/// contains more readable bytes, this method will immediately be invoked again, unless `decodeLast` needs
/// to be invoked instead.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
/// - buffer: The `ByteBuffer` from which we decode.
/// - returns: `DecodingState.continue` if we should continue calling this method or `DecodingState.needMoreData` if it should be called
/// again once more data is present in the `ByteBuffer`.
mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState
/// Decode from a `ByteBuffer` when no more data is incoming and the `ByteToMessageDecoder` is about to leave
/// the pipeline.
///
/// This method is called in a loop only once, when the `ChannelHandlerContext` goes inactive (i.e. when `channelInactive` is fired or
/// the `ByteToMessageDecoder` is removed from the pipeline).
///
/// Like with `decode`, this method will be called in a loop until either `DecodingState.needMoreData` is returned from the method
/// or until the input `ByteBuffer` has no more readable bytes. If `DecodingState.continue` is returned and the `ByteBuffer`
/// contains more readable bytes, this method will immediately be invoked again.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
/// - buffer: The `ByteBuffer` from which we decode.
/// - seenEOF: `true` if EOF has been seen. Usually if this is `false` the handler has been removed.
/// - returns: `DecodingState.continue` if we should continue calling this method or `DecodingState.needMoreData` if it should be called
/// again when more data is present in the `ByteBuffer`.
mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState
/// Called once this `ByteToMessageDecoder` is removed from the `ChannelPipeline`.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
mutating func decoderRemoved(context: ChannelHandlerContext)
/// Called when this `ByteToMessageDecoder` is added to the `ChannelPipeline`.
///
/// - parameters:
/// - context: The `ChannelHandlerContext` which this `ByteToMessageDecoder` belongs to.
mutating func decoderAdded(context: ChannelHandlerContext)
/// Determine if the read bytes in the given `ByteBuffer` should be reclaimed and their associated memory freed.
/// Be aware that reclaiming memory may involve memory copies and so is not free.
///
/// - parameters:
/// - buffer: The `ByteBuffer` to check
/// - return: `true` if memory should be reclaimed, `false` otherwise.
mutating func shouldReclaimBytes(buffer: ByteBuffer) -> Bool
}
/// Some `ByteToMessageDecoder`s need to observe `write`s (which are outbound events). `ByteToMessageDecoder`s which
/// implement the `WriteObservingByteToMessageDecoder` protocol will be notified about every outbound write.
///
/// `WriteObservingByteToMessageDecoder` may only observe a `write` and must not try to transform or block it in any
/// way. After the `write` method returns the `write` will be forwarded to the next outbound handler.
public protocol WriteObservingByteToMessageDecoder: ByteToMessageDecoder {
/// The type of `write`s.
associatedtype OutboundIn
/// `write` is called for every incoming `write` incoming to the corresponding `ByteToMessageHandler`.
///
/// - parameters:
/// - data: The data that was written.
mutating func write(data: OutboundIn)
}
extension ByteToMessageDecoder {
public mutating func decoderRemoved(context: ChannelHandlerContext) {
}
public mutating func decoderAdded(context: ChannelHandlerContext) {
}
/// Default implementation to detect once bytes should be reclaimed.
public func shouldReclaimBytes(buffer: ByteBuffer) -> Bool {
// We want to reclaim in the following cases:
//
// 1. If there is at least 2kB of memory to reclaim
// 2. If the buffer is more than 50% reclaimable memory and is at least
// 1kB in size.
if buffer.readerIndex >= 2048 {
return true
}
return buffer.storageCapacity > 1024 && (buffer.storageCapacity - buffer.readerIndex) < buffer.readerIndex
}
public func wrapInboundOut(_ value: InboundOut) -> NIOAny {
return NIOAny(value)
}
public mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
while try self.decode(context: context, buffer: &buffer) == .continue {}
return .needMoreData
}
}
private struct B2MDBuffer {
/// `B2MDBuffer`'s internal state, either we're already processing a buffer or we're ready to.
private enum State {
case processingInProgress
case ready
}
/// Can we produce a buffer to be processed right now or not?
enum BufferAvailability {
/// No, because no bytes available
case nothingAvailable
/// No, because we're already processing one
case bufferAlreadyBeingProcessed
/// Yes please, here we go.
case available(ByteBuffer)
}
/// Result of a try to process a buffer.
enum BufferProcessingResult {
/// Could not process a buffer because we are already processing one on the same call stack.
case cannotProcessReentrantly
/// Yes, we did process some.
case didProcess(DecodingState)
}
private var state: State = .ready
private var buffers: CircularBuffer<ByteBuffer> = CircularBuffer(initialCapacity: 4)
private let emptyByteBuffer: ByteBuffer
init(emptyByteBuffer: ByteBuffer) {
assert(emptyByteBuffer.readableBytes == 0)
self.emptyByteBuffer = emptyByteBuffer
}
}
// MARK: B2MDBuffer Main API
extension B2MDBuffer {
/// Start processing some bytes if possible, if we receive a returned buffer (through `.available(ByteBuffer)`)
/// we _must_ indicate the processing has finished by calling `finishProcessing`.
mutating func startProcessing(allowEmptyBuffer: Bool) -> BufferAvailability {
switch self.state {
case .processingInProgress:
return .bufferAlreadyBeingProcessed
case .ready where self.buffers.count > 0:
var buffer = self.buffers.removeFirst()
buffer.writeBuffers(self.buffers)
self.buffers.removeAll(keepingCapacity: self.buffers.capacity < 16) // don't grow too much
if buffer.readableBytes > 0 || allowEmptyBuffer {
self.state = .processingInProgress
return .available(buffer)
} else {
return .nothingAvailable
}
case .ready:
assert(self.buffers.isEmpty)
if allowEmptyBuffer {
self.state = .processingInProgress
return .available(self.emptyByteBuffer)
}
return .nothingAvailable
}
}
mutating func finishProcessing(remainder buffer: inout ByteBuffer) -> Void {
assert(self.state == .processingInProgress)
self.state = .ready
if buffer.readableBytes == 0 && self.buffers.isEmpty {
// fast path, no bytes left and no other buffers, just return
return
}
if buffer.readableBytes > 0 {
self.buffers.prepend(buffer)
} else {
buffer.discardReadBytes()
buffer.writeBuffers(self.buffers)
self.buffers.removeAll(keepingCapacity: self.buffers.capacity < 16) // don't grow too much
self.buffers.append(buffer)
}
}
mutating func append(buffer: ByteBuffer) {
if buffer.readableBytes > 0 {
self.buffers.append(buffer)
}
}
}
// MARK: B2MDBuffer Helpers
private extension ByteBuffer {
mutating func writeBuffers(_ buffers: CircularBuffer<ByteBuffer>) {
guard buffers.count > 0 else {
return
}
var requiredCapacity: Int = self.writerIndex
for buffer in buffers {
requiredCapacity += buffer.readableBytes
}
self.reserveCapacity(requiredCapacity)
for var buffer in buffers {
self.writeBuffer(&buffer)
}
}
}
private extension B2MDBuffer {
func _testOnlyOneBuffer() -> ByteBuffer? {
switch self.buffers.count {
case 0:
return nil
case 1:
return self.buffers.first
default:
let firstIndex = self.buffers.startIndex
var firstBuffer = self.buffers[firstIndex]
for var buffer in self.buffers[self.buffers.index(after: firstIndex)...] {
firstBuffer.writeBuffer(&buffer)
}
return firstBuffer
}
}
}
/// A handler which turns a given `ByteToMessageDecoder` into a `ChannelInboundHandler` that can then be added to a
/// `ChannelPipeline`.
///
/// Most importantly, `ByteToMessageHandler` handles the tricky buffer management for you and flattens out all
/// re-entrancy on `channelRead` that may happen in the `ChannelPipeline`.
public final class ByteToMessageHandler<Decoder: ByteToMessageDecoder> {
public typealias InboundIn = ByteBuffer
public typealias InboundOut = Decoder.InboundOut
private enum DecodeMode {
/// This is a usual decode, ie. not the last chunk
case normal
/// Last chunk
case last
}
private enum RemovalState {
/// Not added to any `ChannelPipeline` yet.
case notAddedToPipeline
/// No one tried to remove this handler.
case notBeingRemoved
/// The user-triggered removal has been started but isn't complete yet. This state will not be entered if the
/// removal is triggered by Channel teardown.
case removalStarted
/// The user-triggered removal is complete. This state will not be entered if the removal is triggered by
/// Channel teardown.
case removalCompleted
/// This handler has been removed from the pipeline.
case handlerRemovedCalled
}
private enum State {
case active
case leftoversNeedProcessing
case done
case error(Error)
var isError: Bool {
switch self {
case .active, .leftoversNeedProcessing, .done:
return false
case .error:
return true
}
}
var isFinalState: Bool {
switch self {
case .active, .leftoversNeedProcessing:
return false
case .done, .error:
return true
}
}
var isActive: Bool {
switch self {
case .done, .error, .leftoversNeedProcessing:
return false
case .active:
return true
}
}
var isLeftoversNeedProcessing: Bool {
switch self {
case .done, .error, .active:
return false
case .leftoversNeedProcessing:
return true
}
}
}
internal private(set) var decoder: Decoder? // only `nil` if we're already decoding (ie. we're re-entered)
private let maximumBufferSize: Int?
private var queuedWrites = CircularBuffer<NIOAny>(initialCapacity: 1) // queues writes received whilst we're already decoding (re-entrant write)
private var state: State = .active {
willSet {
assert(!self.state.isFinalState, "illegal state on state set: \(self.state)") // we can never leave final states
}
}
private var removalState: RemovalState = .notAddedToPipeline
// sadly to construct a B2MDBuffer we need an empty ByteBuffer which we can only get from the allocator, so IUO.
private var buffer: B2MDBuffer!
private var seenEOF: Bool = false
private var selfAsCanDequeueWrites: CanDequeueWrites? = nil
/// @see: ByteToMessageHandler.init(_:maximumBufferSize)
public convenience init(_ decoder: Decoder) {
self.init(decoder, maximumBufferSize: nil)
}
/// Initialize a `ByteToMessageHandler`.
///
/// - parameters:
/// - decoder: The `ByteToMessageDecoder` to decode the bytes into message.
/// - maximumBufferSize: The maximum number of bytes to aggregate in-memory.
public init(_ decoder: Decoder, maximumBufferSize: Int? = nil) {
self.decoder = decoder
self.maximumBufferSize = maximumBufferSize
}
deinit {
if self.removalState != .notAddedToPipeline {
// we have been added to the pipeline, if not, we don't need to check our state.
assert(self.removalState == .handlerRemovedCalled,
"illegal state in deinit: removalState = \(self.removalState)")
assert(self.state.isFinalState, "illegal state in deinit: state = \(self.state)")
}
}
}
#if swift(>=5.7)
@available(*, unavailable)
extension ByteToMessageHandler: Sendable {}
#endif
// MARK: ByteToMessageHandler: Test Helpers
extension ByteToMessageHandler {
internal var cumulationBuffer: ByteBuffer? {
return self.buffer._testOnlyOneBuffer()
}
}
private protocol CanDequeueWrites {
func dequeueWrites()
}
extension ByteToMessageHandler: CanDequeueWrites where Decoder: WriteObservingByteToMessageDecoder {
fileprivate func dequeueWrites() {
while self.queuedWrites.count > 0 {
// self.decoder can't be `nil`, this is only allowed to be called when we're not already on the stack
self.decoder!.write(data: self.unwrapOutboundIn(self.queuedWrites.removeFirst()))
}
}
}
// MARK: ByteToMessageHandler's Main API
extension ByteToMessageHandler {
@inline(__always) // allocations otherwise (reconsider with Swift 5.1)
private func withNextBuffer(allowEmptyBuffer: Bool, _ body: (inout Decoder, inout ByteBuffer) throws -> DecodingState) rethrows -> B2MDBuffer.BufferProcessingResult {
switch self.buffer.startProcessing(allowEmptyBuffer: allowEmptyBuffer) {
case .bufferAlreadyBeingProcessed:
return .cannotProcessReentrantly
case .nothingAvailable:
return .didProcess(.needMoreData)
case .available(var buffer):
var possiblyReclaimBytes = false
var decoder: Decoder? = nil
swap(&decoder, &self.decoder)
assert(decoder != nil) // self.decoder only `nil` if we're being re-entered, but .available means we're not
defer {
swap(&decoder, &self.decoder)
if buffer.readableBytes > 0 && possiblyReclaimBytes {
// we asserted above that the decoder we just swapped back in was non-nil so now `self.decoder` must
// be non-nil.
if self.decoder!.shouldReclaimBytes(buffer: buffer) {
buffer.discardReadBytes()
}
}
self.buffer.finishProcessing(remainder: &buffer)
}
let decodeResult = try body(&decoder!, &buffer)
// If we .continue, there's no point in trying to reclaim bytes because we'll loop again. If we need more
// data on the other hand, we should try to reclaim some of those bytes.
possiblyReclaimBytes = decodeResult == .needMoreData
return .didProcess(decodeResult)
}
}
private func processLeftovers(context: ChannelHandlerContext) {
guard self.state.isActive else {
// we are processing or have already processed the leftovers
return
}
do {
switch try self.decodeLoop(context: context, decodeMode: .last) {
case .didProcess:
self.state = .done
case .cannotProcessReentrantly:
self.state = .leftoversNeedProcessing
}
} catch {
self.state = .error(error)
context.fireErrorCaught(error)
}
}
private func tryDecodeWrites() {
if self.queuedWrites.count > 0 {
// this must succeed because unless we implement `CanDequeueWrites`, `queuedWrites` must always be empty.
self.selfAsCanDequeueWrites!.dequeueWrites()
}
}
private func decodeLoop(context: ChannelHandlerContext, decodeMode: DecodeMode) throws -> B2MDBuffer.BufferProcessingResult {
assert(!self.state.isError)
var allowEmptyBuffer = decodeMode == .last
while (self.state.isActive && self.removalState == .notBeingRemoved) || decodeMode == .last {
let result = try self.withNextBuffer(allowEmptyBuffer: allowEmptyBuffer) { decoder, buffer in
let decoderResult: DecodingState
if decodeMode == .normal {
assert(self.state.isActive, "illegal state for normal decode: \(self.state)")
decoderResult = try decoder.decode(context: context, buffer: &buffer)
} else {
allowEmptyBuffer = false
decoderResult = try decoder.decodeLast(context: context, buffer: &buffer, seenEOF: self.seenEOF)
}
if decoderResult == .needMoreData, let maximumBufferSize = self.maximumBufferSize, buffer.readableBytes > maximumBufferSize {
throw ByteToMessageDecoderError.PayloadTooLargeError()
}
return decoderResult
}
switch result {
case .didProcess(.continue):
self.tryDecodeWrites()
continue
case .didProcess(.needMoreData):
if self.queuedWrites.count > 0 {
self.tryDecodeWrites()
continue // we might have received more, so let's spin once more
} else {
return .didProcess(.needMoreData)
}
case .cannotProcessReentrantly:
return .cannotProcessReentrantly
}
}
return .didProcess(.continue)
}
}
// MARK: ByteToMessageHandler: ChannelInboundHandler
extension ByteToMessageHandler: ChannelInboundHandler {
public func handlerAdded(context: ChannelHandlerContext) {
guard self.removalState == .notAddedToPipeline else {
preconditionFailure("\(self) got readded to a ChannelPipeline but ByteToMessageHandler is single-use")
}
self.removalState = .notBeingRemoved
self.buffer = B2MDBuffer(emptyByteBuffer: context.channel.allocator.buffer(capacity: 0))
// here we can force it because we know that the decoder isn't in use if we're just adding this handler
self.selfAsCanDequeueWrites = self as? CanDequeueWrites // we need to cache this as it allocates.
self.decoder!.decoderAdded(context: context)
}
public func handlerRemoved(context: ChannelHandlerContext) {
// very likely, the removal state is `.notBeingRemoved` or `.removalCompleted` here but we can't assert it
// because the pipeline might be torn down during the formal removal process.
self.removalState = .handlerRemovedCalled
if !self.state.isFinalState {
self.state = .done
}
self.selfAsCanDequeueWrites = nil
// here we can force it because we know that the decoder isn't in use because the removal is always
// eventLoop.execute'd
self.decoder!.decoderRemoved(context: context)
}
/// Calls `decode` until there is nothing left to decode.
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let buffer = self.unwrapInboundIn(data)
if case .error(let error) = self.state {
context.fireErrorCaught(ByteToMessageDecoderError.dataReceivedInErrorState(error, buffer))
return
}
self.buffer.append(buffer: buffer)
do {
switch try self.decodeLoop(context: context, decodeMode: .normal) {
case .didProcess:
switch self.state {
case .active:
() // cool, all normal
case .done, .error:
() // fair, all done already
case .leftoversNeedProcessing:
// seems like we received a `channelInactive` or `handlerRemoved` whilst we were processing a read
switch try self.decodeLoop(context: context, decodeMode: .last) {
case .didProcess:
() // expected and cool
case .cannotProcessReentrantly:
preconditionFailure("bug in NIO: non-reentrant decode loop couldn't run \(self), \(self.state)")
}
self.state = .done
}
case .cannotProcessReentrantly:
// fine, will be done later
()
}
} catch {
self.state = .error(error)
context.fireErrorCaught(error)
}
}
/// Call `decodeLast` before forward the event through the pipeline.
public func channelInactive(context: ChannelHandlerContext) {
self.seenEOF = true
self.processLeftovers(context: context)
context.fireChannelInactive()
}
public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if event as? ChannelEvent == .some(.inputClosed) {
self.seenEOF = true
self.processLeftovers(context: context)
}
context.fireUserInboundEventTriggered(event)
}
}
extension ByteToMessageHandler: ChannelOutboundHandler, _ChannelOutboundHandler where Decoder: WriteObservingByteToMessageDecoder {
public typealias OutboundIn = Decoder.OutboundIn
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
if self.decoder != nil {
let data = self.unwrapOutboundIn(data)
assert(self.queuedWrites.isEmpty)
self.decoder!.write(data: data)
} else {
self.queuedWrites.append(data)
}
context.write(data, promise: promise)
}
}
/// A protocol for straightforward encoders which encode custom messages to `ByteBuffer`s.
/// To add a `MessageToByteEncoder` to a `ChannelPipeline`, use
/// `channel.pipeline.addHandler(MessageToByteHandler(myEncoder)`.
public protocol MessageToByteEncoder {
associatedtype OutboundIn
/// Called once there is data to encode.
///
/// - parameters:
/// - data: The data to encode into a `ByteBuffer`.
/// - out: The `ByteBuffer` into which we want to encode.
func encode(data: OutboundIn, out: inout ByteBuffer) throws
}
extension ByteToMessageHandler: RemovableChannelHandler {
public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
precondition(self.removalState == .notBeingRemoved)
self.removalState = .removalStarted
context.eventLoop.execute {
self.processLeftovers(context: context)
assert(!self.state.isLeftoversNeedProcessing, "illegal state: \(self.state)")
switch self.removalState {
case .removalStarted:
self.removalState = .removalCompleted
case .handlerRemovedCalled:
// if we're here, then the channel has also been torn down between the start and the completion of
// the user-triggered removal. That's okay.
()
default:
assertionFailure("illegal removal state: \(self.removalState)")
}
// this is necessary as it'll complete the promise.
context.leavePipeline(removalToken: removalToken)
}
}
}
/// A handler which turns a given `MessageToByteEncoder` into a `ChannelOutboundHandler` that can then be added to a
/// `ChannelPipeline`.
public final class MessageToByteHandler<Encoder: MessageToByteEncoder>: ChannelOutboundHandler {
public typealias OutboundOut = ByteBuffer
public typealias OutboundIn = Encoder.OutboundIn
private enum State {
case notInChannelYet
case operational
case error(Error)
case done
var readyToBeAddedToChannel: Bool {
switch self {
case .notInChannelYet:
return true
case .operational, .error, .done:
return false
}
}
}
private var state: State = .notInChannelYet
private let encoder: Encoder
private var buffer: ByteBuffer? = nil
public init(_ encoder: Encoder) {
self.encoder = encoder
}
}
#if swift(>=5.7)
@available(*, unavailable)
extension MessageToByteHandler: Sendable {}
#endif
extension MessageToByteHandler {
public func handlerAdded(context: ChannelHandlerContext) {
precondition(self.state.readyToBeAddedToChannel,
"illegal state when adding to Channel: \(self.state)")
self.state = .operational
self.buffer = context.channel.allocator.buffer(capacity: 256)
}
public func handlerRemoved(context: ChannelHandlerContext) {
self.state = .done
self.buffer = nil
}
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
switch self.state {
case .notInChannelYet:
preconditionFailure("MessageToByteHandler.write called before it was added to a Channel")
case .error(let error):
promise?.fail(error)
context.fireErrorCaught(error)
return
case .done:
// let's just ignore this
return
case .operational:
// there's actually some work to do here
break
}
let data = self.unwrapOutboundIn(data)
do {
self.buffer!.clear()
try self.encoder.encode(data: data, out: &self.buffer!)
context.write(self.wrapOutboundOut(self.buffer!), promise: promise)
} catch {
self.state = .error(error)
promise?.fail(error)
context.fireErrorCaught(error)
}
}
}