remove HTTPResponseCompressor (#865)


We're not happy with HTTPResponseCompressor's API and it needs to
incubate a little more, hence moving to


- removed HTTPResponseDecoder
- removed the zlib dependency


- no more HTTPResponseDecoder
This commit is contained in:
Johannes Weiss 2019-03-04 13:52:30 +00:00 committed by GitHub
parent a1981eb0fd
commit baa3390b15
No known key found for this signature in database
8 changed files with 2 additions and 1088 deletions

View File

@ -32,7 +32,7 @@ var targets: [PackageDescription.Target] = [
.target(name: "NIOConcurrencyHelpers",
dependencies: ["CNIOAtomics"]),
.target(name: "NIOHTTP1",
dependencies: ["NIO", "NIOConcurrencyHelpers", "CNIOHTTPParser", "CNIOZlib"]),
dependencies: ["NIO", "NIOConcurrencyHelpers", "CNIOHTTPParser"]),
.target(name: "NIOEchoServer",
dependencies: ["NIO", "NIOConcurrencyHelpers"]),
.target(name: "NIOEchoClient",
@ -40,11 +40,6 @@ var targets: [PackageDescription.Target] = [
.target(name: "NIOHTTP1Server",
dependencies: ["NIO", "NIOHTTP1", "NIOConcurrencyHelpers"]),
.target(name: "CNIOHTTPParser"),
.target(name: "CNIOZlib",
dependencies: [],
linkerSettings: [
.target(name: "NIOTLS", dependencies: ["NIO"]),
.target(name: "NIOChatServer",
dependencies: ["NIO", "NIOConcurrencyHelpers"]),

View File

@ -1,13 +0,0 @@
// This source file is part of the SwiftNIO open source project
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
// SPDX-License-Identifier: Apache-2.0

View File

@ -1,32 +0,0 @@
// This source file is part of the SwiftNIO open source project
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
// SPDX-License-Identifier: Apache-2.0
#ifndef C_NIO_ZLIB_H
#define C_NIO_ZLIB_H
#include <zlib.h>
static inline int CNIOZlib_deflateInit2(z_streamp strm,
int level,
int method,
int windowBits,
int memLevel,
int strategy) {
return deflateInit2(strm, level, method, windowBits, memLevel, strategy);
static inline int CNIOZlib_inflateInit2(z_streamp strm, int windowBits) {
return inflateInit2(strm, windowBits);

View File

@ -1,397 +0,0 @@
// This source file is part of the SwiftNIO open source project
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
// SPDX-License-Identifier: Apache-2.0
import CNIOZlib
import NIO
extension String {
/// Test if this `Collection` starts with the unicode scalars of `needle`.
/// - note: This will be faster than `String.startsWith` as no unicode normalisations are performed.
/// - parameters:
/// - needle: The `Collection` of `Unicode.Scalar`s to match at the beginning of `self`
/// - returns: If `self` started with the elements contained in `needle`.
func startsWithSameUnicodeScalars<S: StringProtocol>(string needle: S) -> Bool {
return self.unicodeScalars.starts(with: needle.unicodeScalars)
/// Given a header value, extracts the q value if there is one present. If one is not present,
/// returns the default q value, 1.0.
private func qValueFromHeader(_ text: String) -> Float {
let headerParts = text.split(separator: ";", maxSplits: 1, omittingEmptySubsequences: false)
guard headerParts.count > 1 && headerParts[1].count > 0 else {
return 1
// We have a Q value.
let qValue = Float(headerParts[1].split(separator: "=", maxSplits: 1, omittingEmptySubsequences: false)[1]) ?? 0
if qValue < 0 || qValue > 1 || qValue.isNaN {
return 0
return qValue
/// A HTTPResponseCompressor is a duplex channel handler that handles automatic streaming compression of
/// HTTP responses. It respects the client's Accept-Encoding preferences, including q-values if present,
/// and ensures that clients are served the compression algorithm that works best for them.
/// This compressor supports gzip and deflate. It works best if many writes are made between flushes.
/// Note that this compressor performs the compression on the event loop thread. This means that compressing
/// some resources, particularly those that do not benefit from compression or that could have been compressed
/// ahead-of-time instead of dynamically, could be a waste of CPU time and latency for relatively minimal
/// benefit. This channel handler should be present in the pipeline only for dynamically-generated and
/// highly-compressible content, which will see the biggest benefits from streaming compression.
public final class HTTPResponseCompressor: ChannelDuplexHandler {
public typealias InboundIn = HTTPServerRequestPart
public typealias InboundOut = HTTPServerRequestPart
public typealias OutboundIn = HTTPServerResponsePart
public typealias OutboundOut = HTTPServerResponsePart
public enum CompressionError: Error {
case uncompressedWritesPending
case noDataToWrite
fileprivate enum CompressionAlgorithm: String {
case gzip = "gzip"
case deflate = "deflate"
// Private variable for storing stream data.
private var stream = z_stream()
private var algorithm: CompressionAlgorithm?
// A queue of accept headers.
private var acceptQueue = CircularBuffer<[String]>(initialCapacity: 8)
private var pendingResponse: PartialHTTPResponse!
private var pendingWritePromise: EventLoopPromise<Void>!
private let initialByteBufferCapacity: Int
public init(initialByteBufferCapacity: Int = 1024) {
self.initialByteBufferCapacity = initialByteBufferCapacity
public func handlerAdded(context: ChannelHandlerContext) {
pendingResponse = PartialHTTPResponse(bodyBuffer: initialByteBufferCapacity))
pendingWritePromise = context.eventLoop.makePromise()
public func handlerRemoved(context: ChannelHandlerContext) {
if algorithm != nil {
algorithm = nil
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
if case .head(let requestHead) = unwrapInboundIn(data) {
acceptQueue.append(requestHead.headers[canonicalForm: "accept-encoding"])
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let httpData = unwrapOutboundIn(data)
switch httpData {
case .head(var responseHead):
algorithm = compressionAlgorithm()
guard algorithm != nil else {
context.write(wrapOutboundOut(.head(responseHead)), promise: promise)
// Previous handlers in the pipeline might have already set this header even though
// they should not as it is compressor responsibility to decide what encoding to use
responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm!.rawValue)
initializeEncoder(encoding: algorithm!)
pendingWritePromise.futureResult.cascade(to: promise)
case .body(let body):
if algorithm != nil {
pendingWritePromise.futureResult.cascade(to: promise)
} else {
context.write(data, promise: promise)
case .end:
// This compress is not done in flush because we need to be done with the
// compressor now.
guard algorithm != nil else {
context.write(data, promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
emitPendingWrites(context: context)
algorithm = nil
public func flush(context: ChannelHandlerContext) {
emitPendingWrites(context: context)
/// Determines the compression algorithm to use for the next response.
/// Returns the compression algorithm to use, or nil if the next response
/// should not be compressed.
private func compressionAlgorithm() -> CompressionAlgorithm? {
let acceptHeaders = acceptQueue.removeFirst()
var gzipQValue: Float = -1
var deflateQValue: Float = -1
var anyQValue: Float = -1
for acceptHeader in acceptHeaders {
if acceptHeader.startsWithSameUnicodeScalars(string: "gzip") || acceptHeader.startsWithSameUnicodeScalars(string: "x-gzip") {
gzipQValue = qValueFromHeader(acceptHeader)
} else if acceptHeader.startsWithSameUnicodeScalars(string: "deflate") {
deflateQValue = qValueFromHeader(acceptHeader)
} else if acceptHeader.startsWithSameUnicodeScalars(string: "*") {
anyQValue = qValueFromHeader(acceptHeader)
if gzipQValue > 0 || deflateQValue > 0 {
return gzipQValue > deflateQValue ? .gzip : .deflate
} else if anyQValue > 0 {
// Though gzip is usually less well compressed than deflate, it has slightly
// wider support because it's unabiguous. We therefore default to that unless
// the client has expressed a preference.
return .gzip
return nil
/// Set up the encoder for compressing data according to a specific
/// algorithm.
private func initializeEncoder(encoding: CompressionAlgorithm) {
// zlib docs say: The application must initialize zalloc, zfree and opaque before calling the init function.
stream.zalloc = nil
stream.zfree = nil
stream.opaque = nil
let windowBits: Int32
switch encoding {
case .deflate:
windowBits = 15
case .gzip:
windowBits = 16 + 15
let rc = CNIOZlib_deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowBits, 8, Z_DEFAULT_STRATEGY)
precondition(rc == Z_OK, "Unexpected return from zlib init: \(rc)")
private func deinitializeEncoder() {
// We deliberately discard the result here because we just want to free up
// the pending data.
/// Emits all pending buffered writes to the network, optionally compressing the
/// data. Resets the pending write buffer and promise.
/// Called either when a HTTP end message is received or our flush() method is called.
private func emitPendingWrites(context: ChannelHandlerContext) {
let writesToEmit = pendingResponse.flush(compressor: &stream, allocator:
var pendingPromise = pendingWritePromise
if let writeHead = writesToEmit.0 {
context.write(wrapOutboundOut(.head(writeHead)), promise: pendingPromise)
pendingPromise = nil
if let writeBody = writesToEmit.1 {
context.write(wrapOutboundOut(.body(.byteBuffer(writeBody))), promise: pendingPromise)
pendingPromise = nil
if let writeEnd = writesToEmit.2 {
context.write(wrapOutboundOut(writeEnd), promise: pendingPromise)
pendingPromise = nil
// If we still have the pending promise, we never emitted a write. Fail the promise,
// as anything that is listening for its data somehow lost it.
if let stillPendingPromise = pendingPromise {
// Reset the pending promise.
pendingWritePromise = context.eventLoop.makePromise()
/// A buffer object that allows us to keep track of how much of a HTTP response we've seen before
/// a flush.
/// The strategy used in this module is that we want to have as much information as possible before
/// we compress, and to compress as few times as possible. This is because in the ideal situation we
/// will have a complete HTTP response to compress in one shot, allowing us to update the content
/// length, rather than force the response to be chunked. It is much easier to do the right thing
/// if we can encapsulate our ideas about how HTTP responses in an entity like this.
private struct PartialHTTPResponse {
var head: HTTPResponseHead?
var body: ByteBuffer
var end: HTTPServerResponsePart?
private let initialBufferSize: Int
var isCompleteResponse: Bool {
return head != nil && end != nil
var mustFlush: Bool {
return end != nil
init(bodyBuffer: ByteBuffer) {
body = bodyBuffer
initialBufferSize = bodyBuffer.capacity
mutating func bufferResponseHead(_ head: HTTPResponseHead) {
precondition(self.head == nil)
self.head = head
mutating func bufferBodyPart(_ bodyPart: IOData) {
switch bodyPart {
case .byteBuffer(var buffer):
case .fileRegion:
fatalError("Cannot currently compress file regions")
mutating func bufferResponseEnd(_ end: HTTPServerResponsePart) {
precondition(self.end == nil)
guard case .end = end else {
fatalError("Buffering wrong entity type: \(end)")
self.end = end
private mutating func clear() {
head = nil
end = nil
mutating private func compressBody(compressor: inout z_stream, allocator: ByteBufferAllocator, flag: Int32) -> ByteBuffer? {
guard body.readableBytes > 0 else {
return nil
// deflateBound() provides an upper limit on the number of bytes the input can
// compress to. We add 5 bytes to handle the fact that Z_SYNC_FLUSH will append
// an empty stored block that is 5 bytes long.
let bufferSize = Int(deflateBound(&compressor, UInt(body.readableBytes)))
var outputBuffer = allocator.buffer(capacity: bufferSize)
// Now do the one-shot compression. All the data should have been consumed.
compressor.oneShotDeflate(from: &body, to: &outputBuffer, flag: flag)
precondition(body.readableBytes == 0)
precondition(outputBuffer.readableBytes > 0)
return outputBuffer
/// Flushes the buffered data into its constituent parts.
/// Returns a three-tuple of a HTTP response head, compressed body bytes, and any end that
/// may have been buffered. Each of these types is optional.
/// If the head is flushed, it will have had its headers mutated based on whether we had the whole
/// response or not. If nil, the head has previously been emitted.
/// If the body is nil, it means no writes were buffered (that is, our buffer of bytes has no
/// readable bytes in it). This should usually mean that no write is issued.
/// Calling this function resets the buffer, freeing any excess memory allocated in the internal
/// buffer and losing all copies of the other HTTP data. At this point it may freely be reused.
mutating func flush(compressor: inout z_stream, allocator: ByteBufferAllocator) -> (HTTPResponseHead?, ByteBuffer?, HTTPServerResponsePart?) {
let flag = mustFlush ? Z_FINISH : Z_SYNC_FLUSH
let body = compressBody(compressor: &compressor, allocator: allocator, flag: flag)
if let bodyLength = body?.readableBytes, isCompleteResponse && bodyLength > 0 {
head!.headers.remove(name: "transfer-encoding")
head!.headers.replaceOrAdd(name: "content-length", value: "\(bodyLength)")
} else if head != nil && head!.status.mayHaveResponseBody {
head!.headers.remove(name: "content-length")
head!.headers.replaceOrAdd(name: "transfer-encoding", value: "chunked")
let response = (head, body, end)
return response
private extension z_stream {
/// Executes deflate from one buffer to another buffer. The advantage of this method is that it
/// will ensure that the stream is "safe" after each call (that is, that the stream does not have
/// pointers to byte buffers any longer).
mutating func oneShotDeflate(from: inout ByteBuffer, to: inout ByteBuffer, flag: Int32) {
defer {
self.avail_in = 0
self.next_in = nil
self.avail_out = 0
self.next_out = nil
from.readWithUnsafeMutableReadableBytes { dataPtr in
let typedPtr = dataPtr.baseAddress!.assumingMemoryBound(to: UInt8.self)
let typedDataPtr = UnsafeMutableBufferPointer(start: typedPtr,
count: dataPtr.count)
self.avail_in = UInt32(typedDataPtr.count)
self.next_in = typedDataPtr.baseAddress!
let rc = deflateToBuffer(buffer: &to, flag: flag)
precondition(rc == Z_OK || rc == Z_STREAM_END, "One-shot compression failed: \(rc)")
return typedDataPtr.count - Int(self.avail_in)
/// A private function that sets the deflate target buffer and then calls deflate.
/// This relies on having the input set by the previous caller: it will use whatever input was
/// configured.
private mutating func deflateToBuffer(buffer: inout ByteBuffer, flag: Int32) -> Int32 {
var rc = Z_OK
buffer.writeWithUnsafeMutableBytes { outputPtr in
let typedOutputPtr = UnsafeMutableBufferPointer(start: outputPtr.baseAddress!.assumingMemoryBound(to: UInt8.self),
count: outputPtr.count)
self.avail_out = UInt32(typedOutputPtr.count)
self.next_out = typedOutputPtr.baseAddress!
rc = deflate(&self, flag)
return typedOutputPtr.count - Int(self.avail_out)
return rc

View File

@ -60,7 +60,6 @@ import XCTest

View File

@ -1,56 +0,0 @@
// This source file is part of the SwiftNIO open source project
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
// SPDX-License-Identifier: Apache-2.0
// HTTPResponseCompressorTest+XCTest.swift
import XCTest
/// NOTE: This file was generated by generate_linux_tests.rb
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
extension HTTPResponseCompressorTest {
static var allTests : [(String, (HTTPResponseCompressorTest) -> () throws -> Void)] {
return [
("testCanCompressSimpleBodies", testCanCompressSimpleBodies),
("testCanCompressSimpleBodiesGzip", testCanCompressSimpleBodiesGzip),
("testCanCompressDeflateWithAwkwardFlushes", testCanCompressDeflateWithAwkwardFlushes),
("testCanCompressGzipWithAwkwardFlushes", testCanCompressGzipWithAwkwardFlushes),
("testDoesNotCompressWithoutAcceptEncodingHeader", testDoesNotCompressWithoutAcceptEncodingHeader),
("testHandlesPipelinedRequestsProperly", testHandlesPipelinedRequestsProperly),
("testHandlesBasicQValues", testHandlesBasicQValues),
("testAlwaysPrefersHighestQValue", testAlwaysPrefersHighestQValue),
("testAsteriskMeansGzip", testAsteriskMeansGzip),
("testIgnoresUnknownAlgorithms", testIgnoresUnknownAlgorithms),
("testNonNumericQValuePreventsChoice", testNonNumericQValuePreventsChoice),
("testNaNQValuePreventsChoice", testNaNQValuePreventsChoice),
("testInfinityQValuePreventsChoice", testInfinityQValuePreventsChoice),
("testNegativeInfinityQValuePreventsChoice", testNegativeInfinityQValuePreventsChoice),
("testOutOfRangeQValuePreventsChoice", testOutOfRangeQValuePreventsChoice),
("testOverridesContentEncodingHeader", testOverridesContentEncodingHeader),
("testRemovingHandlerFailsPendingWrites", testRemovingHandlerFailsPendingWrites),
("testDoesNotBufferWritesNoAlgorithm", testDoesNotBufferWritesNoAlgorithm),
("testStartsWithSameUnicodeScalarsWorksOnEmptyStrings", testStartsWithSameUnicodeScalarsWorksOnEmptyStrings),
("testStartsWithSameUnicodeScalarsWorksOnLongerNeedleFalse", testStartsWithSameUnicodeScalarsWorksOnLongerNeedleFalse),
("testStartsWithSameUnicodeScalarsWorksOnSameStrings", testStartsWithSameUnicodeScalarsWorksOnSameStrings),
("testStartsWithSameUnicodeScalarsWorksOnPrefix", testStartsWithSameUnicodeScalarsWorksOnPrefix),
("testStartsWithSameUnicodeScalarsSaysNoForTheSameStringInDifferentNormalisations", testStartsWithSameUnicodeScalarsSaysNoForTheSameStringInDifferentNormalisations),
("testStartsWithSaysYesForTheSameStringInDifferentNormalisations", testStartsWithSaysYesForTheSameStringInDifferentNormalisations),

View File

@ -1,583 +0,0 @@
// This source file is part of the SwiftNIO open source project
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
// SPDX-License-Identifier: Apache-2.0
import XCTest
import CNIOZlib
@testable import NIO
@testable import NIOHTTP1
private class PromiseOrderer {
private var promiseArray: Array<EventLoopPromise<Void>>
private let eventLoop: EventLoop
internal init(eventLoop: EventLoop) {
promiseArray = Array()
self.eventLoop = eventLoop
func makePromise(file: StaticString = #file, line: UInt = #line) -> EventLoopPromise<Void> {
let promise = eventLoop.makePromise(of: Void.self, file: file, line: line)
return promise
private func appendPromise(_ promise: EventLoopPromise<Void>) {
let thisPromiseIndex = promiseArray.count
promise.futureResult.whenComplete { (_: Result<Void, Error>) in
let priorFutures = self.promiseArray[0..<thisPromiseIndex]
let subsequentFutures = self.promiseArray[(thisPromiseIndex + 1)...]
let allPriorFuturesFired = { $0.futureResult.isFulfilled }.allSatisfy { $0 }
let allSubsequentFuturesUnfired = { $0.futureResult.isFulfilled }.allSatisfy { !$0 }
func waitUntilComplete() throws {
guard let promise = promiseArray.last else { return }
try promise.futureResult.wait()
private extension ByteBuffer {
mutating func withUnsafeMutableReadableUInt8Bytes<T>(_ body: (UnsafeMutableBufferPointer<UInt8>) throws -> T) rethrows -> T {
return try self.withUnsafeMutableReadableBytes { (ptr: UnsafeMutableRawBufferPointer) -> T in
let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
return try body(inputBufferPointer)
mutating func writeWithUnsafeMutableUInt8Bytes(_ body: (UnsafeMutableBufferPointer<UInt8>) throws -> Int) rethrows -> Int {
return try self.writeWithUnsafeMutableBytes { (ptr: UnsafeMutableRawBufferPointer) -> Int in
let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
return try body(inputBufferPointer)
mutating func merge<S: Sequence>(_ others: S) -> ByteBuffer where S.Element == ByteBuffer {
for var buffer in others {
return self
private extension z_stream {
static func decompressDeflate(compressedBytes: inout ByteBuffer, outputBuffer: inout ByteBuffer) {
decompress(compressedBytes: &compressedBytes, outputBuffer: &outputBuffer, windowSize: 15)
static func decompressGzip(compressedBytes: inout ByteBuffer, outputBuffer: inout ByteBuffer) {
decompress(compressedBytes: &compressedBytes, outputBuffer: &outputBuffer, windowSize: 16 + 15)
private static func decompress(compressedBytes: inout ByteBuffer, outputBuffer: inout ByteBuffer, windowSize: Int32) {
compressedBytes.withUnsafeMutableReadableUInt8Bytes { inputPointer in
outputBuffer.writeWithUnsafeMutableUInt8Bytes { outputPointer -> Int in
var stream = z_stream()
// zlib requires we initialize next_in, avail_in, zalloc, zfree and opaque before calling inflateInit2.
stream.next_in = inputPointer.baseAddress!
stream.avail_in = UInt32(inputPointer.count)
stream.next_out = outputPointer.baseAddress!
stream.avail_out = UInt32(outputPointer.count)
stream.zalloc = nil
stream.zfree = nil
stream.opaque = nil
var rc = CNIOZlib_inflateInit2(&stream, windowSize)
precondition(rc == Z_OK)
rc = inflate(&stream, Z_FINISH)
XCTAssertEqual(rc, Z_STREAM_END)
XCTAssertEqual(stream.avail_in, 0)
rc = inflateEnd(&stream)
XCTAssertEqual(rc, Z_OK)
return outputPointer.count - Int(stream.avail_out)
class HTTPResponseCompressorTest: XCTestCase {
private enum WriteStrategy {
case once
case intermittentFlushes
private func sendRequest(acceptEncoding: String?, channel: EmbeddedChannel) throws {
var requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/")
if let acceptEncoding = acceptEncoding {
requestHead.headers.add(name: "Accept-Encoding", value: acceptEncoding)
try channel.writeInbound(HTTPServerRequestPart.head(requestHead))
try channel.writeInbound(HTTPServerRequestPart.end(nil))
private func clientParsingChannel() -> EmbeddedChannel {
let channel = EmbeddedChannel()
XCTAssertNoThrow(try channel.pipeline.addHTTPClientHandlers().wait())
return channel
private func writeOneChunk(head: HTTPResponseHead, body: [ByteBuffer], channel: EmbeddedChannel) throws {
let promiseOrderer = PromiseOrderer(eventLoop: channel.eventLoop)
channel.pipeline.write(NIOAny(HTTPServerResponsePart.head(head)), promise: promiseOrderer.makePromise())
for bodyChunk in body {
promise: promiseOrderer.makePromise())
promise: promiseOrderer.makePromise())
// Get all the promises to fire.
try promiseOrderer.waitUntilComplete()
private func writeIntermittentFlushes(head: HTTPResponseHead, body: [ByteBuffer], channel: EmbeddedChannel) throws {
let promiseOrderer = PromiseOrderer(eventLoop: channel.eventLoop)
var writeCount = 0
channel.pipeline.write(NIOAny(HTTPServerResponsePart.head(head)), promise: promiseOrderer.makePromise())
for bodyChunk in body {
promise: promiseOrderer.makePromise())
writeCount += 1
if writeCount % 3 == 0 {
promise: promiseOrderer.makePromise())
// Get all the promises to fire.
try promiseOrderer.waitUntilComplete()
private func compressResponse(head: HTTPResponseHead,
body: [ByteBuffer],
channel: EmbeddedChannel,
writeStrategy: WriteStrategy = .once) throws -> (HTTPResponseHead, [ByteBuffer]) {
switch writeStrategy {
case .once:
try writeOneChunk(head: head, body: body, channel: channel)
case .intermittentFlushes:
try writeIntermittentFlushes(head: head, body: body, channel: channel)
// Feed the output from this channel into a client one. We need to have the client see a
// HTTP request to avoid this exploding.
let clientChannel = clientParsingChannel()
defer {
_ = try! clientChannel.finish()
var requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/")
requestHead.headers.add(name: "host", value: "")
clientChannel.write(NIOAny(HTTPClientRequestPart.head(requestHead)), promise: nil)
clientChannel.write(NIOAny(HTTPClientRequestPart.end(nil)), promise: nil)
while let b = try channel.readOutbound(as: ByteBuffer.self) {
try clientChannel.writeInbound(b)
// The first inbound datum will be the response head. The rest will be byte buffers, until
// the last, which is the end.
var head: HTTPResponseHead? = nil
var dataChunks = [ByteBuffer]()
loop: while let responsePart: HTTPClientResponsePart = try clientChannel.readInbound() {
switch responsePart {
case .head(let h):
precondition(head == nil)
head = h
case .body(let data):
case .end:
break loop
return (head!, dataChunks)
private func assertDecompressedResponseMatches(responseData: inout ByteBuffer,
expectedResponse: ByteBuffer,
allocator: ByteBufferAllocator,
decompressor: (inout ByteBuffer, inout ByteBuffer) -> Void) {
var outputBuffer = allocator.buffer(capacity: expectedResponse.readableBytes)
decompressor(&responseData, &outputBuffer)
XCTAssertEqual(expectedResponse, outputBuffer)
private func assertDeflatedResponse(channel: EmbeddedChannel, writeStrategy: WriteStrategy = .once) throws {
let bodySize = 2048
let response = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1),
status: .ok)
let body = [UInt8](repeating: 60, count: bodySize)
var bodyBuffer = channel.allocator.buffer(capacity: bodySize)
var bodyChunks = [ByteBuffer]()
for index in stride(from: 0, to: bodyBuffer.readableBytes, by: 2) {
bodyChunks.append(bodyBuffer.getSlice(at: index, length: 2)!)
let data = try compressResponse(head: response,
body: bodyChunks,
channel: channel,
writeStrategy: writeStrategy)
let compressedResponse = data.0
var compressedChunks = data.1
var compressedBody = compressedChunks[0].merge(compressedChunks[1...])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-encoding"], ["deflate"])
switch writeStrategy {
case .once:
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-length"], ["\(compressedBody.readableBytes)"])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "transfer-encoding"], [])
case .intermittentFlushes:
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-length"], [])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "transfer-encoding"], ["chunked"])
assertDecompressedResponseMatches(responseData: &compressedBody,
expectedResponse: bodyBuffer,
allocator: channel.allocator,
decompressor: z_stream.decompressDeflate)
private func assertGzippedResponse(channel: EmbeddedChannel, writeStrategy: WriteStrategy = .once, additionalHeaders: HTTPHeaders = HTTPHeaders()) throws {
let bodySize = 2048
var response = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1),
status: .ok)
response.headers = additionalHeaders
let body = [UInt8](repeating: 60, count: bodySize)
var bodyBuffer = channel.allocator.buffer(capacity: bodySize)
var bodyChunks = [ByteBuffer]()
for index in stride(from: 0, to: bodyBuffer.readableBytes, by: 2) {
bodyChunks.append(bodyBuffer.getSlice(at: index, length: 2)!)
let data = try compressResponse(head: response,
body: bodyChunks,
channel: channel,
writeStrategy: writeStrategy)
let compressedResponse = data.0
var compressedChunks = data.1
var compressedBody = compressedChunks[0].merge(compressedChunks[1...])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-encoding"], ["gzip"])
switch writeStrategy {
case .once:
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-length"], ["\(compressedBody.readableBytes)"])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "transfer-encoding"], [])
case .intermittentFlushes:
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-length"], [])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "transfer-encoding"], ["chunked"])
assertDecompressedResponseMatches(responseData: &compressedBody,
expectedResponse: bodyBuffer,
allocator: channel.allocator,
decompressor: z_stream.decompressGzip)
private func assertUncompressedResponse(channel: EmbeddedChannel, writeStrategy: WriteStrategy = .once) throws {
let bodySize = 2048
let response = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1),
status: .ok)
let body = [UInt8](repeating: 60, count: bodySize)
var bodyBuffer = channel.allocator.buffer(capacity: bodySize)
var bodyChunks = [ByteBuffer]()
for index in stride(from: 0, to: bodyBuffer.readableBytes, by: 2) {
bodyChunks.append(bodyBuffer.getSlice(at: index, length: 2)!)
let data = try compressResponse(head: response,
body: bodyChunks,
channel: channel,
writeStrategy: writeStrategy)
let compressedResponse = data.0
var compressedChunks = data.1
let uncompressedBody = compressedChunks[0].merge(compressedChunks[1...])
XCTAssertEqual(compressedResponse.headers[canonicalForm: "content-encoding"], [])
XCTAssertEqual(uncompressedBody.readableBytes, 2048)
XCTAssertEqual(uncompressedBody, bodyBuffer)
private func compressionChannel() throws -> EmbeddedChannel {
let channel = EmbeddedChannel()
XCTAssertNoThrow(try channel.pipeline.addHandler(HTTPResponseEncoder()).wait())
XCTAssertNoThrow(try channel.pipeline.addHandler(HTTPResponseCompressor()).wait())
return channel
func testCanCompressSimpleBodies() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate", channel: channel)
try assertDeflatedResponse(channel: channel)
func testCanCompressSimpleBodiesGzip() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "gzip", channel: channel)
try assertGzippedResponse(channel: channel)
func testCanCompressDeflateWithAwkwardFlushes() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate", channel: channel)
try assertDeflatedResponse(channel: channel, writeStrategy: .intermittentFlushes)
func testCanCompressGzipWithAwkwardFlushes() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "gzip", channel: channel)
try assertGzippedResponse(channel: channel, writeStrategy: .intermittentFlushes)
func testDoesNotCompressWithoutAcceptEncodingHeader() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: nil, channel: channel)
try assertUncompressedResponse(channel: channel)
func testHandlesPipelinedRequestsProperly() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
// Three requests: one for deflate, one for gzip, one for nothing.
try sendRequest(acceptEncoding: "deflate", channel: channel)
try sendRequest(acceptEncoding: "gzip", channel: channel)
try sendRequest(acceptEncoding: "identity", channel: channel)
try assertDeflatedResponse(channel: channel)
try assertGzippedResponse(channel: channel)
try assertUncompressedResponse(channel: channel)
func testHandlesBasicQValues() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "gzip, deflate;q=0.5", channel: channel)
try assertGzippedResponse(channel: channel)
func testAlwaysPrefersHighestQValue() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=0.5, gzip;q=0.8, *;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel)
func testAsteriskMeansGzip() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "*", channel: channel)
try assertGzippedResponse(channel: channel)
func testIgnoresUnknownAlgorithms() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "br", channel: channel)
try assertUncompressedResponse(channel: channel)
func testNonNumericQValuePreventsChoice() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=fish=fish, gzip;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel)
func testNaNQValuePreventsChoice() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=NaN, gzip;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel)
func testInfinityQValuePreventsChoice() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=Inf, gzip;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel)
func testNegativeInfinityQValuePreventsChoice() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=-Inf, gzip;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel)
func testOutOfRangeQValuePreventsChoice() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=2.2, gzip;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel)
func testOverridesContentEncodingHeader() throws {
let channel = try compressionChannel()
defer {
XCTAssertNoThrow(try channel.finish())
try sendRequest(acceptEncoding: "deflate;q=2.2, gzip;q=0.3", channel: channel)
try assertGzippedResponse(channel: channel, additionalHeaders: HTTPHeaders([("Content-Encoding", "deflate")]))
func testRemovingHandlerFailsPendingWrites() throws {
let channel = try compressionChannel()
try sendRequest(acceptEncoding: "gzip", channel: channel)
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok)
let writePromise = channel.eventLoop.makePromise(of: Void.self)
channel.write(NIOAny(HTTPServerResponsePart.head(head)), promise: writePromise) {
XCTFail("Write succeeded")
}.whenFailure { err in
switch err {
case HTTPResponseCompressor.CompressionError.uncompressedWritesPending:
// ok
do {
try writePromise.futureResult.wait()
} catch {
// We don't care about errors here, we just need to block the
// test until we're done.
func testDoesNotBufferWritesNoAlgorithm() throws {
let channel = try compressionChannel()
try sendRequest(acceptEncoding: nil, channel: channel)
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok)
let writePromise = channel.eventLoop.makePromise(of: Void.self)
channel.writeAndFlush(NIOAny(HTTPServerResponsePart.head(head)), promise: writePromise)
try writePromise.futureResult.wait()
func testStartsWithSameUnicodeScalarsWorksOnEmptyStrings() throws {
XCTAssertTrue("".startsWithSameUnicodeScalars(string: ""))
func testStartsWithSameUnicodeScalarsWorksOnLongerNeedleFalse() throws {
XCTAssertFalse("_".startsWithSameUnicodeScalars(string: "__"))
func testStartsWithSameUnicodeScalarsWorksOnSameStrings() throws {
XCTAssertTrue("beer".startsWithSameUnicodeScalars(string: "beer"))
func testStartsWithSameUnicodeScalarsWorksOnPrefix() throws {
XCTAssertTrue("beer is good".startsWithSameUnicodeScalars(string: "beer"))
func testStartsWithSameUnicodeScalarsSaysNoForTheSameStringInDifferentNormalisations() throws {
let nfcEncodedEAigu = "\u{e9}"
let nfdEncodedEAigu = "\u{65}\u{301}"
XCTAssertEqual(nfcEncodedEAigu, nfdEncodedEAigu)
XCTAssertTrue(nfcEncodedEAigu.startsWithSameUnicodeScalars(string: nfcEncodedEAigu))
XCTAssertTrue(nfdEncodedEAigu.startsWithSameUnicodeScalars(string: nfdEncodedEAigu))
// the both do _not_ start like the other
XCTAssertFalse(nfcEncodedEAigu.startsWithSameUnicodeScalars(string: nfdEncodedEAigu))
XCTAssertFalse(nfdEncodedEAigu.startsWithSameUnicodeScalars(string: nfcEncodedEAigu))
func testStartsWithSaysYesForTheSameStringInDifferentNormalisations() throws {
let nfcEncodedEAigu = "\u{e9}"
let nfdEncodedEAigu = "\u{65}\u{301}"
XCTAssertEqual(nfcEncodedEAigu, nfdEncodedEAigu)
XCTAssertTrue(nfcEncodedEAigu.starts(with: nfcEncodedEAigu))
XCTAssertTrue(nfdEncodedEAigu.starts(with: nfdEncodedEAigu))
// the both do start like the other as we do unicode normalisation
XCTAssertTrue(nfcEncodedEAigu.starts(with: nfdEncodedEAigu))
XCTAssertTrue(nfdEncodedEAigu.starts(with: nfcEncodedEAigu))

View File

@ -2,6 +2,7 @@
- renamed all instances of `ctx` to `context`. Your `ChannelHandler` methods now
need to take a `context` parameter and no longer `ctx`. Example: `func channelRead(context: ChannelHandlerContext, data: NIOAny)`
- `HTTPResponseCompressor` moved to [`swift-nio-extras`](
- removed all previously deprecated functions, types and modules.
- renamed `SniResult` to `SNIResult`
- renamed `SniHandler` to `SNIHandler`