Add FileRegion support when using HTTP
Motivation: Often people want to transfer large files over http which should be done using sendfile(...) for best performance. Modifications: - Add support for using FileRegion when sending body parts. - Add tests Result: Be able to serve files without loading these into memory
This commit is contained in:
parent
8f2d17bc55
commit
22a35b2bdb
|
@ -29,3 +29,14 @@ extension IOData: Equatable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public extension IOData {
|
||||
var readableBytes: Int {
|
||||
switch self {
|
||||
case .byteBuffer(let buf):
|
||||
return buf.readableBytes
|
||||
case .fileRegion(let region):
|
||||
return region.readableBytes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ import Foundation // TODO(JW): investigate linker errors if this is missing
|
|||
|
||||
public final class HTTPResponseEncoder : ChannelOutboundHandler {
|
||||
public typealias OutboundIn = HTTPResponsePart
|
||||
public typealias OutboundOut = ByteBuffer
|
||||
public typealias OutboundOut = IOData
|
||||
|
||||
private var isChunked = false
|
||||
private var scratchBuffer: ByteBuffer
|
||||
|
@ -26,7 +26,7 @@ public final class HTTPResponseEncoder : ChannelOutboundHandler {
|
|||
self.scratchBuffer = allocator.buffer(capacity: 256)
|
||||
}
|
||||
|
||||
private func writeChunk(ctx: ChannelHandlerContext, chunk: ByteBuffer, promise: Promise<Void>?) {
|
||||
private func writeChunk(ctx: ChannelHandlerContext, chunk: IOData, promise: Promise<Void>?) {
|
||||
let (mW1, mW2, mW3): (Promise<()>?, Promise<()>?, Promise<()>?)
|
||||
|
||||
switch (self.isChunked, promise) {
|
||||
|
@ -43,19 +43,23 @@ public final class HTTPResponseEncoder : ChannelOutboundHandler {
|
|||
(mW1, mW2, mW3) = (nil, nil, nil)
|
||||
}
|
||||
|
||||
let readableBytes = chunk.readableBytes
|
||||
|
||||
/* we don't want to copy the chunk unnecessarily and therefore call write an annoyingly large number of times */
|
||||
if self.isChunked {
|
||||
self.scratchBuffer.clear()
|
||||
let len = String(chunk.readableBytes, radix: 16)
|
||||
let len = String(readableBytes, radix: 16)
|
||||
self.scratchBuffer.write(string: len)
|
||||
self.scratchBuffer.write(staticString: "\r\n")
|
||||
ctx.write(data: self.wrapOutboundOut(self.scratchBuffer), promise: mW1)
|
||||
ctx.write(data: self.wrapOutboundOut(.byteBuffer(self.scratchBuffer)), promise: mW1)
|
||||
}
|
||||
|
||||
ctx.write(data: self.wrapOutboundOut(chunk), promise: mW2)
|
||||
|
||||
if self.isChunked {
|
||||
self.scratchBuffer.clear()
|
||||
self.scratchBuffer.write(staticString: "\r\n")
|
||||
ctx.write(data: self.wrapOutboundOut(self.scratchBuffer), promise: mW3)
|
||||
ctx.write(data: self.wrapOutboundOut(.byteBuffer(self.scratchBuffer)), promise: mW3)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,9 +76,9 @@ public final class HTTPResponseEncoder : ChannelOutboundHandler {
|
|||
self.scratchBuffer.write(staticString: "\r\n")
|
||||
response.headers.write(buffer: &self.scratchBuffer)
|
||||
|
||||
ctx.write(data: self.wrapOutboundOut(self.scratchBuffer), promise: promise)
|
||||
case .some(.body(let buffer)):
|
||||
self.writeChunk(ctx: ctx, chunk: buffer, promise: promise)
|
||||
ctx.write(data: self.wrapOutboundOut(.byteBuffer(self.scratchBuffer)), promise: promise)
|
||||
case .some(.body(let bodyPart)):
|
||||
self.writeChunk(ctx: ctx, chunk: bodyPart, promise: promise)
|
||||
case .some(.end(let trailers)):
|
||||
switch (self.isChunked, promise) {
|
||||
case (true, let p):
|
||||
|
@ -85,12 +89,12 @@ public final class HTTPResponseEncoder : ChannelOutboundHandler {
|
|||
} else {
|
||||
self.scratchBuffer.write(staticString: "0\r\n\r\n")
|
||||
}
|
||||
ctx.write(data: self.wrapOutboundOut(self.scratchBuffer), promise: p)
|
||||
ctx.write(data: self.wrapOutboundOut(.byteBuffer(self.scratchBuffer)), promise: p)
|
||||
case (false, .some(let p)):
|
||||
// Not chunked so we have nothing to write. However, we don't want to satisfy this promise out-of-order
|
||||
// so we issue a zero-length write down the chain.
|
||||
let buf = ctx.channel!.allocator.buffer(capacity: 0)
|
||||
ctx.write(data: self.wrapOutboundOut(buf), promise: p)
|
||||
ctx.write(data: self.wrapOutboundOut(.byteBuffer(buf)), promise: p)
|
||||
case (false, .none):
|
||||
break
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public extension HTTPRequestHead {
|
|||
|
||||
public enum HTTPResponsePart {
|
||||
case head(HTTPResponseHead)
|
||||
case body(ByteBuffer)
|
||||
case body(IOData)
|
||||
case end(HTTPHeaders?)
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ private class HTTPHandler : ChannelInboundHandler {
|
|||
case .body:
|
||||
break
|
||||
case .end:
|
||||
let content = HTTPResponsePart.body(buffer!.slice())
|
||||
let content = HTTPResponsePart.body(.byteBuffer(buffer!.slice()))
|
||||
ctx.write(data: self.wrapOutboundOut(content), promise: nil)
|
||||
|
||||
if keepAlive {
|
||||
|
|
|
@ -26,10 +26,14 @@ extension HTTPServerClientTest {
|
|||
|
||||
static var allTests : [(String, (HTTPServerClientTest) -> () throws -> Void)] {
|
||||
return [
|
||||
("testSimpleGet", testSimpleGet),
|
||||
("testSimpleGetChunkedEncoding", testSimpleGetChunkedEncoding),
|
||||
("testSimpleGetTrailers", testSimpleGetTrailers),
|
||||
("testMassiveResponse", testMassiveResponse),
|
||||
("testSimpleGetByteBuffer", testSimpleGetByteBuffer),
|
||||
("testSimpleGetFileRegion", testSimpleGetFileRegion),
|
||||
("testSimpleGetChunkedEncodingByteBuffer", testSimpleGetChunkedEncodingByteBuffer),
|
||||
("testSimpleGetChunkedEncodingFileRegion", testSimpleGetChunkedEncodingFileRegion),
|
||||
("testSimpleGetTrailersByteBuffer", testSimpleGetTrailersByteBuffer),
|
||||
("testSimpleGetTrailersFileRegion", testSimpleGetTrailersFileRegion),
|
||||
("testMassiveResponseByteBuffer", testMassiveResponseByteBuffer),
|
||||
("testMassiveResponseFileRegion", testMassiveResponseFileRegion),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,10 +81,53 @@ class HTTPServerClientTest : XCTestCase {
|
|||
return bytes
|
||||
}()
|
||||
|
||||
enum SendMode {
|
||||
case byteBuffer
|
||||
case fileRegion
|
||||
}
|
||||
|
||||
private class SimpleHTTPServer: ChannelInboundHandler {
|
||||
typealias InboundIn = HTTPRequestPart
|
||||
typealias OutboundOut = HTTPResponsePart
|
||||
|
||||
private let mode: SendMode
|
||||
private let fileManager = FileManager.default
|
||||
private var files: [String] = Array()
|
||||
|
||||
init(_ mode: SendMode) {
|
||||
self.mode = mode
|
||||
}
|
||||
|
||||
private func outboundBody(_ buffer: ByteBuffer) -> HTTPResponsePart {
|
||||
switch mode {
|
||||
case .byteBuffer:
|
||||
return .body(.byteBuffer(buffer))
|
||||
case .fileRegion:
|
||||
let filePath: String
|
||||
#if os(Linux)
|
||||
filePath = "/tmp/\(UUID().uuidString)"
|
||||
#else
|
||||
if #available(OSX 10.12, *) {
|
||||
filePath = "\(fileManager.temporaryDirectory.path)/\(UUID().uuidString)"
|
||||
} else {
|
||||
filePath = "/tmp/\(UUID().uuidString)"
|
||||
}
|
||||
#endif
|
||||
files.append(filePath)
|
||||
|
||||
let content = buffer.data(at: 0, length: buffer.readableBytes)!
|
||||
try! content.write(to: URL(fileURLWithPath: filePath))
|
||||
let region = try! FileRegion(file: filePath, readerIndex: 0, endIndex: buffer.readableBytes)
|
||||
return .body(.fileRegion(region))
|
||||
}
|
||||
}
|
||||
|
||||
public func handleRemoved(ctx: ChannelHandlerContext) {
|
||||
for f in files {
|
||||
_ = try? fileManager.removeItem(atPath: f)
|
||||
}
|
||||
}
|
||||
|
||||
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
||||
switch self.unwrapInboundIn(data) {
|
||||
case .head(let req):
|
||||
|
@ -98,7 +141,8 @@ class HTTPServerClientTest : XCTestCase {
|
|||
ctx.write(data: self.wrapOutboundOut(r), promise: nil)
|
||||
var b = ctx.channel!.allocator.buffer(capacity: replyString.count)
|
||||
b.write(string: replyString)
|
||||
ctx.write(data: self.wrapOutboundOut(.body(b)), promise: nil)
|
||||
|
||||
ctx.write(data: self.wrapOutboundOut(self.outboundBody(b)), promise: nil)
|
||||
ctx.write(data: self.wrapOutboundOut(.end(nil))).whenComplete { r in
|
||||
assertSuccess(r)
|
||||
ctx.close().whenComplete { r in
|
||||
|
@ -116,7 +160,8 @@ class HTTPServerClientTest : XCTestCase {
|
|||
for i in 1...10 {
|
||||
b.clear()
|
||||
b.write(string: "\(i)")
|
||||
ctx.write(data: self.wrapOutboundOut(.body(b))).whenComplete { r in
|
||||
|
||||
ctx.write(data: self.wrapOutboundOut(self.outboundBody(b))).whenComplete { r in
|
||||
assertSuccess(r)
|
||||
}
|
||||
}
|
||||
|
@ -138,7 +183,8 @@ class HTTPServerClientTest : XCTestCase {
|
|||
for i in 1...10 {
|
||||
b.clear()
|
||||
b.write(string: "\(i)")
|
||||
ctx.write(data: self.wrapOutboundOut(.body(b))).whenComplete { r in
|
||||
|
||||
ctx.write(data: self.wrapOutboundOut(self.outboundBody(b))).whenComplete { r in
|
||||
assertSuccess(r)
|
||||
}
|
||||
}
|
||||
|
@ -170,7 +216,7 @@ class HTTPServerClientTest : XCTestCase {
|
|||
ctx.write(data: self.wrapOutboundOut(r)).whenComplete { r in
|
||||
assertSuccess(r)
|
||||
}
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.body(buf))).whenComplete { r in
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(self.outboundBody(buf))).whenComplete { r in
|
||||
assertSuccess(r)
|
||||
}
|
||||
ctx.write(data: self.wrapOutboundOut(.end(nil))).whenComplete { r in
|
||||
|
@ -196,7 +242,15 @@ class HTTPServerClientTest : XCTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
func testSimpleGet() throws {
|
||||
func testSimpleGetByteBuffer() throws {
|
||||
try testSimpleGet(.byteBuffer)
|
||||
}
|
||||
|
||||
func testSimpleGetFileRegion() throws {
|
||||
try testSimpleGet(.fileRegion)
|
||||
}
|
||||
|
||||
private func testSimpleGet(_ mode: SendMode) throws {
|
||||
let group = try MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer {
|
||||
try! group.syncShutdownGracefully()
|
||||
|
@ -215,7 +269,7 @@ class HTTPServerClientTest : XCTestCase {
|
|||
XCTAssert(actual.contains(expectedHeaderContentLength))
|
||||
}
|
||||
let numBytes = 16 * 1024
|
||||
let httpHandler = SimpleHTTPServer()
|
||||
let httpHandler = SimpleHTTPServer(mode)
|
||||
let serverChannel = try ServerBootstrap(group: group)
|
||||
.option(option: ChannelOptions.Socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
|
||||
|
@ -249,7 +303,15 @@ class HTTPServerClientTest : XCTestCase {
|
|||
accumulation.syncWaitForCompletion()
|
||||
}
|
||||
|
||||
func testSimpleGetChunkedEncoding() throws {
|
||||
func testSimpleGetChunkedEncodingByteBuffer() throws {
|
||||
try testSimpleGetChunkedEncoding(.byteBuffer)
|
||||
}
|
||||
|
||||
func testSimpleGetChunkedEncodingFileRegion() throws {
|
||||
try testSimpleGetChunkedEncoding(.fileRegion)
|
||||
}
|
||||
|
||||
private func testSimpleGetChunkedEncoding(_ mode: SendMode) throws {
|
||||
let group = try MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer {
|
||||
try! group.syncShutdownGracefully()
|
||||
|
@ -264,7 +326,7 @@ class HTTPServerClientTest : XCTestCase {
|
|||
XCTAssert(actual.contains("\r\ntransfer-encoding: chunked\r\n"))
|
||||
}
|
||||
let numBytes = 16 * 1024
|
||||
let httpHandler = SimpleHTTPServer()
|
||||
let httpHandler = SimpleHTTPServer(mode)
|
||||
let serverChannel = try ServerBootstrap(group: group)
|
||||
.option(option: ChannelOptions.Socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
|
||||
|
@ -298,7 +360,15 @@ class HTTPServerClientTest : XCTestCase {
|
|||
accumulation.syncWaitForCompletion()
|
||||
}
|
||||
|
||||
func testSimpleGetTrailers() throws {
|
||||
func testSimpleGetTrailersByteBuffer() throws {
|
||||
try testSimpleGetTrailers(.byteBuffer)
|
||||
}
|
||||
|
||||
func testSimpleGetTrailersFileRegion() throws {
|
||||
try testSimpleGetTrailers(.fileRegion)
|
||||
}
|
||||
|
||||
private func testSimpleGetTrailers(_ mode: SendMode) throws {
|
||||
let group = try MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer {
|
||||
try! group.syncShutdownGracefully()
|
||||
|
@ -317,7 +387,7 @@ class HTTPServerClientTest : XCTestCase {
|
|||
XCTAssert(actual.contains("\r\ntransfer-encoding: chunked\r\n"))
|
||||
}
|
||||
let numBytes = 16 * 1024
|
||||
let httpHandler = SimpleHTTPServer()
|
||||
let httpHandler = SimpleHTTPServer(mode)
|
||||
let serverChannel = try ServerBootstrap(group: group)
|
||||
.option(option: ChannelOptions.Socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
.handler(childHandler: ChannelInitializer(initChannel: { channel in
|
||||
|
@ -348,7 +418,15 @@ class HTTPServerClientTest : XCTestCase {
|
|||
accumulation.syncWaitForCompletion()
|
||||
}
|
||||
|
||||
func testMassiveResponse() throws {
|
||||
func testMassiveResponseByteBuffer() throws {
|
||||
try testMassiveResponse(.byteBuffer)
|
||||
}
|
||||
|
||||
func testMassiveResponseFileRegion() throws {
|
||||
try testMassiveResponse(.fileRegion)
|
||||
}
|
||||
|
||||
func testMassiveResponse(_ mode: SendMode) throws {
|
||||
let group = try MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer {
|
||||
try! group.syncShutdownGracefully()
|
||||
|
@ -363,7 +441,7 @@ class HTTPServerClientTest : XCTestCase {
|
|||
XCTAssert(expectedSuffix.elementsEqual(actualSuffix))
|
||||
}
|
||||
let numBytes = 16 * 1024
|
||||
let httpHandler = SimpleHTTPServer()
|
||||
let httpHandler = SimpleHTTPServer(mode)
|
||||
let serverChannel = try ServerBootstrap(group: group)
|
||||
.option(option: ChannelOptions.Socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
|
||||
|
@ -396,6 +474,4 @@ class HTTPServerClientTest : XCTestCase {
|
|||
try clientChannel.writeAndFlush(data: NIOAny(buffer)).wait()
|
||||
accumulation.syncWaitForCompletion()
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue