non-blocking file IO
This commit is contained in:
parent
676453c8b9
commit
755ea5aedc
|
@ -31,6 +31,8 @@ let sysFree = free
|
|||
|
||||
/// The preferred allocator for `ByteBuffer` values. The allocation strategy is opaque but is currently libc's
|
||||
/// `malloc`, `realloc` and `free`.
|
||||
///
|
||||
/// - note: `ByteBufferAllocator` is thread-safe.
|
||||
public struct ByteBufferAllocator {
|
||||
|
||||
/// Create a fresh `ByteBufferAllocator`. In the future the allocator might use for example allocation pools and
|
||||
|
|
|
@ -101,7 +101,18 @@ extension FileRegion {
|
|||
/// - readerIndex: the index (offset) on which the reading will start.
|
||||
/// - endIndex: the index which represents the end of the readable portion.
|
||||
public convenience init(file: String, readerIndex: Int, endIndex: Int) throws {
|
||||
let fd = try Posix.open(file: file, oFlag: O_RDONLY)
|
||||
let fd = try Posix.open(file: file, oFlag: O_RDONLY | O_CLOEXEC)
|
||||
self.init(descriptor: Int32(fd), readerIndex: readerIndex, endIndex: endIndex)
|
||||
}
|
||||
|
||||
/// Create a new `FileRegion` forming a complete file.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - file: the name of the file to open. The ownership of the file descriptor is transferred to this `FileRegion` and so it will be closed once `close` is called.
|
||||
public convenience init(file: String) throws {
|
||||
let fd = try Posix.open(file: file, oFlag: O_RDONLY | O_CLOEXEC)
|
||||
let eof = try Posix.lseek(descriptor: fd, offset: 0, whence: SEEK_END)
|
||||
try Posix.lseek(descriptor: fd, offset: 0, whence: SEEK_SET)
|
||||
self.init(descriptor: Int32(fd), readerIndex: 0, endIndex: Int(eof))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,359 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import NIOConcurrencyHelpers
|
||||
import Dispatch
|
||||
|
||||
private final class FileIOThreadPool {
|
||||
public enum WorkItemState {
|
||||
case active
|
||||
case cancelled
|
||||
}
|
||||
public typealias WorkItem = (WorkItemState) -> Void
|
||||
private enum State {
|
||||
/// The `FileIOThreadPool` is already stopped.
|
||||
case stopped
|
||||
/// The `FileIOThreadPool` is shutting down, the array has one boolean entry for each thread indicating if it has shut down already.
|
||||
case shuttingDown([Bool])
|
||||
/// The `FileIOThreadPool` is up and running, the `CircularBuffer` containing the yet unprocessed `WorkItems`.
|
||||
case running(CircularBuffer<WorkItem>)
|
||||
}
|
||||
private let semaphore = DispatchSemaphore(value: 0)
|
||||
private let lock = Lock()
|
||||
private let queues: [DispatchQueue]
|
||||
private var state: State = .stopped
|
||||
private let numberOfThreads: Int
|
||||
|
||||
public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
|
||||
let g = DispatchGroup()
|
||||
self.lock.withLock {
|
||||
switch self.state {
|
||||
case .running(let items):
|
||||
items.forEach { $0(.cancelled) }
|
||||
self.state = .shuttingDown(Array(repeating: true, count: numberOfThreads))
|
||||
(0..<numberOfThreads).forEach { _ in
|
||||
self.semaphore.signal()
|
||||
}
|
||||
case .shuttingDown, .stopped:
|
||||
()
|
||||
}
|
||||
|
||||
self.queues.forEach { q in
|
||||
q.async(group: g) {}
|
||||
}
|
||||
|
||||
g.notify(queue: queue) {
|
||||
callback(nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func submit(_ fn: @escaping WorkItem) {
|
||||
let item = self.lock.withLock { () -> WorkItem? in
|
||||
switch self.state {
|
||||
case .running(var items):
|
||||
items.append(fn)
|
||||
self.state = .running(items)
|
||||
self.semaphore.signal()
|
||||
return nil
|
||||
case .shuttingDown, .stopped:
|
||||
return fn
|
||||
}
|
||||
}
|
||||
/* if item couldn't be added run it immediately indicating that it couldn't be run */
|
||||
item.map { $0(.cancelled) }
|
||||
}
|
||||
|
||||
internal init(numberOfThreads: Int) {
|
||||
self.numberOfThreads = numberOfThreads
|
||||
self.queues = (0..<numberOfThreads).map {
|
||||
DispatchQueue(label: "FileIOThreadPool thread #\($0)")
|
||||
}
|
||||
}
|
||||
|
||||
private func process(identifier: Int) {
|
||||
var item: WorkItem? = nil
|
||||
repeat {
|
||||
/* wait until work has become available */
|
||||
self.semaphore.wait()
|
||||
|
||||
item = self.lock.withLock { () -> (WorkItem)? in
|
||||
switch self.state {
|
||||
case .running(var items):
|
||||
let item = items.removeFirst()
|
||||
self.state = .running(items)
|
||||
return item
|
||||
case .shuttingDown(var aliveStates):
|
||||
assert(aliveStates[identifier])
|
||||
aliveStates[identifier] = false
|
||||
self.state = .shuttingDown(aliveStates)
|
||||
return nil
|
||||
case .stopped:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
/* if there was a work item popped, run it */
|
||||
item.map { $0(.active) }
|
||||
} while item != nil
|
||||
}
|
||||
|
||||
internal func start() {
|
||||
self.lock.withLock {
|
||||
self.state = .running(CircularBuffer(initialRingCapacity: 16))
|
||||
}
|
||||
self.queues.enumerated().forEach { idAndQueue in
|
||||
let id = idAndQueue.0
|
||||
let q = idAndQueue.1
|
||||
q.async { [unowned self] in
|
||||
self.process(identifier: id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `NonBlockingFileIO` is a helper that allows you to read files without blocking the calling thread.
|
||||
///
|
||||
/// It is worth noting that `kqueue`, `epoll` or `poll` returning claiming a file is readable does not mean that the
|
||||
/// data is already available in the kernel's memory. In other words, a `read` from a file can still block even if
|
||||
/// reported as readable. This behaviour is also documented behaviour:
|
||||
///
|
||||
/// - [`poll`](pubs.opengroup.org/onlinepubs/009695399/functions/poll.html): "Regular files shall always poll TRUE for reading and writing."
|
||||
/// - [`epoll`](man7.org/linux/man-pages/man7/epoll.7.html): "epoll is simply a faster poll(2), and can be used wherever the latter is used since it shares the same semantics."
|
||||
/// - [`kqueue`](https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2): "Returns when the file pointer is not at the end of file."
|
||||
///
|
||||
/// `NonBlockingFileIO` helps to work around this issue by maintaining its own thread pool that is used to read the data
|
||||
/// from the files into memory. It will then hand the (in-memory) data back which makes it available without the possibility
|
||||
/// of blocking.
|
||||
public struct NonBlockingFileIO {
|
||||
/// The default and recommended size for `NonBlockingFileIO`'s thread pool.
|
||||
public static let defaultThreadPoolSize = 2
|
||||
|
||||
/// The default and recommended chunk size.
|
||||
public static let defaultChunkSize = 128*1024
|
||||
|
||||
/// `NonBlockingFileIO` errors.
|
||||
public enum Error: Swift.Error {
|
||||
/// `NonBlockingFileIO` is meant to be used with file descriptors that are set to the default (blocking) mode.
|
||||
/// It doesn't make sense to use it with a file descriptor where `O_NONBLOCK` is set therefore this error is
|
||||
/// raised when that was requested.
|
||||
case descriptorSetToNonBlocking
|
||||
}
|
||||
|
||||
private let threadPool: FileIOThreadPool
|
||||
|
||||
/// Initialize a `NonBlockingFileIO` thread pool with `numberOfThreads` threads.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - numberOfThreads: The number of threads to use for the thread pool.
|
||||
public init(numberOfThreads: Int = NonBlockingFileIO.defaultThreadPoolSize) {
|
||||
self.threadPool = FileIOThreadPool(numberOfThreads: numberOfThreads)
|
||||
self.threadPool.start()
|
||||
}
|
||||
|
||||
/// Read a `FileRegion` in chunks of `chunkSize` bytes on `NonBlockingFileIO`'s private thread
|
||||
/// pool which is separate from any `EventLoop` thread.
|
||||
///
|
||||
/// `chunkHandler` will be called on `eventLoop` for every chunk that was read. Assuming `fileRegion.readableBytes` is greater than
|
||||
/// zero and there are enough bytes available `chunkHandler` will be called `1 + |_ fileRegion.readableBytes / chunkSize _|`
|
||||
/// times, delivering `chunkSize` bytes each time. If less than `fileRegion.readableBytes` bytes can be read from the file,
|
||||
/// `chunkHandler` will be called less often with the last invocation possibly being of less than `chunkSize` bytes.
|
||||
///
|
||||
/// The allocation and reading of a subsequent chunk will only be attempted when `chunkHandler` suceeds.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - fileRegion: The file region to read.
|
||||
/// - chunkSize: The size of the individual chunks to deliver.
|
||||
/// - allocator: A `ByteBufferAllocator` used to allocate space for the chunks.
|
||||
/// - eventLoop: The `EventLoop` to call `chunkHandler` on.
|
||||
/// - chunkHandler: Called for every chunk read. The next chunk will be read upon successful completion of the returned `EventLoopFuture`. If the returned `EventLoopFuture` fails, the overall operation is aborted.
|
||||
/// - returns: An `EventLoopFuture` which is the result of the overall operation. If either the reading of `descriptor` or `chunkHandler` fails, the `EventLoopFuture` will fail too. If the reading of `descriptor` as well as `chunkHandler` always succeeded, the `EventLoopFuture` will succeed too.
|
||||
public func readChunked(fileRegion: FileRegion,
|
||||
chunkSize: Int = NonBlockingFileIO.defaultChunkSize,
|
||||
allocator: ByteBufferAllocator,
|
||||
eventLoop: EventLoop,
|
||||
chunkHandler: @escaping (ByteBuffer) -> EventLoopFuture<()>) -> EventLoopFuture<()> {
|
||||
do {
|
||||
_ = try Posix.lseek(descriptor: fileRegion.descriptor, offset: off_t(fileRegion.readerIndex), whence: SEEK_SET)
|
||||
return self.readChunked(descriptor: fileRegion.descriptor,
|
||||
byteCount: fileRegion.readableBytes,
|
||||
chunkSize: chunkSize,
|
||||
allocator: allocator,
|
||||
eventLoop: eventLoop,
|
||||
chunkHandler: chunkHandler)
|
||||
} catch {
|
||||
return eventLoop.newFailedFuture(error: error)
|
||||
}
|
||||
}
|
||||
|
||||
/// Read `byteCount` bytes in chunks of `chunkSize` bytes from `descriptor` in `NonBlockingFileIO`'s private thread
|
||||
/// pool which is separate from any `EventLoop` thread.
|
||||
///
|
||||
/// `chunkHandler` will be called on `eventLoop` for every chunk that was read. Assuming `byteCount` is greater than
|
||||
/// zero and there are enough bytes available `chunkHandler` will be called `1 + |_ byteCount / chunkSize _|`
|
||||
/// times, delivering `chunkSize` bytes each time. If less than `byteCount` bytes can be read from `descriptor`,
|
||||
/// `chunkHandler` will be called less often with the last invocation possibly being of less than `chunkSize` bytes.
|
||||
///
|
||||
/// The allocation and reading of a subsequent chunk will only be attempted when `chunkHandler` suceeds.
|
||||
///
|
||||
/// - note: `readChunked(fileRegion:chunkSize:allocator:eventLoop:chunkHandler:)` should be preferred as it uses `FileRegion` object instead of raw file descriptors.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - descriptor: The file descriptor to read.
|
||||
/// - byteCount: The number of bytes to read from `descriptor`.
|
||||
/// - chunkSize: The size of the individual chunks to deliver.
|
||||
/// - allocator: A `ByteBufferAllocator` used to allocate space for the chunks.
|
||||
/// - eventLoop: The `EventLoop` to call `chunkHandler` on.
|
||||
/// - chunkHandler: Called for every chunk read. The next chunk will be read upon successful completion of the returned `EventLoopFuture`. If the returned `EventLoopFuture` fails, the overall operation is aborted.
|
||||
/// - returns: An `EventLoopFuture` which is the result of the overall operation. If either the reading of `descriptor` or `chunkHandler` fails, the `EventLoopFuture` will fail too. If the reading of `descriptor` as well as `chunkHandler` always succeeded, the `EventLoopFuture` will succeed too.
|
||||
public func readChunked(descriptor fd: CInt,
|
||||
byteCount: Int,
|
||||
chunkSize: Int = NonBlockingFileIO.defaultChunkSize,
|
||||
allocator: ByteBufferAllocator,
|
||||
eventLoop: EventLoop, chunkHandler: @escaping (ByteBuffer) -> EventLoopFuture<()>) -> EventLoopFuture<()> {
|
||||
precondition(chunkSize > 0, "chunkSize must be > 0 (is \(chunkSize))")
|
||||
var remainingReads = 1 + (byteCount / chunkSize)
|
||||
let lastReadSize = byteCount % chunkSize
|
||||
|
||||
func _read(remainingReads: Int) -> EventLoopFuture<()> {
|
||||
if remainingReads > 1 || (remainingReads == 1 && lastReadSize > 0) {
|
||||
let readSize = remainingReads > 1 ? chunkSize : lastReadSize
|
||||
assert(readSize > 0)
|
||||
return self.read(descriptor: fd, byteCount: readSize, allocator: allocator, eventLoop: eventLoop).then { buffer in
|
||||
return chunkHandler(buffer).then { () -> EventLoopFuture<()> in
|
||||
assert(eventLoop.inEventLoop)
|
||||
return _read(remainingReads: remainingReads - 1)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return eventLoop.newSucceedFuture(result: ())
|
||||
}
|
||||
}
|
||||
|
||||
return _read(remainingReads: remainingReads)
|
||||
}
|
||||
|
||||
/// Read a `FileRegion` in `NonBlockingFileIO`'s private thread pool which is separate from any `EventLoop` thread.
|
||||
///
|
||||
/// The returned `ByteBuffer` will not have less than `fileRegion.readableBytes` unless we hit end-of-file in which
|
||||
/// case the `ByteBuffer` will contain the bytes available to read.
|
||||
///
|
||||
/// - note: Only use this function for small enough `FileRegion`s as it will need to allocate enough memory to hold `fileRegion.readableBytes` bytes.
|
||||
/// - note: In most cases you should prefer one of the `readChunked` functions.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - fileRegion: The file region to read.
|
||||
/// - allocator: A `ByteBufferAllocator` used to allocate space for the returned `ByteBuffer`.
|
||||
/// - eventLoop: The `EventLoop` to create the returned `EventLoopFuture` from.
|
||||
/// - returns: An `EventLoopFuture` which delivers a `ByteBuffer` if the read was successful or a failure on error.
|
||||
public func read(fileRegion: FileRegion, allocator: ByteBufferAllocator, eventLoop: EventLoop) -> EventLoopFuture<ByteBuffer> {
|
||||
do {
|
||||
_ = try Posix.lseek(descriptor: fileRegion.descriptor, offset: off_t(fileRegion.readerIndex), whence: SEEK_SET)
|
||||
return self.read(descriptor: fileRegion.descriptor,
|
||||
byteCount: fileRegion.readableBytes,
|
||||
allocator: allocator,
|
||||
eventLoop: eventLoop)
|
||||
} catch {
|
||||
return eventLoop.newFailedFuture(error: error)
|
||||
}
|
||||
}
|
||||
|
||||
/// Read `byteCount` bytes from `descriptor` in `NonBlockingFileIO`'s private thread pool which is separate from any `EventLoop` thread.
|
||||
///
|
||||
/// The returned `ByteBuffer` will not have less than `byteCount` bytes unless we hit end-of-file in which
|
||||
/// case the `ByteBuffer` will contain the bytes available to read.
|
||||
///
|
||||
/// - note: Only use this function for small enough `byteCount`s as it will need to allocate enough memory to hold `byteCount` bytes.
|
||||
/// - note: `read(fileRegion:allocator:eventLoop:)` should be preferred as it uses `FileRegion` object instead of raw file descriptors.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - descriptor: The file descriptor to read.
|
||||
/// - byteCount: The number of bytes to read from `descriptor`.
|
||||
/// - allocator: A `ByteBufferAllocator` used to allocate space for the returned `ByteBuffer`.
|
||||
/// - eventLoop: The `EventLoop` to create the returned `EventLoopFuture` from.
|
||||
/// - returns: An `EventLoopFuture` which delivers a `ByteBuffer` if the read was successful or a failure on error.
|
||||
public func read(descriptor: CInt, byteCount: Int, allocator: ByteBufferAllocator, eventLoop: EventLoop) -> EventLoopFuture<ByteBuffer> {
|
||||
guard byteCount > 0 else {
|
||||
return eventLoop.newSucceedFuture(result: allocator.buffer(capacity: 0))
|
||||
}
|
||||
|
||||
let p: EventLoopPromise<ByteBuffer> = eventLoop.newPromise()
|
||||
var buf = allocator.buffer(capacity: byteCount)
|
||||
self.threadPool.submit { shouldRun in
|
||||
guard case shouldRun = FileIOThreadPool.WorkItemState.active else {
|
||||
p.fail(error: ChannelError.ioOnClosedChannel)
|
||||
return
|
||||
}
|
||||
|
||||
var bytesRead = 0
|
||||
while bytesRead < byteCount {
|
||||
do {
|
||||
let n = try buf.writeWithUnsafeMutableBytes { ptr in
|
||||
let res = try Posix.read(descriptor: descriptor,
|
||||
pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self),
|
||||
size: byteCount - bytesRead)
|
||||
switch res {
|
||||
case .processed(let n):
|
||||
assert(n >= 0, "read claims to have read a negative number of bytes \(n)")
|
||||
return n
|
||||
case .wouldBlock(_):
|
||||
throw Error.descriptorSetToNonBlocking
|
||||
}
|
||||
}
|
||||
if n == 0 {
|
||||
// EOF
|
||||
break
|
||||
} else {
|
||||
bytesRead += n
|
||||
}
|
||||
} catch {
|
||||
p.fail(error: error)
|
||||
return
|
||||
}
|
||||
}
|
||||
p.succeed(result: buf)
|
||||
}
|
||||
return p.futureResult
|
||||
}
|
||||
|
||||
/// Shut this `NonBlockingFileIO` down.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - queue: The `DispatchQueue` on which to call `callback`.
|
||||
/// - callback: The callback and optionally an `Error` on why the operation failed.
|
||||
func shutdownGracefully(queue: DispatchQueue = .global(), _ callback: @escaping (Swift.Error?) -> Void) {
|
||||
self.threadPool.shutdownGracefully(queue: queue, callback)
|
||||
}
|
||||
|
||||
/// Shut this `NonBlockingFileIO` synchronously. This call will block until the `NonBlockingFileIO` and its
|
||||
/// thread pool have shut down fully. This might take some time as outstanding IO requests will first be fulfilled.
|
||||
public func syncShutdownGracefully() throws {
|
||||
let errorStorageLock = Lock()
|
||||
var errorStorage: Swift.Error? = nil
|
||||
let continuation = DispatchWorkItem {}
|
||||
self.shutdownGracefully { error in
|
||||
if let error = error {
|
||||
errorStorageLock.withLock {
|
||||
errorStorage = error
|
||||
}
|
||||
}
|
||||
continuation.perform()
|
||||
}
|
||||
continuation.wait()
|
||||
try errorStorageLock.withLock {
|
||||
if let error = errorStorage {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -306,6 +306,18 @@ internal enum Posix {
|
|||
})
|
||||
}
|
||||
|
||||
@discardableResult
|
||||
@inline(never)
|
||||
public static func lseek(descriptor: CInt, offset: off_t, whence: CInt) throws -> off_t {
|
||||
return try wrapSyscall({ () -> off_t in
|
||||
#if os(Linux)
|
||||
return Glibc.lseek(descriptor, offset, whence)
|
||||
#else
|
||||
return Darwin.lseek(descriptor, offset, whence)
|
||||
#endif
|
||||
})
|
||||
}
|
||||
|
||||
// Its not really posix but exists on Linux and MacOS / BSD so just put it here for now to keep it simple
|
||||
@inline(never)
|
||||
public static func sendfile(descriptor: Int32, fd: Int32, offset: Int, count: Int) throws -> IOResult<Int> {
|
||||
|
|
|
@ -13,19 +13,23 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
import NIO
|
||||
import NIOHTTP1
|
||||
import class Foundation.FileManager
|
||||
import class Foundation.NSData
|
||||
import struct Foundation.Data
|
||||
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
|
||||
import class Foundation.NSFileManager
|
||||
import struct Foundation.NSFileManager.FileAttributeKey
|
||||
#else
|
||||
import class Foundation.FileManager
|
||||
import struct Foundation.FileAttributeKey
|
||||
#endif
|
||||
import class Foundation.NSNumber
|
||||
import class Foundation.NSNull /* dummy just to get Foundation to link */
|
||||
|
||||
extension String {
|
||||
func chopPrefix(_ prefix: String) -> String? {
|
||||
if self.hasPrefix(prefix) {
|
||||
return String(self[self.index(self.startIndex, offsetBy: prefix.count)...])
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class HTTPHandler: ChannelInboundHandler {
|
||||
private enum FileIOMethod {
|
||||
case sendfile
|
||||
case nonblockingFileIO
|
||||
}
|
||||
public typealias InboundIn = HTTPServerRequestPart
|
||||
public typealias OutboundOut = HTTPServerResponsePart
|
||||
|
||||
|
@ -40,9 +44,11 @@ private final class HTTPHandler: ChannelInboundHandler {
|
|||
|
||||
private var handler: ((ChannelHandlerContext, HTTPServerRequestPart) -> Void)? = nil
|
||||
private var handlerFuture: EventLoopFuture<()>?
|
||||
private let fileIO: NonBlockingFileIO
|
||||
|
||||
public init(htdocsPath: String) {
|
||||
public init(fileIO: NonBlockingFileIO, htdocsPath: String) {
|
||||
self.htdocsPath = htdocsPath
|
||||
self.fileIO = fileIO
|
||||
}
|
||||
|
||||
func handleInfo(ctx: ChannelHandlerContext, request: HTTPServerRequestPart) {
|
||||
|
@ -200,7 +206,7 @@ private final class HTTPHandler: ChannelInboundHandler {
|
|||
}
|
||||
}
|
||||
|
||||
func handleFile(ctx: ChannelHandlerContext, request: HTTPServerRequestPart) {
|
||||
private func handleFile(ctx: ChannelHandlerContext, request: HTTPServerRequestPart, ioMethod: FileIOMethod, path: String) {
|
||||
self.buffer.clear()
|
||||
|
||||
switch request {
|
||||
|
@ -211,25 +217,79 @@ private final class HTTPHandler: ChannelInboundHandler {
|
|||
ctx.writeAndFlush(data: self.wrapOutboundOut(.end(nil)), promise: nil)
|
||||
return
|
||||
}
|
||||
let path = self.htdocsPath + "/" + request.uri
|
||||
let path = self.htdocsPath + "/" + path
|
||||
do {
|
||||
let attrs = try FileManager.default.attributesOfItem(atPath: path)
|
||||
let fileSize = (attrs[FileAttributeKey.size] as! NSNumber).intValue
|
||||
let region = try FileRegion(file: path, readerIndex: 0, endIndex: fileSize)
|
||||
let region = try FileRegion(file: path)
|
||||
var response = HTTPResponseHead(version: request.version, status: .ok)
|
||||
|
||||
response.headers.add(name: "Content-Length", value: "\(fileSize)")
|
||||
response.headers.add(name: "Content-Length", value: "\(region.endIndex)")
|
||||
response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")
|
||||
ctx.write(data: self.wrapOutboundOut(.head(response)), promise: nil)
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.body(.fileRegion(region)))).whenComplete { _ in
|
||||
_ = try? region.close()
|
||||
|
||||
switch ioMethod {
|
||||
case .nonblockingFileIO:
|
||||
var responseStarted = false
|
||||
self.fileIO.readChunked(fileRegion: region,
|
||||
chunkSize: 32 * 1024,
|
||||
allocator: ctx.channel.allocator,
|
||||
eventLoop: ctx.eventLoop) { buffer in
|
||||
if !responseStarted {
|
||||
responseStarted = true
|
||||
ctx.write(data: self.wrapOutboundOut(.head(response)), promise: nil)
|
||||
}
|
||||
return ctx.writeAndFlush(data: self.wrapOutboundOut(.body(.byteBuffer(buffer))))
|
||||
}.then { _ in
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.end(nil)))
|
||||
}.thenIfError { error in
|
||||
if !responseStarted {
|
||||
let response = HTTPResponseHead(version: request.version, status: .ok)
|
||||
ctx.write(data: self.wrapOutboundOut(.head(response)), promise: nil)
|
||||
var buffer = ctx.channel.allocator.buffer(capacity: 100)
|
||||
buffer.write(string: "fail: \(error.localizedDescription)")
|
||||
ctx.write(data: self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
|
||||
return ctx.writeAndFlush(data: self.wrapOutboundOut(.end(nil)))
|
||||
} else {
|
||||
return ctx.close()
|
||||
}
|
||||
}.whenComplete { res in
|
||||
_ = try? region.close()
|
||||
}
|
||||
case .sendfile:
|
||||
ctx.write(data: self.wrapOutboundOut(.head(response))).then { _ in
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.body(.fileRegion(region))))
|
||||
} .then { _ in
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.end(nil)))
|
||||
}.thenIfError { _ in
|
||||
ctx.close()
|
||||
}.whenComplete { _ in
|
||||
_ = try? region.close()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
let response = HTTPResponseHead(version: request.version, status: .internalServerError)
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.head(response)), promise: nil)
|
||||
var body = ctx.channel.allocator.buffer(capacity: 128)
|
||||
let response = { () -> HTTPResponseHead in
|
||||
switch error {
|
||||
case let e as IOError where e.errnoCode == ENOENT:
|
||||
body.write(staticString: "IOError (not found)\r\n")
|
||||
return HTTPResponseHead(version: request.version, status: .notFound)
|
||||
case let e as IOError:
|
||||
body.write(staticString: "IOError (other)\r\n")
|
||||
body.write(string: e.description)
|
||||
body.write(staticString: "\r\n")
|
||||
return HTTPResponseHead(version: request.version, status: .notFound)
|
||||
default:
|
||||
body.write(string: "\(type(of: error)) error\r\n")
|
||||
return HTTPResponseHead(version: request.version, status: .internalServerError)
|
||||
}
|
||||
}()
|
||||
body.write(string: error.localizedDescription)
|
||||
body.write(staticString: "\r\n")
|
||||
ctx.write(data: self.wrapOutboundOut(.head(response)), promise: nil)
|
||||
ctx.write(data: self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil)
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.end(nil)), promise: nil)
|
||||
ctx.channel.close(promise: nil)
|
||||
}
|
||||
case .end(_):
|
||||
ctx.writeAndFlush(data: self.wrapOutboundOut(.end(nil)), promise: nil)
|
||||
()
|
||||
default:
|
||||
fatalError("oh noes: \(request)")
|
||||
}
|
||||
|
@ -250,8 +310,12 @@ private final class HTTPHandler: ChannelInboundHandler {
|
|||
self.handler = self.dynamicHandler(request: request)
|
||||
self.handler!(ctx, reqPart)
|
||||
return
|
||||
} else if request.uri != "" && request.uri != "/" {
|
||||
self.handler = self.handleFile
|
||||
} else if let path = request.uri.chopPrefix("/sendfile/") {
|
||||
self.handler = { self.handleFile(ctx: $0, request: $1, ioMethod: .sendfile, path: path) }
|
||||
self.handler!(ctx, reqPart)
|
||||
return
|
||||
} else if let path = request.uri.chopPrefix("/fileio/") {
|
||||
self.handler = { self.handleFile(ctx: $0, request: $1, ioMethod: .nonblockingFileIO, path: path) }
|
||||
self.handler!(ctx, reqPart)
|
||||
return
|
||||
}
|
||||
|
@ -322,6 +386,7 @@ default:
|
|||
}
|
||||
|
||||
let group = MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
let fileIO = NonBlockingFileIO(numberOfThreads: 6)
|
||||
let bootstrap = ServerBootstrap(group: group)
|
||||
// Specify backlog and enable SO_REUSEADDR for the server itself
|
||||
.serverChannelOption(ChannelOptions.backlog, value: 256)
|
||||
|
@ -329,8 +394,8 @@ let bootstrap = ServerBootstrap(group: group)
|
|||
|
||||
// Set the handlers that are applied to the accepted Channels
|
||||
.childChannelInitializer { channel in
|
||||
return channel.pipeline.addHTTPServerHandlers().then {
|
||||
return channel.pipeline.add(handler: HTTPHandler(htdocsPath: htdocs))
|
||||
return channel.pipeline.addHTTPServerHandlers().then { _ in
|
||||
return channel.pipeline.add(handler: HTTPHandler(fileIO: fileIO, htdocsPath: htdocs))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -340,6 +405,7 @@ let bootstrap = ServerBootstrap(group: group)
|
|||
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
|
||||
|
||||
defer {
|
||||
try! fileIO.syncShutdownGracefully()
|
||||
try! group.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ import XCTest
|
|||
testCase(IdleStateHandlerTest.allTests),
|
||||
testCase(MarkedCircularBufferTests.allTests),
|
||||
testCase(MessageToByteEncoderTest.allTests),
|
||||
testCase(NonBlockingFileIOTest.allTests),
|
||||
testCase(PriorityQueueTest.allTests),
|
||||
testCase(SniHandlerTest.allTests),
|
||||
testCase(SocketAddressTest.allTests),
|
||||
|
|
|
@ -29,6 +29,8 @@ extension FileRegionTest {
|
|||
("testWriteFileRegion", testWriteFileRegion),
|
||||
("testWriteEmptyFileRegionDoesNotHang", testWriteEmptyFileRegionDoesNotHang),
|
||||
("testOutstandingFileRegionsWork", testOutstandingFileRegionsWork),
|
||||
("testWholeFileFileRegion", testWholeFileFileRegion),
|
||||
("testWholeEmptyFileFileRegion", testWholeEmptyFileFileRegion),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -168,4 +168,28 @@ class FileRegionTest : XCTestCase {
|
|||
buffer.write(bytes: bytes)
|
||||
try countingHandler.assertReceived(buffer: buffer)
|
||||
}
|
||||
|
||||
func testWholeFileFileRegion() throws {
|
||||
try withTemporaryFile(content: "hello") { fd, path in
|
||||
let region = try FileRegion(file: path)
|
||||
defer {
|
||||
XCTAssertNoThrow(try region.close())
|
||||
}
|
||||
XCTAssertEqual(0, region.readerIndex)
|
||||
XCTAssertEqual(5, region.readableBytes)
|
||||
XCTAssertEqual(5, region.endIndex)
|
||||
}
|
||||
}
|
||||
|
||||
func testWholeEmptyFileFileRegion() throws {
|
||||
try withTemporaryFile(content: "") { fd, path in
|
||||
let region = try FileRegion(file: path)
|
||||
defer {
|
||||
XCTAssertNoThrow(try region.close())
|
||||
}
|
||||
XCTAssertEqual(0, region.readerIndex)
|
||||
XCTAssertEqual(0, region.readableBytes)
|
||||
XCTAssertEqual(0, region.endIndex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
///
|
||||
/// NonBlockingFileIOTest+XCTest.swift
|
||||
///
|
||||
import XCTest
|
||||
|
||||
///
|
||||
/// NOTE: This file was generated by generate_linux_tests.rb
|
||||
///
|
||||
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
|
||||
///
|
||||
|
||||
extension NonBlockingFileIOTest {
|
||||
|
||||
static var allTests : [(String, (NonBlockingFileIOTest) -> () throws -> Void)] {
|
||||
return [
|
||||
("testBasicFileIOWorks", testBasicFileIOWorks),
|
||||
("testOffsetWorks", testOffsetWorks),
|
||||
("testOffsetBeyondEOF", testOffsetBeyondEOF),
|
||||
("testEmptyReadWorks", testEmptyReadWorks),
|
||||
("testReadingShortWorks", testReadingShortWorks),
|
||||
("testDoesNotBlockTheThreadOrEventLoop", testDoesNotBlockTheThreadOrEventLoop),
|
||||
("testGettingErrorWhenEventLoopGroupIsShutdown", testGettingErrorWhenEventLoopGroupIsShutdown),
|
||||
("testChunkReadingWorks", testChunkReadingWorks),
|
||||
("testChunkReadingCanBeAborted", testChunkReadingCanBeAborted),
|
||||
("testFailedIO", testFailedIO),
|
||||
("testChunkReadingWorksForIncrediblyLongChain", testChunkReadingWorksForIncrediblyLongChain),
|
||||
("testReadingDifferentChunkSize", testReadingDifferentChunkSize),
|
||||
("testReadDoesNotReadShort", testReadDoesNotReadShort),
|
||||
("testChunkReadingWhereByteCountIsNotAChunkSizeMultiplier", testChunkReadingWhereByteCountIsNotAChunkSizeMultiplier),
|
||||
("testChunkedReadDoesNotReadShort", testChunkedReadDoesNotReadShort),
|
||||
("testChunkSizeMoreThanTotal", testChunkSizeMoreThanTotal),
|
||||
("testFileRegionReadFromPipeFails", testFileRegionReadFromPipeFails),
|
||||
("testReadFromNonBlockingPipeFails", testReadFromNonBlockingPipeFails),
|
||||
("testDoubleShutdownWorks", testDoubleShutdownWorks),
|
||||
("testSeekPointerIsSetToFront", testSeekPointerIsSetToFront),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,447 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import XCTest
|
||||
@testable import NIO
|
||||
|
||||
class NonBlockingFileIOTest: XCTestCase {
|
||||
private var group: EventLoopGroup!
|
||||
private var eventLoop: EventLoop!
|
||||
private var allocator: ByteBufferAllocator!
|
||||
private var fileIO: NonBlockingFileIO!
|
||||
|
||||
override func setUp() {
|
||||
super.setUp()
|
||||
self.allocator = ByteBufferAllocator()
|
||||
self.group = MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
self.fileIO = NonBlockingFileIO(numberOfThreads: 6)
|
||||
self.eventLoop = self.group.next()
|
||||
}
|
||||
|
||||
override func tearDown() {
|
||||
XCTAssertNoThrow(try self.group?.syncShutdownGracefully())
|
||||
XCTAssertNoThrow(try self.fileIO?.syncShutdownGracefully())
|
||||
self.group = nil
|
||||
self.eventLoop = nil
|
||||
self.allocator = nil
|
||||
self.fileIO = nil
|
||||
super.tearDown()
|
||||
}
|
||||
|
||||
func testBasicFileIOWorks() throws {
|
||||
let content = "hello"
|
||||
try withTemporaryFile(content: content) { (fd, _) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: 5)
|
||||
var buf = try self.fileIO.read(fileRegion: fr,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop).wait()
|
||||
XCTAssertEqual(content.utf8.count, buf.readableBytes)
|
||||
XCTAssertEqual(content, buf.readString(length: buf.readableBytes))
|
||||
}
|
||||
}
|
||||
|
||||
func testOffsetWorks() throws {
|
||||
let content = "hello"
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 3, endIndex: 5)
|
||||
var buf = try self.fileIO.read(fileRegion: fr,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop).wait()
|
||||
XCTAssertEqual(2, buf.readableBytes)
|
||||
XCTAssertEqual("lo", buf.readString(length: buf.readableBytes))
|
||||
}
|
||||
}
|
||||
|
||||
func testOffsetBeyondEOF() throws {
|
||||
let content = "hello"
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 3000, endIndex: 3001)
|
||||
var buf = try self.fileIO.read(fileRegion: fr,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop).wait()
|
||||
XCTAssertEqual(0, buf.readableBytes)
|
||||
XCTAssertEqual("", buf.readString(length: buf.readableBytes))
|
||||
}
|
||||
}
|
||||
|
||||
func testEmptyReadWorks() throws {
|
||||
try withTemporaryFile { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: 0)
|
||||
let buf = try self.fileIO.read(fileRegion: fr,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop).wait()
|
||||
XCTAssertEqual(0, buf.readableBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func testReadingShortWorks() throws {
|
||||
let content = "hello"
|
||||
try withTemporaryFile(content: "hello") { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: 10)
|
||||
var buf = try self.fileIO.read(fileRegion: fr,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop).wait()
|
||||
XCTAssertEqual(content.utf8.count, buf.readableBytes)
|
||||
XCTAssertEqual(content, buf.readString(length: buf.readableBytes))
|
||||
}
|
||||
}
|
||||
|
||||
func testDoesNotBlockTheThreadOrEventLoop() throws {
|
||||
var innerError: Error? = nil
|
||||
try withPipe { readFD, writeFD in
|
||||
let bufferFuture = self.fileIO.read(descriptor: readFD,
|
||||
byteCount: 10,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop)
|
||||
do {
|
||||
try self.eventLoop.submit {
|
||||
_ = try Posix.write(descriptor: writeFD, pointer: "X", size: 1)
|
||||
_ = try Posix.close(descriptor: writeFD)
|
||||
}.wait()
|
||||
var buf = try bufferFuture.wait()
|
||||
XCTAssertEqual(1, buf.readableBytes)
|
||||
XCTAssertEqual("X", buf.readString(length: buf.readableBytes))
|
||||
} catch {
|
||||
innerError = error
|
||||
}
|
||||
return [readFD]
|
||||
}
|
||||
XCTAssertNil(innerError)
|
||||
}
|
||||
|
||||
func testGettingErrorWhenEventLoopGroupIsShutdown() throws {
|
||||
self.fileIO.shutdownGracefully(queue: .global()) { err in
|
||||
XCTAssertNil(err)
|
||||
}
|
||||
|
||||
try withPipe { readFD, writeFD in
|
||||
do {
|
||||
_ = try self.fileIO.read(descriptor: readFD,
|
||||
byteCount: 1,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop).wait()
|
||||
XCTFail("should've thrown")
|
||||
} catch let e as ChannelError {
|
||||
XCTAssertEqual(ChannelError.ioOnClosedChannel, e)
|
||||
} catch {
|
||||
XCTFail("unexpected error \(error)")
|
||||
}
|
||||
return [readFD, writeFD]
|
||||
}
|
||||
}
|
||||
|
||||
func testChunkReadingWorks() throws {
|
||||
let content = "hello"
|
||||
let contentBytes = Array(content.utf8)
|
||||
var numCalls = 0
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: 5)
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 1,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
XCTAssertEqual(1, buf.readableBytes)
|
||||
XCTAssertEqual(contentBytes[numCalls], buf.readBytes(length: 1)?.first!)
|
||||
numCalls += 1
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
}
|
||||
XCTAssertEqual(content.utf8.count, numCalls)
|
||||
}
|
||||
|
||||
func testChunkReadingCanBeAborted() throws {
|
||||
enum DummyError: Error { case dummy }
|
||||
let content = "hello"
|
||||
let contentBytes = Array(content.utf8)
|
||||
var numCalls = 0
|
||||
withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: 5)
|
||||
do {
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 1,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
XCTAssertEqual(1, buf.readableBytes)
|
||||
XCTAssertEqual(contentBytes[numCalls], buf.readBytes(length: 1)?.first!)
|
||||
numCalls += 1
|
||||
return self.eventLoop.newFailedFuture(error: DummyError.dummy)
|
||||
}.wait()
|
||||
XCTFail("call successful but should've failed")
|
||||
} catch let e as DummyError where e == .dummy {
|
||||
// ok
|
||||
} catch {
|
||||
XCTFail("wrong error \(error) caught")
|
||||
}
|
||||
}
|
||||
XCTAssertEqual(1, numCalls)
|
||||
}
|
||||
|
||||
func testFailedIO() throws {
|
||||
enum DummyError: Error { case dummy }
|
||||
let unconnectedSockFD = socket(AF_UNIX, Posix.SOCK_STREAM, 0)
|
||||
defer {
|
||||
XCTAssertNoThrow(try Posix.close(descriptor: unconnectedSockFD))
|
||||
}
|
||||
do {
|
||||
try self.fileIO.readChunked(descriptor: unconnectedSockFD,
|
||||
byteCount: 5,
|
||||
chunkSize: 1,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
XCTFail("shouldn't have been called")
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
XCTFail("call successful but should've failed")
|
||||
} catch let e as IOError {
|
||||
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
|
||||
XCTAssertEqual(ENOTCONN, e.errnoCode)
|
||||
#else
|
||||
XCTAssertEqual(EINVAL, e.errnoCode)
|
||||
#endif
|
||||
} catch {
|
||||
XCTFail("wrong error \(error) caught")
|
||||
}
|
||||
}
|
||||
|
||||
func testChunkReadingWorksForIncrediblyLongChain() throws {
|
||||
let content = String(repeatElement("X", count: 20*1024))
|
||||
var numCalls = 0
|
||||
let expectedByte = content.utf8.first!
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: content.utf8.count)
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 1,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
XCTAssertEqual(1, buf.readableBytes)
|
||||
XCTAssertEqual(expectedByte, buf.readBytes(length: 1)!.first!)
|
||||
numCalls += 1
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
}
|
||||
XCTAssertEqual(content.utf8.count, numCalls)
|
||||
}
|
||||
|
||||
func testReadingDifferentChunkSize() throws {
|
||||
let content = "0123456789"
|
||||
var numCalls = 0
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: content.utf8.count)
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 2,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
XCTAssertEqual(2, buf.readableBytes)
|
||||
XCTAssertEqual(Array("\(numCalls*2)\(numCalls*2 + 1)".utf8), buf.readBytes(length: 2)!)
|
||||
numCalls += 1
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
}
|
||||
XCTAssertEqual(content.utf8.count/2, numCalls)
|
||||
}
|
||||
|
||||
func testReadDoesNotReadShort() throws {
|
||||
var innerError: Error? = nil
|
||||
try withPipe { readFD, writeFD in
|
||||
let bufferFuture = self.fileIO.read(descriptor: readFD,
|
||||
byteCount: 10,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop)
|
||||
do {
|
||||
for i in 0..<10 {
|
||||
// this construction will cause 'read' to repeatedly return with 1 byte read
|
||||
try self.eventLoop.scheduleTask(in: .milliseconds(50)) {
|
||||
_ = try Posix.write(descriptor: writeFD, pointer: "\(i)", size: 1)
|
||||
}.futureResult.wait()
|
||||
}
|
||||
_ = try Posix.close(descriptor: writeFD)
|
||||
|
||||
var buf = try bufferFuture.wait()
|
||||
XCTAssertEqual(10, buf.readableBytes)
|
||||
XCTAssertEqual("0123456789", buf.readString(length: buf.readableBytes))
|
||||
} catch {
|
||||
innerError = error
|
||||
}
|
||||
return [readFD]
|
||||
}
|
||||
XCTAssertNil(innerError)
|
||||
}
|
||||
|
||||
func testChunkReadingWhereByteCountIsNotAChunkSizeMultiplier() throws {
|
||||
let content = "prefix-12345-suffix"
|
||||
var allBytesActual = ""
|
||||
let allBytesExpected = String(content.dropFirst(7).dropLast(7))
|
||||
var numCalls = 0
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 7, endIndex: 12)
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 3,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
allBytesActual += buf.readString(length: buf.readableBytes) ?? "WRONG"
|
||||
numCalls += 1
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
}
|
||||
XCTAssertEqual(allBytesExpected, allBytesActual)
|
||||
XCTAssertEqual(2, numCalls)
|
||||
}
|
||||
|
||||
func testChunkedReadDoesNotReadShort() throws {
|
||||
var innerError: Error? = nil
|
||||
try withPipe { readFD, writeFD in
|
||||
var allBytes = ""
|
||||
let f = self.fileIO.readChunked(descriptor: readFD,
|
||||
byteCount: 10,
|
||||
chunkSize: 3,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
if allBytes.utf8.count == 9 {
|
||||
XCTAssertEqual(1, buf.readableBytes)
|
||||
} else {
|
||||
XCTAssertEqual(3, buf.readableBytes)
|
||||
}
|
||||
allBytes.append(buf.readString(length: buf.readableBytes) ?? "THIS IS WRONG")
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}
|
||||
|
||||
do {
|
||||
for i in 0..<10 {
|
||||
// this construction will cause 'read' to repeatedly return with 1 byte read
|
||||
try self.eventLoop.scheduleTask(in: .milliseconds(50)) {
|
||||
_ = try Posix.write(descriptor: writeFD, pointer: "\(i)", size: 1)
|
||||
}.futureResult.wait()
|
||||
}
|
||||
_ = try Posix.close(descriptor: writeFD)
|
||||
|
||||
try f.wait()
|
||||
XCTAssertEqual("0123456789", allBytes)
|
||||
} catch {
|
||||
innerError = error
|
||||
}
|
||||
return [readFD]
|
||||
}
|
||||
XCTAssertNil(innerError)
|
||||
}
|
||||
|
||||
func testChunkSizeMoreThanTotal() throws {
|
||||
let content = "0123456789"
|
||||
var numCalls = 0
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let fr = FileRegion(descriptor: fd, readerIndex: 0, endIndex: 5)
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 10,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
XCTAssertEqual(5, buf.readableBytes)
|
||||
XCTAssertEqual("01234", buf.readString(length: buf.readableBytes) ?? "bad")
|
||||
numCalls += 1
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
}
|
||||
XCTAssertEqual(1, numCalls)
|
||||
}
|
||||
|
||||
func testFileRegionReadFromPipeFails() throws {
|
||||
try withPipe { readFD, writeFD in
|
||||
_ = try! Posix.write(descriptor: writeFD, pointer: "ABC", size: 3)
|
||||
let fr = FileRegion(descriptor: readFD, readerIndex: 1, endIndex: 2)
|
||||
do {
|
||||
try self.fileIO.readChunked(fileRegion: fr,
|
||||
chunkSize: 10,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
XCTFail("this shouldn't have been called")
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
XCTFail("succeeded and shouldn't have")
|
||||
} catch let e as IOError where e.errnoCode == ESPIPE {
|
||||
// OK
|
||||
} catch {
|
||||
XCTFail("wrong error \(error) caught")
|
||||
}
|
||||
return [readFD, writeFD]
|
||||
}
|
||||
}
|
||||
|
||||
func testReadFromNonBlockingPipeFails() throws {
|
||||
try withPipe { readFD, writeFD in
|
||||
do {
|
||||
try Posix.fcntl(descriptor: readFD, command: F_SETFL, value: O_NONBLOCK)
|
||||
try self.fileIO.readChunked(descriptor: readFD,
|
||||
byteCount: 10,
|
||||
chunkSize: 10,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
XCTFail("this shouldn't have been called")
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
XCTFail("succeeded and shouldn't have")
|
||||
} catch let e as NonBlockingFileIO.Error where e == NonBlockingFileIO.Error.descriptorSetToNonBlocking {
|
||||
// OK
|
||||
} catch {
|
||||
XCTFail("wrong error \(error) caught")
|
||||
}
|
||||
return [readFD, writeFD]
|
||||
}
|
||||
}
|
||||
|
||||
func testDoubleShutdownWorks() throws {
|
||||
let otherFileIO = NonBlockingFileIO(numberOfThreads: 17)
|
||||
try otherFileIO.syncShutdownGracefully()
|
||||
try otherFileIO.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
func testSeekPointerIsSetToFront() throws {
|
||||
let content = "0123456789"
|
||||
var numCalls = 0
|
||||
try withTemporaryFile(content: content) { (fd, path) -> Void in
|
||||
let region = try FileRegion(file: path)
|
||||
defer {
|
||||
try! region.close()
|
||||
}
|
||||
try self.fileIO.readChunked(descriptor: region.descriptor,
|
||||
byteCount: content.utf8.count,
|
||||
chunkSize: 9,
|
||||
allocator: self.allocator,
|
||||
eventLoop: self.eventLoop) { buf in
|
||||
var buf = buf
|
||||
numCalls += 1
|
||||
XCTAssertTrue(self.eventLoop.inEventLoop)
|
||||
if numCalls == 1 {
|
||||
XCTAssertEqual(9, buf.readableBytes)
|
||||
XCTAssertEqual("012345678", buf.readString(length: buf.readableBytes) ?? "bad")
|
||||
} else {
|
||||
XCTAssertEqual(1, buf.readableBytes)
|
||||
XCTAssertEqual("9", buf.readString(length: buf.readableBytes) ?? "bad")
|
||||
}
|
||||
return self.eventLoop.newSucceedFuture(result: ())
|
||||
}.wait()
|
||||
}
|
||||
XCTAssertEqual(2, numCalls)
|
||||
}
|
||||
}
|
|
@ -22,26 +22,22 @@ class SystemTest: XCTestCase {
|
|||
}
|
||||
|
||||
func testErrorsWorkCorrectly() throws {
|
||||
var fds: [Int32] = [-1, -1]
|
||||
let pipeErr = pipe(&fds)
|
||||
precondition(pipeErr == 0, "pipe returned error")
|
||||
defer {
|
||||
close(fds[0])
|
||||
close(fds[1])
|
||||
}
|
||||
var randomBytes: UInt8 = 42
|
||||
do {
|
||||
_ = try withUnsafePointer(to: &randomBytes) { ptr in
|
||||
try Posix.setsockopt(socket: fds[0], level: -1, optionName: -1, optionValue: ptr, optionLen: 0)
|
||||
try withPipe { readFD, writeFD in
|
||||
var randomBytes: UInt8 = 42
|
||||
do {
|
||||
_ = try withUnsafePointer(to: &randomBytes) { ptr in
|
||||
try Posix.setsockopt(socket: readFD, level: -1, optionName: -1, optionValue: ptr, optionLen: 0)
|
||||
}
|
||||
XCTFail("success even though the call was invalid")
|
||||
} catch let e as IOError {
|
||||
XCTAssertEqual(ENOTSOCK, e.errnoCode)
|
||||
XCTAssert(e.description.contains("setsockopt"))
|
||||
XCTAssert(e.description.contains("\(ENOTSOCK)"))
|
||||
XCTAssert(e.localizedDescription.contains("\(ENOTSOCK)"), "\(e.localizedDescription)")
|
||||
} catch let e {
|
||||
XCTFail("wrong error thrown: \(e)")
|
||||
}
|
||||
XCTFail("success even though the call was invalid")
|
||||
} catch let e as IOError {
|
||||
XCTAssertEqual(ENOTSOCK, e.errnoCode)
|
||||
XCTAssert(e.description.contains("setsockopt"))
|
||||
XCTAssert(e.description.contains("\(ENOTSOCK)"))
|
||||
XCTAssert(e.localizedDescription.contains("\(ENOTSOCK)"), "\(e.localizedDescription)")
|
||||
} catch let e {
|
||||
XCTFail("wrong error thrown: \(e)")
|
||||
return [readFD, writeFD]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,9 +12,61 @@
|
|||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
import NIO
|
||||
@testable import NIO
|
||||
import XCTest
|
||||
|
||||
func withPipe(_ fn: (CInt, CInt) -> [CInt]) throws {
|
||||
var fds: [Int32] = [-1, -1]
|
||||
fds.withUnsafeMutableBufferPointer { ptr in
|
||||
XCTAssertEqual(0, pipe(ptr.baseAddress!))
|
||||
}
|
||||
let toClose = fn(fds[0], fds[1])
|
||||
try toClose.forEach { fd in
|
||||
XCTAssertNoThrow(try Posix.close(descriptor: fd))
|
||||
}
|
||||
}
|
||||
|
||||
func withTemporaryFile<T>(content: String? = nil, _ fn: (CInt, String) throws -> T) rethrows -> T {
|
||||
let (fd, path) = openTemporaryFile()
|
||||
defer {
|
||||
XCTAssertNoThrow(try Posix.close(descriptor: fd))
|
||||
XCTAssertEqual(0, unlink(path))
|
||||
}
|
||||
if let content = content {
|
||||
try Array(content.utf8).withUnsafeBufferPointer { ptr in
|
||||
var toWrite = ptr.count
|
||||
var start = ptr.baseAddress!
|
||||
while toWrite > 0 {
|
||||
let res = try Posix.write(descriptor: fd, pointer: start, size: toWrite)
|
||||
switch res {
|
||||
case .processed(let written):
|
||||
toWrite -= written
|
||||
start = start + written
|
||||
case .wouldBlock:
|
||||
XCTFail("unexpectedly got .wouldBlock from a file")
|
||||
continue
|
||||
}
|
||||
}
|
||||
XCTAssertEqual(0, lseek(fd, 0, SEEK_SET))
|
||||
}
|
||||
}
|
||||
return try fn(fd, path)
|
||||
}
|
||||
|
||||
func openTemporaryFile() -> (CInt, String) {
|
||||
let template = "/tmp/niotestXXXXXXX"
|
||||
var templateBytes = template.utf8 + [0]
|
||||
let templateBytesCount = templateBytes.count
|
||||
let fd = templateBytes.withUnsafeMutableBufferPointer { ptr in
|
||||
ptr.baseAddress!.withMemoryRebound(to: Int8.self, capacity: templateBytesCount) { (ptr: UnsafeMutablePointer<Int8>) in
|
||||
return mkstemp(ptr)
|
||||
}
|
||||
}
|
||||
templateBytes.removeLast()
|
||||
return (fd, String(decoding: templateBytes, as: UTF8.self))
|
||||
}
|
||||
|
||||
|
||||
final class ByteCountingHandler : ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ function get_socket() {
|
|||
|
||||
function stop_server() {
|
||||
source "$1"
|
||||
sleep 0.05 # just to make sure all the fds could be closed
|
||||
sleep 0.5 # just to make sure all the fds could be closed
|
||||
if command -v lsof > /dev/null 2> /dev/null; then
|
||||
server_lsof "$token_pid"
|
||||
local open_fds
|
||||
|
|
|
@ -6,6 +6,8 @@ token=$(create_token)
|
|||
start_server "$token"
|
||||
htdocs=$(get_htdocs "$token")
|
||||
echo FOO BAR > "$htdocs/some_file.txt"
|
||||
do_curl "$token" "http://foobar.com/some_file.txt" > "$tmp/out.txt"
|
||||
assert_equal_files "$htdocs/some_file.txt" "$tmp/out.txt"
|
||||
for method in sendfile fileio; do
|
||||
do_curl "$token" "http://foobar.com/$method/some_file.txt" > "$tmp/out.txt"
|
||||
assert_equal_files "$htdocs/some_file.txt" "$tmp/out.txt"
|
||||
done
|
||||
stop_server "$token"
|
||||
|
|
|
@ -8,6 +8,8 @@ htdocs=$(get_htdocs "$token")
|
|||
base="s/o/m/e/r/a/n/d/o/m/f/o/l/d/e/r"
|
||||
mkdir -p "$htdocs/$base"
|
||||
dd if=/dev/urandom of="$htdocs/$base/random.bytes" bs=$((1024 * 1024)) count=2
|
||||
do_curl "$token" "http://foobar.com/$base/random.bytes" > "$tmp/random.bytes"
|
||||
cmp "$htdocs/$base/random.bytes" "$tmp/random.bytes"
|
||||
for method in sendfile fileio; do
|
||||
do_curl "$token" "http://foobar.com/$method/$base/random.bytes" > "$tmp/random.bytes"
|
||||
assert_equal_files "$htdocs/$base/random.bytes" "$tmp/random.bytes"
|
||||
done
|
||||
stop_server "$token"
|
||||
|
|
|
@ -15,7 +15,7 @@ for f in $(seq 20); do
|
|||
kill -CONT "$server_pid"
|
||||
kill -WINCH "$server_pid"
|
||||
|
||||
do_curl "$token" "http://foobar.com/some_file.txt" > "$tmp/out.txt" &
|
||||
do_curl "$token" "http://foobar.com/fileio/some_file.txt" > "$tmp/out.txt" &
|
||||
curl_pid=$!
|
||||
for g in $(seq 20); do
|
||||
kill -URG "$server_pid"
|
||||
|
|
|
@ -8,7 +8,10 @@ htdocs=$(get_htdocs "$token")
|
|||
# create a 3GB sparse file, this is above the 2,147,479,552 mentioned in the
|
||||
# BUGS section of Linux's sendfile(2) man page.
|
||||
dd if=/dev/zero of="$htdocs/lots_of_zeroes" seek=$((3 * 1024)) bs=$((1024 * 1024)) count=1
|
||||
do_curl "$token" "http://foobar.com/lots_of_zeroes" | shasum > "$tmp/actual_sha"
|
||||
echo "bf184d91c8f82092198e4d8e1d029e576dbec3bc -" > "$tmp/expected_sha"
|
||||
assert_equal_files "$tmp/expected_sha" "$tmp/actual_sha"
|
||||
for method in fileio; do
|
||||
do_curl "$token" "http://foobar.com/$method/lots_of_zeroes" | shasum > "$tmp/actual_sha"
|
||||
echo "bf184d91c8f82092198e4d8e1d029e576dbec3bc -" > "$tmp/expected_sha"
|
||||
assert_equal_files "$tmp/expected_sha" "$tmp/actual_sha"
|
||||
done
|
||||
sleep 3 # wait for all the fds to be closed
|
||||
stop_server "$token"
|
||||
|
|
Loading…
Reference in New Issue