minor stylistic improvements (#36)
Motivation: Sometimes we deviated from the style the Swift stdlib sets out for no reason. Modifications: Fixed some stylistic deviations. Result: Looks more Swift-like.
This commit is contained in:
parent
580924909d
commit
73a805c7f6
|
@ -19,8 +19,8 @@ protocol Registration {
|
||||||
}
|
}
|
||||||
|
|
||||||
protocol SockAddrProtocol {
|
protocol SockAddrProtocol {
|
||||||
mutating func withSockAddr<R>(_ fn: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R
|
mutating func withSockAddr<R>(_ body: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R
|
||||||
mutating func withMutableSockAddr<R>(_ fn: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R
|
mutating func withMutableSockAddr<R>(_ body: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a description for the given address.
|
/// Returns a description for the given address.
|
||||||
|
@ -38,17 +38,17 @@ private func descriptionForAddress(family: CInt, bytes: UnsafeRawPointer, length
|
||||||
}
|
}
|
||||||
|
|
||||||
extension sockaddr_in: SockAddrProtocol {
|
extension sockaddr_in: SockAddrProtocol {
|
||||||
mutating func withSockAddr<R>(_ fn: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withSockAddr<R>(_ body: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeBytes(of: &me) { p in
|
return try withUnsafeBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mutating func withMutableSockAddr<R>(_ fn: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withMutableSockAddr<R>(_ body: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeMutableBytes(of: &me) { p in
|
return try withUnsafeMutableBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,17 +61,17 @@ extension sockaddr_in: SockAddrProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
extension sockaddr_in6: SockAddrProtocol {
|
extension sockaddr_in6: SockAddrProtocol {
|
||||||
mutating func withSockAddr<R>(_ fn: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withSockAddr<R>(_ body: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeBytes(of: &me) { p in
|
return try withUnsafeBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mutating func withMutableSockAddr<R>(_ fn: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withMutableSockAddr<R>(_ body: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeMutableBytes(of: &me) { p in
|
return try withUnsafeMutableBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,32 +84,32 @@ extension sockaddr_in6: SockAddrProtocol {
|
||||||
}
|
}
|
||||||
|
|
||||||
extension sockaddr_un: SockAddrProtocol {
|
extension sockaddr_un: SockAddrProtocol {
|
||||||
mutating func withSockAddr<R>(_ fn: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withSockAddr<R>(_ body: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeBytes(of: &me) { p in
|
return try withUnsafeBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mutating func withMutableSockAddr<R>(_ fn: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withMutableSockAddr<R>(_ body: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeMutableBytes(of: &me) { p in
|
return try withUnsafeMutableBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension sockaddr_storage: SockAddrProtocol {
|
extension sockaddr_storage: SockAddrProtocol {
|
||||||
mutating func withSockAddr<R>(_ fn: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withSockAddr<R>(_ body: (UnsafePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
var me = self
|
var me = self
|
||||||
return try withUnsafeBytes(of: &me) { p in
|
return try withUnsafeBytes(of: &me) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mutating func withMutableSockAddr<R>(_ fn: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
mutating func withMutableSockAddr<R>(_ body: (UnsafeMutablePointer<sockaddr>, Int) throws -> R) rethrows -> R {
|
||||||
return try withUnsafeMutableBytes(of: &self) { p in
|
return try withUnsafeMutableBytes(of: &self) { p in
|
||||||
try fn(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
try body(p.baseAddress!.assumingMemoryBound(to: sockaddr.self), p.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -184,11 +184,11 @@ class BaseSocket: Selectable {
|
||||||
private let descriptor: Int32
|
private let descriptor: Int32
|
||||||
public private(set) var open: Bool
|
public private(set) var open: Bool
|
||||||
|
|
||||||
func withUnsafeFileDescriptor<T>(_ fn: (Int32) throws -> T) throws -> T {
|
func withUnsafeFileDescriptor<T>(_ body: (Int32) throws -> T) throws -> T {
|
||||||
guard self.open else {
|
guard self.open else {
|
||||||
throw IOError(errnoCode: EBADF, reason: "file descriptor already closed!")
|
throw IOError(errnoCode: EBADF, reason: "file descriptor already closed!")
|
||||||
}
|
}
|
||||||
return try fn(descriptor)
|
return try body(descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the local bound `SocketAddress` of the socket.
|
/// Returns the local bound `SocketAddress` of the socket.
|
||||||
|
@ -208,14 +208,14 @@ class BaseSocket: Selectable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Internal helper function for retrieval of a `SocketAddress`.
|
/// Internal helper function for retrieval of a `SocketAddress`.
|
||||||
private func get_addr(_ fn: (Int32, UnsafeMutablePointer<sockaddr>, UnsafeMutablePointer<socklen_t>) throws -> Void) throws -> SocketAddress {
|
private func get_addr(_ body: (Int32, UnsafeMutablePointer<sockaddr>, UnsafeMutablePointer<socklen_t>) throws -> Void) throws -> SocketAddress {
|
||||||
var addr = sockaddr_storage()
|
var addr = sockaddr_storage()
|
||||||
|
|
||||||
try addr.withMutableSockAddr { addressPtr, size in
|
try addr.withMutableSockAddr { addressPtr, size in
|
||||||
var size = socklen_t(size)
|
var size = socklen_t(size)
|
||||||
|
|
||||||
try withUnsafeFileDescriptor { fd in
|
try withUnsafeFileDescriptor { fd in
|
||||||
try fn(fd, addressPtr, &size)
|
try body(fd, addressPtr, &size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return addr.convert()
|
return addr.convert()
|
||||||
|
@ -259,7 +259,7 @@ class BaseSocket: Selectable {
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - descriptor: The file descriptor to wrap.
|
/// - descriptor: The file descriptor to wrap.
|
||||||
init(descriptor : Int32) {
|
init(descriptor: Int32) {
|
||||||
self.descriptor = descriptor
|
self.descriptor = descriptor
|
||||||
self.open = true
|
self.open = true
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,17 +76,17 @@ public final class BlockingIOThreadPool {
|
||||||
/// Submit a `WorkItem` to process.
|
/// Submit a `WorkItem` to process.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The `WorkItem` to process by the `BlockingIOThreadPool`.
|
/// - body: The `WorkItem` to process by the `BlockingIOThreadPool`.
|
||||||
public func submit(_ fn: @escaping WorkItem) {
|
public func submit(_ body: @escaping WorkItem) {
|
||||||
let item = self.lock.withLock { () -> WorkItem? in
|
let item = self.lock.withLock { () -> WorkItem? in
|
||||||
switch self.state {
|
switch self.state {
|
||||||
case .running(var items):
|
case .running(var items):
|
||||||
items.append(fn)
|
items.append(body)
|
||||||
self.state = .running(items)
|
self.state = .running(items)
|
||||||
self.semaphore.signal()
|
self.semaphore.signal()
|
||||||
return nil
|
return nil
|
||||||
case .shuttingDown, .stopped:
|
case .shuttingDown, .stopped:
|
||||||
return fn
|
return body
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* if item couldn't be added run it immediately indicating that it couldn't be run */
|
/* if item couldn't be added run it immediately indicating that it couldn't be run */
|
||||||
|
|
|
@ -186,7 +186,7 @@ public final class ServerBootstrap {
|
||||||
return promise.futureResult
|
return promise.futureResult
|
||||||
}
|
}
|
||||||
|
|
||||||
private class AcceptHandler : ChannelInboundHandler {
|
private class AcceptHandler: ChannelInboundHandler {
|
||||||
public typealias InboundIn = SocketChannel
|
public typealias InboundIn = SocketChannel
|
||||||
|
|
||||||
private let childChannelInit: ((Channel) -> EventLoopFuture<()>)?
|
private let childChannelInit: ((Channel) -> EventLoopFuture<()>)?
|
||||||
|
|
|
@ -147,10 +147,10 @@ extension ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes and returns the number of bytes it processed.
|
/// - body: The closure that will accept the yielded bytes and returns the number of bytes it processed.
|
||||||
/// - returns: The number of bytes read.
|
/// - returns: The number of bytes read.
|
||||||
public mutating func readWithUnsafeReadableBytes(_ fn: (UnsafeRawBufferPointer) throws -> Int) rethrows -> Int {
|
public mutating func readWithUnsafeReadableBytes(_ body: (UnsafeRawBufferPointer) throws -> Int) rethrows -> Int {
|
||||||
let bytesRead = try self.withUnsafeReadableBytes(fn)
|
let bytesRead = try self.withUnsafeReadableBytes(body)
|
||||||
self.moveReaderIndex(forwardBy: bytesRead)
|
self.moveReaderIndex(forwardBy: bytesRead)
|
||||||
return bytesRead
|
return bytesRead
|
||||||
}
|
}
|
||||||
|
@ -161,10 +161,10 @@ extension ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes and returns the number of bytes it processed along with some other value.
|
/// - body: The closure that will accept the yielded bytes and returns the number of bytes it processed along with some other value.
|
||||||
/// - returns: The value `fn` returned in the second tuple component.
|
/// - returns: The value `fn` returned in the second tuple component.
|
||||||
public mutating func readWithUnsafeReadableBytes<T>(_ fn: (UnsafeRawBufferPointer) throws -> (Int, T)) rethrows -> T {
|
public mutating func readWithUnsafeReadableBytes<T>(_ body: (UnsafeRawBufferPointer) throws -> (Int, T)) rethrows -> T {
|
||||||
let (bytesRead, ret) = try self.withUnsafeReadableBytes(fn)
|
let (bytesRead, ret) = try self.withUnsafeReadableBytes(body)
|
||||||
self.moveReaderIndex(forwardBy: bytesRead)
|
self.moveReaderIndex(forwardBy: bytesRead)
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
@ -175,10 +175,10 @@ extension ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes and returns the number of bytes it processed.
|
/// - body: The closure that will accept the yielded bytes and returns the number of bytes it processed.
|
||||||
/// - returns: The number of bytes read.
|
/// - returns: The number of bytes read.
|
||||||
public mutating func readWithUnsafeMutableReadableBytes(_ fn: (UnsafeMutableRawBufferPointer) throws -> Int) rethrows -> Int {
|
public mutating func readWithUnsafeMutableReadableBytes(_ body: (UnsafeMutableRawBufferPointer) throws -> Int) rethrows -> Int {
|
||||||
let bytesRead = try self.withUnsafeMutableReadableBytes(fn)
|
let bytesRead = try self.withUnsafeMutableReadableBytes(body)
|
||||||
self.moveReaderIndex(forwardBy: bytesRead)
|
self.moveReaderIndex(forwardBy: bytesRead)
|
||||||
return bytesRead
|
return bytesRead
|
||||||
}
|
}
|
||||||
|
@ -189,10 +189,10 @@ extension ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes and returns the number of bytes it processed along with some other value.
|
/// - body: The closure that will accept the yielded bytes and returns the number of bytes it processed along with some other value.
|
||||||
/// - returns: The value `fn` returned in the second tuple component.
|
/// - returns: The value `fn` returned in the second tuple component.
|
||||||
public mutating func readWithUnsafeMutableReadableBytes<T>(_ fn: (UnsafeMutableRawBufferPointer) throws -> (Int, T)) rethrows -> T {
|
public mutating func readWithUnsafeMutableReadableBytes<T>(_ body: (UnsafeMutableRawBufferPointer) throws -> (Int, T)) rethrows -> T {
|
||||||
let (bytesRead, ret) = try self.withUnsafeMutableReadableBytes(fn)
|
let (bytesRead, ret) = try self.withUnsafeMutableReadableBytes(body)
|
||||||
self.moveReaderIndex(forwardBy: bytesRead)
|
self.moveReaderIndex(forwardBy: bytesRead)
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
|
@ -387,11 +387,11 @@ public struct ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes.
|
/// - body: The closure that will accept the yielded bytes.
|
||||||
/// - returns: The value returned by `fn`.
|
/// - returns: The value returned by `fn`.
|
||||||
public mutating func withUnsafeMutableReadableBytes<T>(_ fn: (UnsafeMutableRawBufferPointer) throws -> T) rethrows -> T {
|
public mutating func withUnsafeMutableReadableBytes<T>(_ body: (UnsafeMutableRawBufferPointer) throws -> T) rethrows -> T {
|
||||||
self.copyStorageAndRebaseIfNeeded()
|
self.copyStorageAndRebaseIfNeeded()
|
||||||
return try fn(UnsafeMutableRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._readerIndex)),
|
return try body(UnsafeMutableRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._readerIndex)),
|
||||||
count: self.readableBytes))
|
count: self.readableBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -403,17 +403,17 @@ public struct ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes and return the number of bytes written.
|
/// - body: The closure that will accept the yielded bytes and return the number of bytes written.
|
||||||
/// - returns: The number of bytes written.
|
/// - returns: The number of bytes written.
|
||||||
public mutating func withUnsafeMutableWritableBytes<T>(_ fn: (UnsafeMutableRawBufferPointer) throws -> T) rethrows -> T {
|
public mutating func withUnsafeMutableWritableBytes<T>(_ body: (UnsafeMutableRawBufferPointer) throws -> T) rethrows -> T {
|
||||||
self.copyStorageAndRebaseIfNeeded()
|
self.copyStorageAndRebaseIfNeeded()
|
||||||
return try fn(UnsafeMutableRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._writerIndex)),
|
return try body(UnsafeMutableRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._writerIndex)),
|
||||||
count: self.writableBytes))
|
count: self.writableBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
@discardableResult
|
@discardableResult
|
||||||
public mutating func writeWithUnsafeMutableBytes(_ fn: (UnsafeMutableRawBufferPointer) throws -> Int) rethrows -> Int {
|
public mutating func writeWithUnsafeMutableBytes(_ body: (UnsafeMutableRawBufferPointer) throws -> Int) rethrows -> Int {
|
||||||
let bytesWritten = try withUnsafeMutableWritableBytes(fn)
|
let bytesWritten = try withUnsafeMutableWritableBytes(body)
|
||||||
self.moveWriterIndex(to: self._writerIndex + toIndex(bytesWritten))
|
self.moveWriterIndex(to: self._writerIndex + toIndex(bytesWritten))
|
||||||
return bytesWritten
|
return bytesWritten
|
||||||
}
|
}
|
||||||
|
@ -422,8 +422,8 @@ public struct ByteBuffer {
|
||||||
/// uninitialised memory and it's undefined behaviour to read it. In most cases you should use `withUnsafeReadableBytes`.
|
/// uninitialised memory and it's undefined behaviour to read it. In most cases you should use `withUnsafeReadableBytes`.
|
||||||
///
|
///
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
public func withVeryUnsafeBytes<T>(_ fn: (UnsafeRawBufferPointer) throws -> T) rethrows -> T {
|
public func withVeryUnsafeBytes<T>(_ body: (UnsafeRawBufferPointer) throws -> T) rethrows -> T {
|
||||||
return try fn(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound)),
|
return try body(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound)),
|
||||||
count: self._slice.count))
|
count: self._slice.count))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,10 +432,10 @@ public struct ByteBuffer {
|
||||||
/// - warning: Do not escape the pointer from the closure for later use.
|
/// - warning: Do not escape the pointer from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes.
|
/// - body: The closure that will accept the yielded bytes.
|
||||||
/// - returns: The value returned by `fn`.
|
/// - returns: The value returned by `fn`.
|
||||||
public func withUnsafeReadableBytes<T>(_ fn: (UnsafeRawBufferPointer) throws -> T) rethrows -> T {
|
public func withUnsafeReadableBytes<T>(_ body: (UnsafeRawBufferPointer) throws -> T) rethrows -> T {
|
||||||
return try fn(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._readerIndex)),
|
return try body(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._readerIndex)),
|
||||||
count: self.readableBytes))
|
count: self.readableBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -448,18 +448,18 @@ public struct ByteBuffer {
|
||||||
/// `retain` and `release` must be balanced.
|
/// `retain` and `release` must be balanced.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the yielded bytes and the `storageManagement`.
|
/// - body: The closure that will accept the yielded bytes and the `storageManagement`.
|
||||||
/// - returns: The value returned by `fn`.
|
/// - returns: The value returned by `fn`.
|
||||||
public func withUnsafeReadableBytesWithStorageManagement<T>(_ fn: (UnsafeRawBufferPointer, Unmanaged<AnyObject>) throws -> T) rethrows -> T {
|
public func withUnsafeReadableBytesWithStorageManagement<T>(_ body: (UnsafeRawBufferPointer, Unmanaged<AnyObject>) throws -> T) rethrows -> T {
|
||||||
let storageReference: Unmanaged<AnyObject> = Unmanaged.passUnretained(self._storage)
|
let storageReference: Unmanaged<AnyObject> = Unmanaged.passUnretained(self._storage)
|
||||||
return try fn(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._readerIndex)),
|
return try body(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound + self._readerIndex)),
|
||||||
count: self.readableBytes), storageReference)
|
count: self.readableBytes), storageReference)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// See `withUnsafeReadableBytesWithStorageManagement` and `withVeryUnsafeBytes`.
|
/// See `withUnsafeReadableBytesWithStorageManagement` and `withVeryUnsafeBytes`.
|
||||||
public func withVeryUnsafeBytesWithStorageManagement<T>(_ fn: (UnsafeRawBufferPointer, Unmanaged<AnyObject>) throws -> T) rethrows -> T {
|
public func withVeryUnsafeBytesWithStorageManagement<T>(_ body: (UnsafeRawBufferPointer, Unmanaged<AnyObject>) throws -> T) rethrows -> T {
|
||||||
let storageReference: Unmanaged<AnyObject> = Unmanaged.passUnretained(self._storage)
|
let storageReference: Unmanaged<AnyObject> = Unmanaged.passUnretained(self._storage)
|
||||||
return try fn(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound)),
|
return try body(UnsafeRawBufferPointer(start: self._storage.bytes.advanced(by: Int(self._slice.lowerBound)),
|
||||||
count: self._slice.count), storageReference)
|
count: self._slice.count), storageReference)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -569,7 +569,7 @@ extension ByteBuffer: CustomStringConvertible {
|
||||||
|
|
||||||
/// A `Collection` that is contiguously layed out in memory and can therefore be duplicated using `memcpy`.
|
/// A `Collection` that is contiguously layed out in memory and can therefore be duplicated using `memcpy`.
|
||||||
public protocol ContiguousCollection: Collection {
|
public protocol ContiguousCollection: Collection {
|
||||||
func withUnsafeBytes<R>(_ fn: (UnsafeRawBufferPointer) throws -> R) rethrows -> R
|
func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R
|
||||||
}
|
}
|
||||||
|
|
||||||
extension StaticString: Collection {
|
extension StaticString: Collection {
|
||||||
|
@ -592,18 +592,18 @@ extension StaticString: Collection {
|
||||||
extension Array: ContiguousCollection {}
|
extension Array: ContiguousCollection {}
|
||||||
extension ContiguousArray: ContiguousCollection {}
|
extension ContiguousArray: ContiguousCollection {}
|
||||||
extension StaticString: ContiguousCollection {
|
extension StaticString: ContiguousCollection {
|
||||||
public func withUnsafeBytes<R>(_ fn: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
||||||
return try fn(UnsafeRawBufferPointer(start: self.utf8Start, count: self.utf8CodeUnitCount))
|
return try body(UnsafeRawBufferPointer(start: self.utf8Start, count: self.utf8CodeUnitCount))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
extension UnsafeRawBufferPointer: ContiguousCollection {
|
extension UnsafeRawBufferPointer: ContiguousCollection {
|
||||||
public func withUnsafeBytes<R>(_ fn: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
||||||
return try fn(self)
|
return try body(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
extension UnsafeBufferPointer: ContiguousCollection {
|
extension UnsafeBufferPointer: ContiguousCollection {
|
||||||
public func withUnsafeBytes<R>(_ fn: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
||||||
return try fn(UnsafeRawBufferPointer(self))
|
return try body(UnsafeRawBufferPointer(self))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ import NIOConcurrencyHelpers
|
||||||
/// The core `Channel` methods for NIO-internal use only.
|
/// The core `Channel` methods for NIO-internal use only.
|
||||||
///
|
///
|
||||||
/// - note: All methods must be called from the `EventLoop` thread.
|
/// - note: All methods must be called from the `EventLoop` thread.
|
||||||
public protocol ChannelCore : class {
|
public protocol ChannelCore: class {
|
||||||
/// Returns the local bound `SocketAddress`.
|
/// Returns the local bound `SocketAddress`.
|
||||||
func localAddress0() throws -> SocketAddress
|
func localAddress0() throws -> SocketAddress
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ public protocol ChannelCore : class {
|
||||||
/// passed to or returned by the operations are used to retrieve the result of an operation after it has completed.
|
/// passed to or returned by the operations are used to retrieve the result of an operation after it has completed.
|
||||||
///
|
///
|
||||||
/// A `Channel` owns its `ChannelPipeline` which handles all I/O events and requests associated with the `Channel`.
|
/// A `Channel` owns its `ChannelPipeline` which handles all I/O events and requests associated with the `Channel`.
|
||||||
public protocol Channel : class, ChannelOutboundInvoker {
|
public protocol Channel: class, ChannelOutboundInvoker {
|
||||||
/// The `Channel`'s `ByteBuffer` allocator. This is _the only_ supported way of allocating `ByteBuffer`s to be used with this `Channel`.
|
/// The `Channel`'s `ByteBuffer` allocator. This is _the only_ supported way of allocating `ByteBuffer`s to be used with this `Channel`.
|
||||||
var allocator: ByteBufferAllocator { get }
|
var allocator: ByteBufferAllocator { get }
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ public protocol Channel : class, ChannelOutboundInvoker {
|
||||||
/// before possible. On UNIX a `Selector` is usually an abstraction of `select`, `poll`, `epoll` or `kqueue`.
|
/// before possible. On UNIX a `Selector` is usually an abstraction of `select`, `poll`, `epoll` or `kqueue`.
|
||||||
///
|
///
|
||||||
/// - warning: `SelectableChannel` methods and properties are _not_ thread-safe (unless they also belong to `Channel`).
|
/// - warning: `SelectableChannel` methods and properties are _not_ thread-safe (unless they also belong to `Channel`).
|
||||||
internal protocol SelectableChannel : Channel {
|
internal protocol SelectableChannel: Channel {
|
||||||
/// The type of the `Selectable`. A `Selectable` is usually wrapping a file descriptor that can be registered in a
|
/// The type of the `Selectable`. A `Selectable` is usually wrapping a file descriptor that can be registered in a
|
||||||
/// `Selector`.
|
/// `Selector`.
|
||||||
associatedtype SelectableType: Selectable
|
associatedtype SelectableType: Selectable
|
||||||
|
|
|
@ -34,7 +34,7 @@ public protocol ChannelHandler: class {
|
||||||
/// Untyped `ChannelHandler` which handles outbound I/O events or intercept an outbound I/O operation.
|
/// Untyped `ChannelHandler` which handles outbound I/O events or intercept an outbound I/O operation.
|
||||||
///
|
///
|
||||||
/// We _strongly_ advice against implementing this protocol directly. Please implement `ChannelOutboundHandler`.
|
/// We _strongly_ advice against implementing this protocol directly. Please implement `ChannelOutboundHandler`.
|
||||||
public protocol _ChannelOutboundHandler : ChannelHandler {
|
public protocol _ChannelOutboundHandler: ChannelHandler {
|
||||||
|
|
||||||
/// Called to request that the `Channel` register itself for I/O events with its `EventLoop`.
|
/// Called to request that the `Channel` register itself for I/O events with its `EventLoop`.
|
||||||
/// This should call `ctx.register` to forward the operation to the next `_ChannelOutboundHandler` in the `ChannelPipeline` or
|
/// This should call `ctx.register` to forward the operation to the next `_ChannelOutboundHandler` in the `ChannelPipeline` or
|
||||||
|
@ -125,7 +125,7 @@ public protocol _ChannelOutboundHandler : ChannelHandler {
|
||||||
/// Untyped `ChannelHandler` which handles inbound I/O events.
|
/// Untyped `ChannelHandler` which handles inbound I/O events.
|
||||||
///
|
///
|
||||||
/// We _strongly_ advice against implementing this protocol directly. Please implement `ChannelInboundHandler`.
|
/// We _strongly_ advice against implementing this protocol directly. Please implement `ChannelInboundHandler`.
|
||||||
public protocol _ChannelInboundHandler : ChannelHandler {
|
public protocol _ChannelInboundHandler: ChannelHandler {
|
||||||
|
|
||||||
/// Called when the `Channel` has successfully registered with its `EventLoop` to handle I/O.
|
/// Called when the `Channel` has successfully registered with its `EventLoop` to handle I/O.
|
||||||
///
|
///
|
||||||
|
|
|
@ -27,7 +27,7 @@ public class BackPressureHandler: ChannelInboundHandler, _ChannelOutboundHandler
|
||||||
public typealias OutboundOut = ByteBuffer
|
public typealias OutboundOut = ByteBuffer
|
||||||
|
|
||||||
private var pendingRead = false
|
private var pendingRead = false
|
||||||
private var writable: Bool = true;
|
private var writable: Bool = true
|
||||||
|
|
||||||
public init() { }
|
public init() { }
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ public class BackPressureHandler: ChannelInboundHandler, _ChannelOutboundHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
|
/// Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
|
||||||
public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
public class IdleStateHandler: ChannelInboundHandler, ChannelOutboundHandler {
|
||||||
public typealias InboundIn = NIOAny
|
public typealias InboundIn = NIOAny
|
||||||
public typealias InboundOut = NIOAny
|
public typealias InboundOut = NIOAny
|
||||||
public typealias OutboundIn = NIOAny
|
public typealias OutboundIn = NIOAny
|
||||||
|
@ -145,7 +145,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
private func newReadTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> () -> () {
|
private func newReadTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> (() -> Void) {
|
||||||
return {
|
return {
|
||||||
guard self.shouldReschedule(ctx) else {
|
guard self.shouldReschedule(ctx) else {
|
||||||
return
|
return
|
||||||
|
@ -169,7 +169,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func newWriteTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> () -> () {
|
private func newWriteTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> (() -> Void) {
|
||||||
return {
|
return {
|
||||||
guard self.shouldReschedule(ctx) else {
|
guard self.shouldReschedule(ctx) else {
|
||||||
return
|
return
|
||||||
|
@ -190,7 +190,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func newAllTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> () -> () {
|
private func newAllTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> (() -> Void) {
|
||||||
return {
|
return {
|
||||||
guard self.shouldReschedule(ctx) else {
|
guard self.shouldReschedule(ctx) else {
|
||||||
return
|
return
|
||||||
|
@ -216,9 +216,9 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func schedule(_ ctx: ChannelHandlerContext, _ amount: TimeAmount?, _ fn: @escaping (ChannelHandlerContext, TimeAmount) -> () -> ()) -> Scheduled<Void>? {
|
private func schedule(_ ctx: ChannelHandlerContext, _ amount: TimeAmount?, _ body: @escaping (ChannelHandlerContext, TimeAmount) -> (() -> Void) ) -> Scheduled<Void>? {
|
||||||
if let timeout = amount {
|
if let timeout = amount {
|
||||||
return ctx.eventLoop.scheduleTask(in: timeout, fn(ctx, timeout))
|
return ctx.eventLoop.scheduleTask(in: timeout, body(ctx, timeout))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -241,7 +241,7 @@ public protocol ChannelInboundInvoker {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A protocol that signals that outbound and inbound events are triggered by this invoker.
|
/// A protocol that signals that outbound and inbound events are triggered by this invoker.
|
||||||
public protocol ChannelInvoker : ChannelOutboundInvoker, ChannelInboundInvoker { }
|
public protocol ChannelInvoker: ChannelOutboundInvoker, ChannelInboundInvoker { }
|
||||||
|
|
||||||
/// Specify what kind of close operation is requested.
|
/// Specify what kind of close operation is requested.
|
||||||
public enum CloseMode {
|
public enum CloseMode {
|
||||||
|
|
|
@ -85,7 +85,7 @@
|
||||||
/// For example, let us assume that we created the following pipeline:
|
/// For example, let us assume that we created the following pipeline:
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// ChannelPipeline p = ...;
|
/// ChannelPipeline p = ...
|
||||||
/// let future = p.add(name: "1", handler: InboundHandlerA()).then {
|
/// let future = p.add(name: "1", handler: InboundHandlerA()).then {
|
||||||
/// p.add(name: "2", handler: InboundHandlerB())
|
/// p.add(name: "2", handler: InboundHandlerB())
|
||||||
/// }.then {
|
/// }.then {
|
||||||
|
@ -133,7 +133,7 @@
|
||||||
/// # Thread safety
|
/// # Thread safety
|
||||||
///
|
///
|
||||||
/// A `ChannelHandler` can be added or removed at any time because a `ChannelPipeline` is thread safe.
|
/// A `ChannelHandler` can be added or removed at any time because a `ChannelPipeline` is thread safe.
|
||||||
public final class ChannelPipeline : ChannelInvoker {
|
public final class ChannelPipeline: ChannelInvoker {
|
||||||
private var head: ChannelHandlerContext?
|
private var head: ChannelHandlerContext?
|
||||||
private var tail: ChannelHandlerContext?
|
private var tail: ChannelHandlerContext?
|
||||||
|
|
||||||
|
@ -271,13 +271,13 @@ public final class ChannelPipeline : ChannelInvoker {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Find a `ChannelHandlerContext` in the `ChannelPipeline`.
|
/// Find a `ChannelHandlerContext` in the `ChannelPipeline`.
|
||||||
private func context0(_ fn: @escaping ((ChannelHandlerContext) -> Bool)) -> EventLoopFuture<ChannelHandlerContext> {
|
private func context0(_ body: @escaping ((ChannelHandlerContext) -> Bool)) -> EventLoopFuture<ChannelHandlerContext> {
|
||||||
let promise: EventLoopPromise<ChannelHandlerContext> = eventLoop.newPromise()
|
let promise: EventLoopPromise<ChannelHandlerContext> = eventLoop.newPromise()
|
||||||
|
|
||||||
func _context0() {
|
func _context0() {
|
||||||
var curCtx: ChannelHandlerContext? = self.head
|
var curCtx: ChannelHandlerContext? = self.head
|
||||||
while let ctx = curCtx {
|
while let ctx = curCtx {
|
||||||
if fn(ctx) {
|
if body(ctx) {
|
||||||
promise.succeed(result: ctx)
|
promise.succeed(result: ctx)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -658,7 +658,7 @@ public final class ChannelPipeline : ChannelInvoker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private var inEventLoop : Bool {
|
private var inEventLoop: Bool {
|
||||||
return eventLoop.inEventLoop
|
return eventLoop.inEventLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -677,7 +677,7 @@ public final class ChannelPipeline : ChannelInvoker {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation.
|
/// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation.
|
||||||
private final class HeadChannelHandler : _ChannelOutboundHandler {
|
private final class HeadChannelHandler: _ChannelOutboundHandler {
|
||||||
|
|
||||||
static let sharedInstance = HeadChannelHandler()
|
static let sharedInstance = HeadChannelHandler()
|
||||||
|
|
||||||
|
@ -731,7 +731,7 @@ private extension CloseMode {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Special `ChannelInboundHandler` which will consume all inbound events.
|
/// Special `ChannelInboundHandler` which will consume all inbound events.
|
||||||
private final class TailChannelHandler : _ChannelInboundHandler, _ChannelOutboundHandler {
|
private final class TailChannelHandler: _ChannelInboundHandler, _ChannelOutboundHandler {
|
||||||
|
|
||||||
static let sharedInstance = TailChannelHandler()
|
static let sharedInstance = TailChannelHandler()
|
||||||
|
|
||||||
|
@ -775,7 +775,7 @@ private final class TailChannelHandler : _ChannelInboundHandler, _ChannelOutboun
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `Error` that is used by the `ChannelPipeline` to inform the user of an error.
|
/// `Error` that is used by the `ChannelPipeline` to inform the user of an error.
|
||||||
public enum ChannelPipelineError : Error {
|
public enum ChannelPipelineError: Error {
|
||||||
/// `ChannelHandler` was already removed.
|
/// `ChannelHandler` was already removed.
|
||||||
case alreadyRemoved
|
case alreadyRemoved
|
||||||
/// `ChannelHandler` was not found.
|
/// `ChannelHandler` was not found.
|
||||||
|
@ -792,7 +792,7 @@ public enum ChannelPipelineError : Error {
|
||||||
/// Many events are instrumental for a `ChannelHandler`'s life-cycle and it is therefore very important to send them
|
/// Many events are instrumental for a `ChannelHandler`'s life-cycle and it is therefore very important to send them
|
||||||
/// at the right point in time. Often, the right behaviour is to react to an event and then forward it to the next
|
/// at the right point in time. Often, the right behaviour is to react to an event and then forward it to the next
|
||||||
/// `ChannelHandler`.
|
/// `ChannelHandler`.
|
||||||
public final class ChannelHandlerContext : ChannelInvoker {
|
public final class ChannelHandlerContext: ChannelInvoker {
|
||||||
// visible for ChannelPipeline to modify
|
// visible for ChannelPipeline to modify
|
||||||
fileprivate var next: ChannelHandlerContext?
|
fileprivate var next: ChannelHandlerContext?
|
||||||
fileprivate var prev: ChannelHandlerContext?
|
fileprivate var prev: ChannelHandlerContext?
|
||||||
|
@ -1215,7 +1215,7 @@ public final class ChannelHandlerContext : ChannelInvoker {
|
||||||
handler.handlerRemoved(ctx: self)
|
handler.handlerRemoved(ctx: self)
|
||||||
}
|
}
|
||||||
|
|
||||||
private var inEventLoop : Bool {
|
private var inEventLoop: Bool {
|
||||||
return eventLoop.inEventLoop
|
return eventLoop.inEventLoop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ public enum DecodingState {
|
||||||
/// If you move the reader index forward, either manually or by using one of `buffer.read*` methods, you must ensure
|
/// If you move the reader index forward, either manually or by using one of `buffer.read*` methods, you must ensure
|
||||||
/// that you no longer need to see those bytes again as they will not be returned to you the next time `decode` is called.
|
/// that you no longer need to see those bytes again as they will not be returned to you the next time `decode` is called.
|
||||||
/// If you still need those bytes to come back, consider taking a local copy of buffer inside the function to perform your read operations on.
|
/// If you still need those bytes to come back, consider taking a local copy of buffer inside the function to perform your read operations on.
|
||||||
public protocol ByteToMessageDecoder : ChannelInboundHandler where InboundIn == ByteBuffer {
|
public protocol ByteToMessageDecoder: ChannelInboundHandler where InboundIn == ByteBuffer {
|
||||||
/// The cumulationBuffer which will be used to buffer any data.
|
/// The cumulationBuffer which will be used to buffer any data.
|
||||||
var cumulationBuffer: ByteBuffer? { get set }
|
var cumulationBuffer: ByteBuffer? { get set }
|
||||||
|
|
||||||
|
@ -180,7 +180,7 @@ extension ByteToMessageDecoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `ChannelOutboundHandler` which allows users to encode custom messages to a `ByteBuffer` easily.
|
/// `ChannelOutboundHandler` which allows users to encode custom messages to a `ByteBuffer` easily.
|
||||||
public protocol MessageToByteEncoder : ChannelOutboundHandler where OutboundOut == ByteBuffer {
|
public protocol MessageToByteEncoder: ChannelOutboundHandler where OutboundOut == ByteBuffer {
|
||||||
|
|
||||||
/// Called once there is data to encode. The used `ByteBuffer` is allocated by `allocateOutBuffer`.
|
/// Called once there is data to encode. The used `ByteBuffer` is allocated by `allocateOutBuffer`.
|
||||||
///
|
///
|
||||||
|
|
|
@ -99,11 +99,11 @@ internal final class DeadChannel: Channel {
|
||||||
|
|
||||||
let parent: Channel? = nil
|
let parent: Channel? = nil
|
||||||
|
|
||||||
func setOption<T>(option: T, value: T.OptionType) -> EventLoopFuture<Void> where T : ChannelOption {
|
func setOption<T>(option: T, value: T.OptionType) -> EventLoopFuture<Void> where T: ChannelOption {
|
||||||
return EventLoopFuture(eventLoop: self.pipeline.eventLoop, error: ChannelError.ioOnClosedChannel, file: #file, line: #line)
|
return EventLoopFuture(eventLoop: self.pipeline.eventLoop, error: ChannelError.ioOnClosedChannel, file: #file, line: #line)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOption<T>(option: T) -> EventLoopFuture<T.OptionType> where T : ChannelOption {
|
func getOption<T>(option: T) -> EventLoopFuture<T.OptionType> where T: ChannelOption {
|
||||||
return eventLoop.newFailedFuture(error: ChannelError.ioOnClosedChannel)
|
return eventLoop.newFailedFuture(error: ChannelError.ioOnClosedChannel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,10 @@ import Dispatch
|
||||||
import NIOPriorityQueue
|
import NIOPriorityQueue
|
||||||
|
|
||||||
private final class EmbeddedScheduledTask {
|
private final class EmbeddedScheduledTask {
|
||||||
let task: () -> ()
|
let task: () -> Void
|
||||||
let readyTime: UInt64
|
let readyTime: UInt64
|
||||||
|
|
||||||
init(readyTime: UInt64, task: @escaping () -> ()) {
|
init(readyTime: UInt64, task: @escaping () -> Void) {
|
||||||
self.readyTime = readyTime
|
self.readyTime = readyTime
|
||||||
self.task = task
|
self.task = task
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ public class EmbeddedEventLoop: EventLoop {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
var tasks = CircularBuffer<() -> ()>(initialRingCapacity: 2)
|
var tasks = CircularBuffer<() -> Void>(initialRingCapacity: 2)
|
||||||
|
|
||||||
public init() { }
|
public init() { }
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ public class EmbeddedEventLoop: EventLoop {
|
||||||
// We're not really running a loop here. Tasks aren't run until run() is called,
|
// We're not really running a loop here. Tasks aren't run until run() is called,
|
||||||
// at which point we run everything that's been submitted. Anything newly submitted
|
// at which point we run everything that's been submitted. Anything newly submitted
|
||||||
// either gets on that train if it's still moving or waits until the next call to run().
|
// either gets on that train if it's still moving or waits until the next call to run().
|
||||||
public func execute(task: @escaping () -> ()) {
|
public func execute(_ task: @escaping () -> Void) {
|
||||||
tasks.append(task)
|
tasks.append(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ public class EmbeddedEventLoop: EventLoop {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class EmbeddedChannelCore : ChannelCore {
|
class EmbeddedChannelCore: ChannelCore {
|
||||||
var isOpen: Bool = true
|
var isOpen: Bool = true
|
||||||
var isActive: Bool = false
|
var isActive: Bool = false
|
||||||
|
|
||||||
|
@ -241,7 +241,7 @@ class EmbeddedChannelCore : ChannelCore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class EmbeddedChannel : Channel {
|
public class EmbeddedChannel: Channel {
|
||||||
|
|
||||||
public var isActive: Bool { return channelcore.isActive }
|
public var isActive: Bool { return channelcore.isActive }
|
||||||
public var closeFuture: EventLoopFuture<Void> { return channelcore.closePromise.futureResult }
|
public var closeFuture: EventLoopFuture<Void> { return channelcore.closePromise.futureResult }
|
||||||
|
@ -338,12 +338,12 @@ public class EmbeddedChannel : Channel {
|
||||||
_ = try? register().wait()
|
_ = try? register().wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
public func setOption<T>(option: T, value: T.OptionType) -> EventLoopFuture<Void> where T : ChannelOption {
|
public func setOption<T>(option: T, value: T.OptionType) -> EventLoopFuture<Void> where T: ChannelOption {
|
||||||
// No options supported
|
// No options supported
|
||||||
fatalError("no options supported")
|
fatalError("no options supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
public func getOption<T>(option: T) -> EventLoopFuture<T.OptionType> where T : ChannelOption {
|
public func getOption<T>(option: T) -> EventLoopFuture<T.OptionType> where T: ChannelOption {
|
||||||
if option is AutoReadOption {
|
if option is AutoReadOption {
|
||||||
return self.eventLoop.newSucceededFuture(result: true as! T.OptionType)
|
return self.eventLoop.newSucceededFuture(result: true as! T.OptionType)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,9 @@ import NIOPriorityQueue
|
||||||
/// will be notified once the execution is complete.
|
/// will be notified once the execution is complete.
|
||||||
public struct Scheduled<T> {
|
public struct Scheduled<T> {
|
||||||
private let promise: EventLoopPromise<T>
|
private let promise: EventLoopPromise<T>
|
||||||
private let cancellationTask: () -> ()
|
private let cancellationTask: () -> Void
|
||||||
|
|
||||||
init(promise: EventLoopPromise<T>, cancellationTask: @escaping () -> ()) {
|
init(promise: EventLoopPromise<T>, cancellationTask: @escaping () -> Void) {
|
||||||
self.promise = promise
|
self.promise = promise
|
||||||
promise.futureResult.whenFailure { error in
|
promise.futureResult.whenFailure { error in
|
||||||
guard let err = error as? EventLoopError else {
|
guard let err = error as? EventLoopError else {
|
||||||
|
@ -82,14 +82,14 @@ public protocol EventLoop: EventLoopGroup {
|
||||||
var inEventLoop: Bool { get }
|
var inEventLoop: Bool { get }
|
||||||
|
|
||||||
/// Submit a given task to be executed by the `EventLoop`
|
/// Submit a given task to be executed by the `EventLoop`
|
||||||
func execute(task: @escaping () -> ())
|
func execute(_ task: @escaping () -> Void)
|
||||||
|
|
||||||
/// Submit a given task to be executed by the `EventLoop`. Once the execution is complete the returned `EventLoopFuture` is notified.
|
/// Submit a given task to be executed by the `EventLoop`. Once the execution is complete the returned `EventLoopFuture` is notified.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - task: The closure that will be submited to the `EventLoop` for execution.
|
/// - task: The closure that will be submited to the `EventLoop` for execution.
|
||||||
/// - returns: `EventLoopFuture` that is notified once the task was executed.
|
/// - returns: `EventLoopFuture` that is notified once the task was executed.
|
||||||
func submit<T>(task: @escaping () throws-> (T)) -> EventLoopFuture<T>
|
func submit<T>(_ task: @escaping () throws-> (T)) -> EventLoopFuture<T>
|
||||||
|
|
||||||
/// Schedule a `task` that is executed by this `SelectableEventLoop` after the given amount of time.
|
/// Schedule a `task` that is executed by this `SelectableEventLoop` after the given amount of time.
|
||||||
func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws-> (T)) -> Scheduled<T>
|
func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws-> (T)) -> Scheduled<T>
|
||||||
|
@ -171,16 +171,16 @@ extension TimeAmount: Comparable {
|
||||||
}
|
}
|
||||||
|
|
||||||
extension EventLoop {
|
extension EventLoop {
|
||||||
public func submit<T>(task: @escaping () throws-> (T)) -> EventLoopFuture<T> {
|
public func submit<T>(_ task: @escaping () throws-> (T)) -> EventLoopFuture<T> {
|
||||||
let promise: EventLoopPromise<T> = newPromise(file: #file, line: #line)
|
let promise: EventLoopPromise<T> = newPromise(file: #file, line: #line)
|
||||||
|
|
||||||
execute(task: {() -> () in
|
self.execute {
|
||||||
do {
|
do {
|
||||||
promise.succeed(result: try task())
|
promise.succeed(result: try task())
|
||||||
} catch let err {
|
} catch let err {
|
||||||
promise.fail(error: err)
|
promise.fail(error: err)
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
|
|
||||||
return promise.futureResult
|
return promise.futureResult
|
||||||
}
|
}
|
||||||
|
@ -276,7 +276,7 @@ private enum EventLoopLifecycleState {
|
||||||
/// `EventLoop` implementation that uses a `Selector` to get notified once there is more I/O or tasks to process.
|
/// `EventLoop` implementation that uses a `Selector` to get notified once there is more I/O or tasks to process.
|
||||||
/// The whole processing of I/O and tasks is done by a `Thread` that is tied to the `SelectableEventLoop`. This `Thread`
|
/// The whole processing of I/O and tasks is done by a `Thread` that is tied to the `SelectableEventLoop`. This `Thread`
|
||||||
/// is guaranteed to never change!
|
/// is guaranteed to never change!
|
||||||
internal final class SelectableEventLoop : EventLoop {
|
internal final class SelectableEventLoop: EventLoop {
|
||||||
private let selector: NIO.Selector<NIORegistration>
|
private let selector: NIO.Selector<NIORegistration>
|
||||||
private let thread: Thread
|
private let thread: Thread
|
||||||
private var scheduledTasks = PriorityQueue<ScheduledTask>(ascending: true)
|
private var scheduledTasks = PriorityQueue<ScheduledTask>(ascending: true)
|
||||||
|
@ -390,7 +390,7 @@ internal final class SelectableEventLoop : EventLoop {
|
||||||
return scheduled
|
return scheduled
|
||||||
}
|
}
|
||||||
|
|
||||||
public func execute(task: @escaping () -> ()) {
|
public func execute(_ task: @escaping () -> Void) {
|
||||||
schedule0(ScheduledTask(task, { error in
|
schedule0(ScheduledTask(task, { error in
|
||||||
// do nothing
|
// do nothing
|
||||||
}, .nanoseconds(0)))
|
}, .nanoseconds(0)))
|
||||||
|
@ -507,7 +507,7 @@ internal final class SelectableEventLoop : EventLoop {
|
||||||
tasksLock.unlock()
|
tasksLock.unlock()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
var tasksCopy = ContiguousArray<() -> ()>()
|
var tasksCopy = ContiguousArray<() -> Void>()
|
||||||
|
|
||||||
// We only fetch the time one time as this may be expensive and is generally good enough as if we miss anything we will just do a non-blocking select again anyway.
|
// We only fetch the time one time as this may be expensive and is generally good enough as if we miss anything we will just do a non-blocking select again anyway.
|
||||||
let now = DispatchTime.now()
|
let now = DispatchTime.now()
|
||||||
|
@ -561,9 +561,9 @@ internal final class SelectableEventLoop : EventLoop {
|
||||||
if inEventLoop {
|
if inEventLoop {
|
||||||
self.lifecycleState = .closed
|
self.lifecycleState = .closed
|
||||||
} else {
|
} else {
|
||||||
_ = self.submit(task: { () -> (Void) in
|
self.execute {
|
||||||
self.lifecycleState = .closed
|
self.lifecycleState = .closed
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -659,7 +659,7 @@ extension EventLoopGroup {
|
||||||
typealias ThreadInitializer = (Thread) -> Void
|
typealias ThreadInitializer = (Thread) -> Void
|
||||||
|
|
||||||
/// An `EventLoopGroup` which will create multiple `EventLoop`s, each tied to its own `Thread`.
|
/// An `EventLoopGroup` which will create multiple `EventLoop`s, each tied to its own `Thread`.
|
||||||
final public class MultiThreadedEventLoopGroup : EventLoopGroup {
|
final public class MultiThreadedEventLoopGroup: EventLoopGroup {
|
||||||
|
|
||||||
private let index = Atomic<Int>(value: 0)
|
private let index = Atomic<Int>(value: 0)
|
||||||
private let eventLoops: [SelectableEventLoop]
|
private let eventLoops: [SelectableEventLoop]
|
||||||
|
@ -759,11 +759,11 @@ final public class MultiThreadedEventLoopGroup : EventLoopGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class ScheduledTask {
|
private final class ScheduledTask {
|
||||||
let task: () -> ()
|
let task: () -> Void
|
||||||
private let failFn: (Error) ->()
|
private let failFn: (Error) ->()
|
||||||
private let readyTime: Int
|
private let readyTime: Int
|
||||||
|
|
||||||
init(_ task: @escaping () -> (), _ failFn: @escaping (Error) -> (), _ time: TimeAmount) {
|
init(_ task: @escaping () -> Void, _ failFn: @escaping (Error) -> Void, _ time: TimeAmount) {
|
||||||
self.task = task
|
self.task = task
|
||||||
self.failFn = failFn
|
self.failFn = failFn
|
||||||
self.readyTime = time.nanoseconds + Int(DispatchTime.now().uptimeNanoseconds)
|
self.readyTime = time.nanoseconds + Int(DispatchTime.now().uptimeNanoseconds)
|
||||||
|
@ -787,7 +787,7 @@ extension ScheduledTask: CustomStringConvertible {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension ScheduledTask : Comparable {
|
extension ScheduledTask: Comparable {
|
||||||
public static func < (lhs: ScheduledTask, rhs: ScheduledTask) -> Bool {
|
public static func < (lhs: ScheduledTask, rhs: ScheduledTask) -> Bool {
|
||||||
return lhs.readyTime < rhs.readyTime
|
return lhs.readyTime < rhs.readyTime
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,7 +124,7 @@ private struct CallbackList: ExpressibleByArrayLiteral {
|
||||||
/// ```
|
/// ```
|
||||||
/// func someAsyncOperation(args) -> EventLoopFuture<ResultType> {
|
/// func someAsyncOperation(args) -> EventLoopFuture<ResultType> {
|
||||||
/// let promise: EventLoopPromise<ResultType> = eventLoop.newPromise()
|
/// let promise: EventLoopPromise<ResultType> = eventLoop.newPromise()
|
||||||
/// someAsyncOperationWithACallback(args) { result -> () in
|
/// someAsyncOperationWithACallback(args) { result -> Void in
|
||||||
/// // when finished...
|
/// // when finished...
|
||||||
/// promise.succeed(result: result)
|
/// promise.succeed(result: result)
|
||||||
/// // if error...
|
/// // if error...
|
||||||
|
@ -466,7 +466,7 @@ extension EventLoopFuture {
|
||||||
/// - returns: A future that will receive the eventual value.
|
/// - returns: A future that will receive the eventual value.
|
||||||
public func map<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) -> (U)) -> EventLoopFuture<U> {
|
public func map<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) -> (U)) -> EventLoopFuture<U> {
|
||||||
if U.self == T.self && U.self == Void.self {
|
if U.self == T.self && U.self == Void.self {
|
||||||
whenSuccess(callback as! (T) -> ())
|
whenSuccess(callback as! (T) -> Void)
|
||||||
return self as! EventLoopFuture<U>
|
return self as! EventLoopFuture<U>
|
||||||
} else {
|
} else {
|
||||||
return then { return EventLoopFuture<U>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line) }
|
return then { return EventLoopFuture<U>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line) }
|
||||||
|
@ -539,7 +539,7 @@ extension EventLoopFuture {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fileprivate func _whenCompleteWithValue(_ callback: @escaping (EventLoopFutureValue<T>) -> ()) {
|
fileprivate func _whenCompleteWithValue(_ callback: @escaping (EventLoopFutureValue<T>) -> Void) {
|
||||||
_whenComplete {
|
_whenComplete {
|
||||||
callback(self.value!)
|
callback(self.value!)
|
||||||
return CallbackList()
|
return CallbackList()
|
||||||
|
@ -556,7 +556,7 @@ extension EventLoopFuture {
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - callback: The callback that is called with the successful result of the `EventLoopFuture`.
|
/// - callback: The callback that is called with the successful result of the `EventLoopFuture`.
|
||||||
public func whenSuccess(_ callback: @escaping (T) -> ()) {
|
public func whenSuccess(_ callback: @escaping (T) -> Void) {
|
||||||
_whenComplete {
|
_whenComplete {
|
||||||
if case .success(let t) = self.value! {
|
if case .success(let t) = self.value! {
|
||||||
callback(t)
|
callback(t)
|
||||||
|
@ -575,7 +575,7 @@ extension EventLoopFuture {
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - callback: The callback that is called with the failed result of the `EventLoopFuture`.
|
/// - callback: The callback that is called with the failed result of the `EventLoopFuture`.
|
||||||
public func whenFailure(_ callback: @escaping (Error) -> ()) {
|
public func whenFailure(_ callback: @escaping (Error) -> Void) {
|
||||||
_whenComplete {
|
_whenComplete {
|
||||||
if case .failure(let e) = self.value! {
|
if case .failure(let e) = self.value! {
|
||||||
callback(e)
|
callback(e)
|
||||||
|
@ -593,7 +593,7 @@ extension EventLoopFuture {
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - callback: The callback that is called when the `EventLoopFuture` is fulfilled.
|
/// - callback: The callback that is called when the `EventLoopFuture` is fulfilled.
|
||||||
public func whenComplete(_ callback: @escaping () -> ()) {
|
public func whenComplete(_ callback: @escaping () -> Void) {
|
||||||
_whenComplete {
|
_whenComplete {
|
||||||
callback()
|
callback()
|
||||||
return CallbackList()
|
return CallbackList()
|
||||||
|
@ -666,7 +666,7 @@ extension EventLoopFuture {
|
||||||
/// Return a new EventLoopFuture that contains this "and" another value.
|
/// Return a new EventLoopFuture that contains this "and" another value.
|
||||||
/// This is just syntactic sugar for `future.and(loop.newSucceedFuture<U>(result: result))`.
|
/// This is just syntactic sugar for `future.and(loop.newSucceedFuture<U>(result: result))`.
|
||||||
public func and<U>(result: U, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(T,U)> {
|
public func and<U>(result: U, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(T,U)> {
|
||||||
return and(EventLoopFuture<U>(eventLoop: self.eventLoop, result:result, file: file, line: line))
|
return and(EventLoopFuture<U>(eventLoop: self.eventLoop, result: result, file: file, line: line))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -775,17 +775,17 @@ extension EventLoopFuture {
|
||||||
return p0.futureResult
|
return p0.futureResult
|
||||||
}
|
}
|
||||||
|
|
||||||
let fn: EventLoopFuture<Void> = futures.reduce(p0.futureResult, { (f1: EventLoopFuture<Void>, f2: EventLoopFuture<Void>) in f1.and(f2).map({ (_ : ((), ())) in }) })
|
let body: EventLoopFuture<Void> = futures.reduce(p0.futureResult, { (f1: EventLoopFuture<Void>, f2: EventLoopFuture<Void>) in f1.and(f2).map({ (_ : ((), ())) in }) })
|
||||||
p0.succeed(result: ())
|
p0.succeed(result: ())
|
||||||
return fn
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute the given function and synchronously complete the given `EventLoopPromise` (if not `nil`).
|
/// Execute the given function and synchronously complete the given `EventLoopPromise` (if not `nil`).
|
||||||
func executeAndComplete<T>(_ promise: EventLoopPromise<T>?, _ fn: () throws -> T) {
|
func executeAndComplete<T>(_ promise: EventLoopPromise<T>?, _ body: () throws -> T) {
|
||||||
do {
|
do {
|
||||||
let result = try fn()
|
let result = try body()
|
||||||
promise?.succeed(result: result)
|
promise?.succeed(result: result)
|
||||||
} catch let e {
|
} catch let e {
|
||||||
promise?.fail(error: e)
|
promise?.fail(error: e)
|
||||||
|
|
|
@ -56,7 +56,7 @@ import CNIOLinux
|
||||||
var cpuset = cpu_set_t()
|
var cpuset = cpu_set_t()
|
||||||
|
|
||||||
// Ensure the cpuset is empty (and so nothing is selected yet).
|
// Ensure the cpuset is empty (and so nothing is selected yet).
|
||||||
CNIOLinux_CPU_ZERO(&cpuset);
|
CNIOLinux_CPU_ZERO(&cpuset)
|
||||||
|
|
||||||
let res = withUnsafePthread { p in
|
let res = withUnsafePthread { p in
|
||||||
CNIOLinux_pthread_getaffinity_np(p, MemoryLayout.size(ofValue: cpuset), &cpuset)
|
CNIOLinux_pthread_getaffinity_np(p, MemoryLayout.size(ofValue: cpuset), &cpuset)
|
||||||
|
@ -71,7 +71,7 @@ import CNIOLinux
|
||||||
var cpuset = cpu_set_t()
|
var cpuset = cpu_set_t()
|
||||||
|
|
||||||
// Ensure the cpuset is empty (and so nothing is selected yet).
|
// Ensure the cpuset is empty (and so nothing is selected yet).
|
||||||
CNIOLinux_CPU_ZERO(&cpuset);
|
CNIOLinux_CPU_ZERO(&cpuset)
|
||||||
|
|
||||||
// Mark the CPU we want to run on.
|
// Mark the CPU we want to run on.
|
||||||
cpuSet.cpuIds.forEach { CNIOLinux_CPU_SET(CInt($0), &cpuset) }
|
cpuSet.cpuIds.forEach { CNIOLinux_CPU_SET(CInt($0), &cpuset) }
|
||||||
|
|
|
@ -65,7 +65,7 @@ private func doPendingDatagramWriteVectorOperation(pending: PendingDatagramWrite
|
||||||
msgs: UnsafeMutableBufferPointer<MMsgHdr>,
|
msgs: UnsafeMutableBufferPointer<MMsgHdr>,
|
||||||
addresses: UnsafeMutableBufferPointer<sockaddr_storage>,
|
addresses: UnsafeMutableBufferPointer<sockaddr_storage>,
|
||||||
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
|
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
|
||||||
_ fn: (UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>) throws -> IOResult<Int> {
|
_ body: (UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>) throws -> IOResult<Int> {
|
||||||
assert(msgs.count >= Socket.writevLimitIOVectors, "Insufficiently sized buffer for a maximal sendmmsg")
|
assert(msgs.count >= Socket.writevLimitIOVectors, "Insufficiently sized buffer for a maximal sendmmsg")
|
||||||
|
|
||||||
// the numbers of storage refs that we need to decrease later.
|
// the numbers of storage refs that we need to decrease later.
|
||||||
|
@ -115,7 +115,7 @@ private func doPendingDatagramWriteVectorOperation(pending: PendingDatagramWrite
|
||||||
storageRefs[i].release()
|
storageRefs[i].release()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return try fn(UnsafeMutableBufferPointer(start: msgs.baseAddress!, count: c))
|
return try body(UnsafeMutableBufferPointer(start: msgs.baseAddress!, count: c))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This holds the states of the currently pending datagram writes. The core is a `MarkedCircularBuffer` which holds all the
|
/// This holds the states of the currently pending datagram writes. The core is a `MarkedCircularBuffer` which holds all the
|
||||||
|
|
|
@ -24,12 +24,12 @@ private struct PendingStreamWrite {
|
||||||
/// - pending: The currently pending writes.
|
/// - pending: The currently pending writes.
|
||||||
/// - iovecs: Pre-allocated storage (per `EventLoop`) for `iovecs`.
|
/// - iovecs: Pre-allocated storage (per `EventLoop`) for `iovecs`.
|
||||||
/// - storageRefs: Pre-allocated storage references (per `EventLoop`) to manage the lifetime of the buffers to be passed to `writev`.
|
/// - storageRefs: Pre-allocated storage references (per `EventLoop`) to manage the lifetime of the buffers to be passed to `writev`.
|
||||||
/// - fn: The function that actually does the vector write (usually `writev`).
|
/// - body: The function that actually does the vector write (usually `writev`).
|
||||||
/// - returns: A tuple of the number of items attempted to write and the result of the write operation.
|
/// - returns: A tuple of the number of items attempted to write and the result of the write operation.
|
||||||
private func doPendingWriteVectorOperation(pending: PendingStreamWritesState,
|
private func doPendingWriteVectorOperation(pending: PendingStreamWritesState,
|
||||||
iovecs: UnsafeMutableBufferPointer<IOVector>,
|
iovecs: UnsafeMutableBufferPointer<IOVector>,
|
||||||
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
|
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
|
||||||
_ fn: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>) throws -> (itemCount: Int, writeResult: IOResult<Int>) {
|
_ body: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>) throws -> (itemCount: Int, writeResult: IOResult<Int>) {
|
||||||
assert(iovecs.count >= Socket.writevLimitIOVectors, "Insufficiently sized buffer for a maximal writev")
|
assert(iovecs.count >= Socket.writevLimitIOVectors, "Insufficiently sized buffer for a maximal writev")
|
||||||
|
|
||||||
// Clamp the number of writes we're willing to issue to the limit for writev.
|
// Clamp the number of writes we're willing to issue to the limit for writev.
|
||||||
|
@ -66,7 +66,7 @@ private func doPendingWriteVectorOperation(pending: PendingStreamWritesState,
|
||||||
storageRefs[i].release()
|
storageRefs[i].release()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let result = try fn(UnsafeBufferPointer(start: iovecs.baseAddress!, count: numberOfUsedStorageSlots))
|
let result = try body(UnsafeBufferPointer(start: iovecs.baseAddress!, count: numberOfUsedStorageSlots))
|
||||||
/* if we hit a limit, we really wanted to write more than we have so the caller should retry us */
|
/* if we hit a limit, we really wanted to write more than we have so the caller should retry us */
|
||||||
return (numberOfUsedStorageSlots, result)
|
return (numberOfUsedStorageSlots, result)
|
||||||
}
|
}
|
||||||
|
@ -224,7 +224,7 @@ private struct PendingStreamWritesState {
|
||||||
/// - warning: See the warning for `didWrite`.
|
/// - warning: See the warning for `didWrite`.
|
||||||
///
|
///
|
||||||
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises.
|
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises.
|
||||||
public mutating func failAll(error: Error) -> () -> Void {
|
public mutating func failAll(error: Error) -> (() -> Void) {
|
||||||
var promises: [EventLoopPromise<()>] = []
|
var promises: [EventLoopPromise<()>] = []
|
||||||
promises.reserveCapacity(self.pendingWrites.count)
|
promises.reserveCapacity(self.pendingWrites.count)
|
||||||
while !self.pendingWrites.isEmpty {
|
while !self.pendingWrites.isEmpty {
|
||||||
|
|
|
@ -28,7 +28,7 @@ public protocol RecvByteBufferAllocator {
|
||||||
|
|
||||||
|
|
||||||
/// `RecvByteBufferAllocator` which will always return a `ByteBuffer` with the same fixed size no matter what was recorded.
|
/// `RecvByteBufferAllocator` which will always return a `ByteBuffer` with the same fixed size no matter what was recorded.
|
||||||
public struct FixedSizeRecvByteBufferAllocator : RecvByteBufferAllocator {
|
public struct FixedSizeRecvByteBufferAllocator: RecvByteBufferAllocator {
|
||||||
public let capacity: Int
|
public let capacity: Int
|
||||||
|
|
||||||
public init(capacity: Int) {
|
public init(capacity: Int) {
|
||||||
|
@ -47,7 +47,7 @@ public struct FixedSizeRecvByteBufferAllocator : RecvByteBufferAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `RecvByteBufferAllocator` which will gracefully increment or decrement the buffer size on the feedback that was recorded.
|
/// `RecvByteBufferAllocator` which will gracefully increment or decrement the buffer size on the feedback that was recorded.
|
||||||
public struct AdaptiveRecvByteBufferAllocator : RecvByteBufferAllocator {
|
public struct AdaptiveRecvByteBufferAllocator: RecvByteBufferAllocator {
|
||||||
|
|
||||||
private static let indexIncrement = 4
|
private static let indexIncrement = 4
|
||||||
private static let indexDecrement = 1
|
private static let indexDecrement = 1
|
||||||
|
@ -76,8 +76,8 @@ public struct AdaptiveRecvByteBufferAllocator : RecvByteBufferAllocator {
|
||||||
public let maximum: Int
|
public let maximum: Int
|
||||||
public let initial: Int
|
public let initial: Int
|
||||||
|
|
||||||
private var index:Int
|
private var index: Int
|
||||||
private var nextReceiveBufferSize:Int
|
private var nextReceiveBufferSize: Int
|
||||||
private var decreaseNow: Bool
|
private var decreaseNow: Bool
|
||||||
|
|
||||||
public init() {
|
public init() {
|
||||||
|
@ -96,7 +96,7 @@ public struct AdaptiveRecvByteBufferAllocator : RecvByteBufferAllocator {
|
||||||
self.minIndex = minIndex
|
self.minIndex = minIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
let maxIndex = AdaptiveRecvByteBufferAllocator.sizeTableIndex(maximum);
|
let maxIndex = AdaptiveRecvByteBufferAllocator.sizeTableIndex(maximum)
|
||||||
if AdaptiveRecvByteBufferAllocator.sizeTable[maxIndex] > maximum {
|
if AdaptiveRecvByteBufferAllocator.sizeTable[maxIndex] > maximum {
|
||||||
self.maxIndex = maxIndex - 1
|
self.maxIndex = maxIndex - 1
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -21,9 +21,9 @@ protocol Selectable {
|
||||||
/// throw an `IOError`.
|
/// throw an `IOError`.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure to execute if the `Selectable` is still open.
|
/// - body: The closure to execute if the `Selectable` is still open.
|
||||||
/// - throws: If either the `Selectable` was closed before or the closure throws by itself.
|
/// - throws: If either the `Selectable` was closed before or the closure throws by itself.
|
||||||
func withUnsafeFileDescriptor<T>(_ fn: (Int32) throws -> T) throws -> T
|
func withUnsafeFileDescriptor<T>(_ body: (Int32) throws -> T) throws -> T
|
||||||
|
|
||||||
/// `true` if this `Selectable` is open (which means it was not closed yet).
|
/// `true` if this `Selectable` is open (which means it was not closed yet).
|
||||||
var open: Bool { get }
|
var open: Bool { get }
|
||||||
|
|
|
@ -28,13 +28,13 @@ private extension timespec {
|
||||||
}
|
}
|
||||||
|
|
||||||
private extension Optional {
|
private extension Optional {
|
||||||
func withUnsafeOptionalPointer<T>(_ fn: (UnsafePointer<Wrapped>?) throws -> T) rethrows -> T {
|
func withUnsafeOptionalPointer<T>(_ body: (UnsafePointer<Wrapped>?) throws -> T) rethrows -> T {
|
||||||
if var this = self {
|
if var this = self {
|
||||||
return try withUnsafePointer(to: &this) { x in
|
return try withUnsafePointer(to: &this) { x in
|
||||||
try fn(x)
|
try body(x)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return try fn(nil)
|
return try body(nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -340,8 +340,8 @@ final class Selector<R: Registration> {
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - strategy: The `SelectorStrategy` to apply
|
/// - strategy: The `SelectorStrategy` to apply
|
||||||
/// - fn: The function to execute for each `SelectorEvent` that was produced.
|
/// - body: The function to execute for each `SelectorEvent` that was produced.
|
||||||
func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent<R>) throws -> Void) throws -> Void {
|
func whenReady(strategy: SelectorStrategy, _ body: (SelectorEvent<R>) throws -> Void) throws -> Void {
|
||||||
guard self.lifecycleState == .open else {
|
guard self.lifecycleState == .open else {
|
||||||
throw IOError(errnoCode: EBADF, reason: "can't call whenReady for selector as it's \(self.lifecycleState).")
|
throw IOError(errnoCode: EBADF, reason: "can't call whenReady for selector as it's \(self.lifecycleState).")
|
||||||
}
|
}
|
||||||
|
@ -375,7 +375,7 @@ final class Selector<R: Registration> {
|
||||||
_ = Glibc.read(timerfd, &val, MemoryLayout<UInt>.size)
|
_ = Glibc.read(timerfd, &val, MemoryLayout<UInt>.size)
|
||||||
default:
|
default:
|
||||||
let registration = registrations[Int(ev.data.fd)]!
|
let registration = registrations[Int(ev.data.fd)]!
|
||||||
try fn(
|
try body(
|
||||||
SelectorEvent(
|
SelectorEvent(
|
||||||
readable: (ev.events & Epoll.EPOLLIN.rawValue) != 0 || (ev.events & Epoll.EPOLLERR.rawValue) != 0 || (ev.events & Epoll.EPOLLRDHUP.rawValue) != 0,
|
readable: (ev.events & Epoll.EPOLLIN.rawValue) != 0 || (ev.events & Epoll.EPOLLERR.rawValue) != 0 || (ev.events & Epoll.EPOLLRDHUP.rawValue) != 0,
|
||||||
writable: (ev.events & Epoll.EPOLLOUT.rawValue) != 0 || (ev.events & Epoll.EPOLLERR.rawValue) != 0 || (ev.events & Epoll.EPOLLRDHUP.rawValue) != 0,
|
writable: (ev.events & Epoll.EPOLLOUT.rawValue) != 0 || (ev.events & Epoll.EPOLLERR.rawValue) != 0 || (ev.events & Epoll.EPOLLRDHUP.rawValue) != 0,
|
||||||
|
@ -398,11 +398,11 @@ final class Selector<R: Registration> {
|
||||||
break
|
break
|
||||||
case EVFILT_READ:
|
case EVFILT_READ:
|
||||||
if let registration = registrations[Int(ev.ident)] {
|
if let registration = registrations[Int(ev.ident)] {
|
||||||
try fn((SelectorEvent(readable: true, writable: false, registration: registration)))
|
try body((SelectorEvent(readable: true, writable: false, registration: registration)))
|
||||||
}
|
}
|
||||||
case EVFILT_WRITE:
|
case EVFILT_WRITE:
|
||||||
if let registration = registrations[Int(ev.ident)] {
|
if let registration = registrations[Int(ev.ident)] {
|
||||||
try fn((SelectorEvent(readable: false, writable: true, registration: registration)))
|
try body((SelectorEvent(readable: false, writable: true, registration: registration)))
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
// We only use EVFILT_USER, EVFILT_READ and EVFILT_WRITE.
|
// We only use EVFILT_USER, EVFILT_READ and EVFILT_WRITE.
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
public typealias IOVector = iovec
|
public typealias IOVector = iovec
|
||||||
|
|
||||||
// TODO: scattering support
|
// TODO: scattering support
|
||||||
final class Socket : BaseSocket {
|
final class Socket: BaseSocket {
|
||||||
|
|
||||||
/// The maximum number of bytes to write per `writev` call.
|
/// The maximum number of bytes to write per `writev` call.
|
||||||
static var writevLimitBytes: Int {
|
static var writevLimitBytes: Int {
|
||||||
|
@ -44,7 +44,7 @@ final class Socket : BaseSocket {
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - descriptor: The file descriptor to wrap.
|
/// - descriptor: The file descriptor to wrap.
|
||||||
override init(descriptor : Int32) {
|
override init(descriptor: Int32) {
|
||||||
super.init(descriptor: descriptor)
|
super.init(descriptor: descriptor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -125,17 +125,17 @@ public enum SocketAddress: CustomStringConvertible {
|
||||||
|
|
||||||
/// Calls the given function with a pointer to a `sockaddr` structure and the associated size
|
/// Calls the given function with a pointer to a `sockaddr` structure and the associated size
|
||||||
/// of that structure.
|
/// of that structure.
|
||||||
public func withSockAddr<T>(_ fn: (UnsafePointer<sockaddr>, Int) throws -> T) rethrows -> T {
|
public func withSockAddr<T>(_ body: (UnsafePointer<sockaddr>, Int) throws -> T) rethrows -> T {
|
||||||
switch self {
|
switch self {
|
||||||
case .v4(let addr):
|
case .v4(let addr):
|
||||||
var address = addr.address
|
var address = addr.address
|
||||||
return try address.withSockAddr(fn)
|
return try address.withSockAddr(body)
|
||||||
case .v6(let addr):
|
case .v6(let addr):
|
||||||
var address = addr.address
|
var address = addr.address
|
||||||
return try address.withSockAddr(fn)
|
return try address.withSockAddr(body)
|
||||||
case .unixDomainSocket(let addr):
|
case .unixDomainSocket(let addr):
|
||||||
var address = addr.address
|
var address = addr.address
|
||||||
return try address.withSockAddr(fn)
|
return try address.withSockAddr(body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ private extension ByteBuffer {
|
||||||
/// For this reason, `BaseSocketChannel` exists to provide a common core implementation of
|
/// For this reason, `BaseSocketChannel` exists to provide a common core implementation of
|
||||||
/// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks
|
/// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks
|
||||||
/// for subclasses to implement the specific logic to handle their writes and reads.
|
/// for subclasses to implement the specific logic to handle their writes and reads.
|
||||||
class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
||||||
typealias SelectableType = T
|
typealias SelectableType = T
|
||||||
|
|
||||||
// MARK: Stored Properties
|
// MARK: Stored Properties
|
||||||
|
@ -305,7 +305,7 @@ class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public func getOption<T>(option: T) -> EventLoopFuture<T.OptionType> where T : ChannelOption {
|
public func getOption<T>(option: T) -> EventLoopFuture<T.OptionType> where T: ChannelOption {
|
||||||
if eventLoop.inEventLoop {
|
if eventLoop.inEventLoop {
|
||||||
do {
|
do {
|
||||||
return eventLoop.newSucceededFuture(result: try getOption0(option: option))
|
return eventLoop.newSucceededFuture(result: try getOption0(option: option))
|
||||||
|
@ -977,7 +977,7 @@ final class SocketChannel: BaseSocketChannel<Socket> {
|
||||||
/// A `Channel` for a server socket.
|
/// A `Channel` for a server socket.
|
||||||
///
|
///
|
||||||
/// - note: All operations on `ServerSocketChannel` are thread-safe.
|
/// - note: All operations on `ServerSocketChannel` are thread-safe.
|
||||||
final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
|
final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
|
||||||
|
|
||||||
private var backlog: Int32 = 128
|
private var backlog: Int32 = 128
|
||||||
private let group: EventLoopGroup
|
private let group: EventLoopGroup
|
||||||
|
@ -995,7 +995,7 @@ final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
self.group = group
|
self.group = group
|
||||||
try super.init(socket: serverSocket, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
|
try super.init(socket: serverSocket, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
|
||||||
}
|
}
|
||||||
|
|
||||||
override func registrationFor(interested: IOEvent) -> NIORegistration {
|
override func registrationFor(interested: IOEvent) -> NIORegistration {
|
||||||
|
|
|
@ -79,9 +79,9 @@ private func isBlacklistedErrno(_ code: Int32) -> Bool {
|
||||||
|
|
||||||
/* Sorry, we really try hard to not use underscored attributes. In this case however we seem to break the inlining threshold which makes a system call take twice the time, ie. we need this exception. */
|
/* Sorry, we really try hard to not use underscored attributes. In this case however we seem to break the inlining threshold which makes a system call take twice the time, ie. we need this exception. */
|
||||||
@inline(__always)
|
@inline(__always)
|
||||||
internal func wrapSyscallMayBlock<T: FixedWidthInteger>(where function: StaticString = #function, _ fn: () throws -> T) throws -> IOResult<T> {
|
internal func wrapSyscallMayBlock<T: FixedWidthInteger>(where function: StaticString = #function, _ body: () throws -> T) throws -> IOResult<T> {
|
||||||
while true {
|
while true {
|
||||||
let res = try fn()
|
let res = try body()
|
||||||
if res == -1 {
|
if res == -1 {
|
||||||
let err = errno
|
let err = errno
|
||||||
switch err {
|
switch err {
|
||||||
|
@ -101,9 +101,9 @@ internal func wrapSyscallMayBlock<T: FixedWidthInteger>(where function: StaticSt
|
||||||
|
|
||||||
/* Sorry, we really try hard to not use underscored attributes. In this case however we seem to break the inlining threshold which makes a system call take twice the time, ie. we need this exception. */
|
/* Sorry, we really try hard to not use underscored attributes. In this case however we seem to break the inlining threshold which makes a system call take twice the time, ie. we need this exception. */
|
||||||
@inline(__always)
|
@inline(__always)
|
||||||
internal func wrapSyscall<T: FixedWidthInteger>(where function: StaticString = #function, _ fn: () throws -> T) throws -> T {
|
internal func wrapSyscall<T: FixedWidthInteger>(where function: StaticString = #function, _ body: () throws -> T) throws -> T {
|
||||||
while true {
|
while true {
|
||||||
let res = try fn()
|
let res = try body()
|
||||||
if res == -1 {
|
if res == -1 {
|
||||||
let err = errno
|
let err = errno
|
||||||
if err == EINTR {
|
if err == EINTR {
|
||||||
|
@ -118,9 +118,9 @@ internal func wrapSyscall<T: FixedWidthInteger>(where function: StaticString = #
|
||||||
|
|
||||||
/* Sorry, we really try hard to not use underscored attributes. In this case however we seem to break the inlining threshold which makes a system call take twice the time, ie. we need this exception. */
|
/* Sorry, we really try hard to not use underscored attributes. In this case however we seem to break the inlining threshold which makes a system call take twice the time, ie. we need this exception. */
|
||||||
@inline(__always)
|
@inline(__always)
|
||||||
internal func wrapErrorIsNullReturnCall(where function: StaticString = #function, _ fn: () throws -> UnsafePointer<CChar>?) throws -> UnsafePointer<CChar>? {
|
internal func wrapErrorIsNullReturnCall(where function: StaticString = #function, _ body: () throws -> UnsafePointer<CChar>?) throws -> UnsafePointer<CChar>? {
|
||||||
while true {
|
while true {
|
||||||
let res = try fn()
|
let res = try body()
|
||||||
if res == nil {
|
if res == nil {
|
||||||
let err = errno
|
let err = errno
|
||||||
if err == EINTR {
|
if err == EINTR {
|
||||||
|
@ -157,7 +157,7 @@ internal enum Posix {
|
||||||
static let UIO_MAXIOV: Int = 1024
|
static let UIO_MAXIOV: Int = 1024
|
||||||
#elseif os(Linux) || os(FreeBSD) || os(Android)
|
#elseif os(Linux) || os(FreeBSD) || os(Android)
|
||||||
static let SOCK_STREAM: CInt = CInt(Glibc.SOCK_STREAM.rawValue)
|
static let SOCK_STREAM: CInt = CInt(Glibc.SOCK_STREAM.rawValue)
|
||||||
static let SOCK_DGRAM : CInt = CInt(Glibc.SOCK_DGRAM.rawValue)
|
static let SOCK_DGRAM: CInt = CInt(Glibc.SOCK_DGRAM.rawValue)
|
||||||
static let UIO_MAXIOV: Int = Int(Glibc.UIO_MAXIOV)
|
static let UIO_MAXIOV: Int = Int(Glibc.UIO_MAXIOV)
|
||||||
#else
|
#else
|
||||||
static var SOCK_STREAM: CInt {
|
static var SOCK_STREAM: CInt {
|
||||||
|
@ -250,7 +250,7 @@ internal enum Posix {
|
||||||
let fd = sysAccept(descriptor, addr, len)
|
let fd = sysAccept(descriptor, addr, len)
|
||||||
|
|
||||||
#if !os(Linux)
|
#if !os(Linux)
|
||||||
if (fd != -1) {
|
if fd != -1 {
|
||||||
// TODO: Handle return code ?
|
// TODO: Handle return code ?
|
||||||
_ = try? Posix.fcntl(descriptor: fd, command: F_SETNOSIGPIPE, value: 1)
|
_ = try? Posix.fcntl(descriptor: fd, command: F_SETNOSIGPIPE, value: 1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,15 +48,15 @@ final class Thread {
|
||||||
self.pthread = pthread
|
self.pthread = pthread
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute the given fn with the `pthread_t` that is used by this `Thread` as argument.
|
/// Execute the given body with the `pthread_t` that is used by this `Thread` as argument.
|
||||||
///
|
///
|
||||||
/// - warning: Do not escape `pthread_t` from the closure for later use.
|
/// - warning: Do not escape `pthread_t` from the closure for later use.
|
||||||
///
|
///
|
||||||
/// - parameters:
|
/// - parameters:
|
||||||
/// - fn: The closure that will accept the `pthread_t`.
|
/// - body: The closure that will accept the `pthread_t`.
|
||||||
/// - returns: The value returned by `fn`.
|
/// - returns: The value returned by `fn`.
|
||||||
func withUnsafePthread<T>(_ fn: (pthread_t) throws -> T) rethrows -> T {
|
func withUnsafePthread<T>(_ body: (pthread_t) throws -> T) rethrows -> T {
|
||||||
return try fn(self.pthread)
|
return try body(self.pthread)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get current name of the `Thread` or `nil` if not set.
|
/// Get current name of the `Thread` or `nil` if not set.
|
||||||
|
@ -91,7 +91,7 @@ final class Thread {
|
||||||
// Cast to UnsafeMutableRawPointer? and force unwrap to make the same code work on macOS and Linux.
|
// Cast to UnsafeMutableRawPointer? and force unwrap to make the same code work on macOS and Linux.
|
||||||
let b = Unmanaged<ThreadBox>.fromOpaque((p as UnsafeMutableRawPointer?)!.assumingMemoryBound(to: ThreadBox.self)).takeRetainedValue()
|
let b = Unmanaged<ThreadBox>.fromOpaque((p as UnsafeMutableRawPointer?)!.assumingMemoryBound(to: ThreadBox.self)).takeRetainedValue()
|
||||||
|
|
||||||
let fn = b.value.body
|
let body = b.value.body
|
||||||
let name = b.value.name
|
let name = b.value.name
|
||||||
|
|
||||||
let pt = pthread_self()
|
let pt = pthread_self()
|
||||||
|
@ -102,7 +102,7 @@ final class Thread {
|
||||||
precondition(res == 0, "pthread_setname_np failed for '\(threadName)': \(res)")
|
precondition(res == 0, "pthread_setname_np failed for '\(threadName)': \(res)")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn(Thread(pthread: pt))
|
body(Thread(pthread: pt))
|
||||||
return nil
|
return nil
|
||||||
}, Unmanaged.passRetained(box).toOpaque())
|
}, Unmanaged.passRetained(box).toOpaque())
|
||||||
|
|
||||||
|
@ -176,7 +176,7 @@ internal struct ThreadSpecificVariable<T: AnyObject> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension Thread : Equatable {
|
extension Thread: Equatable {
|
||||||
public static func ==(lhs: Thread, rhs: Thread) -> Bool {
|
public static func ==(lhs: Thread, rhs: Thread) -> Bool {
|
||||||
return pthread_equal(lhs.pthread, rhs.pthread) != 0
|
return pthread_equal(lhs.pthread, rhs.pthread) != 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ import Dispatch
|
||||||
let newLine = "\n".utf8.first!
|
let newLine = "\n".utf8.first!
|
||||||
|
|
||||||
/// Very simple example codec which will buffer inbound data until a `\n` was found.
|
/// Very simple example codec which will buffer inbound data until a `\n` was found.
|
||||||
final class LineDelimiterCodec : ByteToMessageDecoder {
|
final class LineDelimiterCodec: ByteToMessageDecoder {
|
||||||
public typealias InboundIn = ByteBuffer
|
public typealias InboundIn = ByteBuffer
|
||||||
public typealias InboundOut = ByteBuffer
|
public typealias InboundOut = ByteBuffer
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ final class ChatHandler: ChannelInboundHandler {
|
||||||
|
|
||||||
// All access to channels is guarded by channelsSyncQueue.
|
// All access to channels is guarded by channelsSyncQueue.
|
||||||
private let channelsSyncQueue = DispatchQueue(label: "channelsQueue")
|
private let channelsSyncQueue = DispatchQueue(label: "channelsQueue")
|
||||||
private var channels = Dictionary<ObjectIdentifier, Channel>()
|
private var channels: [ObjectIdentifier: Channel] = [:]
|
||||||
|
|
||||||
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
||||||
let id = ObjectIdentifier(ctx.channel)
|
let id = ObjectIdentifier(ctx.channel)
|
||||||
|
@ -78,7 +78,7 @@ final class ChatHandler: ChannelInboundHandler {
|
||||||
let channel = ctx.channel
|
let channel = ctx.channel
|
||||||
self.channelsSyncQueue.async {
|
self.channelsSyncQueue.async {
|
||||||
// broadcast the message to all the connected clients except the one that just became active.
|
// broadcast the message to all the connected clients except the one that just became active.
|
||||||
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - New client connected with address: \(remoteAddress)\n");
|
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - New client connected with address: \(remoteAddress)\n")
|
||||||
|
|
||||||
self.channels[ObjectIdentifier(channel)] = channel
|
self.channels[ObjectIdentifier(channel)] = channel
|
||||||
}
|
}
|
||||||
|
@ -93,18 +93,18 @@ final class ChatHandler: ChannelInboundHandler {
|
||||||
self.channelsSyncQueue.async {
|
self.channelsSyncQueue.async {
|
||||||
if self.channels.removeValue(forKey: ObjectIdentifier(channel)) != nil {
|
if self.channels.removeValue(forKey: ObjectIdentifier(channel)) != nil {
|
||||||
// broadcast the message to all the connected clients except the one that just was disconnected.
|
// broadcast the message to all the connected clients except the one that just was disconnected.
|
||||||
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - Client disconnected\n");
|
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - Client disconnected\n")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func writeToAll(channels: Dictionary<ObjectIdentifier, Channel>, allocator: ByteBufferAllocator, message: String) {
|
private func writeToAll(channels: [ObjectIdentifier: Channel], allocator: ByteBufferAllocator, message: String) {
|
||||||
var buffer = allocator.buffer(capacity: message.utf8.count)
|
var buffer = allocator.buffer(capacity: message.utf8.count)
|
||||||
buffer.write(string: message)
|
buffer.write(string: message)
|
||||||
self.writeToAll(channels: channels, buffer: buffer)
|
self.writeToAll(channels: channels, buffer: buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
private func writeToAll(channels: Dictionary<ObjectIdentifier, Channel>, buffer: ByteBuffer) {
|
private func writeToAll(channels: [ObjectIdentifier: Channel], buffer: ByteBuffer) {
|
||||||
channels.forEach { $0.value.writeAndFlush(buffer, promise: nil) }
|
channels.forEach { $0.value.writeAndFlush(buffer, promise: nil) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,21 +69,21 @@ extension Lock {
|
||||||
///
|
///
|
||||||
/// This convenience method should be preferred to `lock` and `unlock` in
|
/// This convenience method should be preferred to `lock` and `unlock` in
|
||||||
/// most situations, as it ensures that the lock will be released regardless
|
/// most situations, as it ensures that the lock will be released regardless
|
||||||
/// of how `fn` exits.
|
/// of how `body` exits.
|
||||||
///
|
///
|
||||||
/// - Parameter fn: The block to execute while holding the lock.
|
/// - Parameter body: The block to execute while holding the lock.
|
||||||
/// - Returns: The value returned by the block.
|
/// - Returns: The value returned by the block.
|
||||||
public func withLock<T>(_ fn: () throws -> T) rethrows -> T {
|
public func withLock<T>(_ body: () throws -> T) rethrows -> T {
|
||||||
self.lock()
|
self.lock()
|
||||||
defer {
|
defer {
|
||||||
self.unlock()
|
self.unlock()
|
||||||
}
|
}
|
||||||
return try fn()
|
return try body()
|
||||||
}
|
}
|
||||||
|
|
||||||
// specialise Void return (for performance)
|
// specialise Void return (for performance)
|
||||||
public func withLockVoid(_ fn: () throws -> Void) rethrows -> Void {
|
public func withLockVoid(_ body: () throws -> Void) rethrows -> Void {
|
||||||
try self.withLock(fn)
|
try self.withLock(body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,9 +30,9 @@ import struct Foundation.Data
|
||||||
*/
|
*/
|
||||||
|
|
||||||
extension Data: ContiguousCollection {
|
extension Data: ContiguousCollection {
|
||||||
public func withUnsafeBytes<R>(_ fn: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
||||||
return try self.withUnsafeBytes { (ptr: UnsafePointer<UInt8>) -> R in
|
return try self.withUnsafeBytes { (ptr: UnsafePointer<UInt8>) -> R in
|
||||||
return try fn(UnsafeRawBufferPointer(start: ptr, count: self.count))
|
return try body(UnsafeRawBufferPointer(start: ptr, count: self.count))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ private func writeTrailers(wrapOutboundOut: (IOData) -> NIOAny, ctx: ChannelHand
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func writeHead(wrapOutboundOut: (IOData) -> NIOAny, writeStartLine: (inout ByteBuffer) -> (), ctx: ChannelHandlerContext, headers: HTTPHeaders, promise: EventLoopPromise<Void>?) {
|
private func writeHead(wrapOutboundOut: (IOData) -> NIOAny, writeStartLine: (inout ByteBuffer) -> Void, ctx: ChannelHandlerContext, headers: HTTPHeaders, promise: EventLoopPromise<Void>?) {
|
||||||
|
|
||||||
var buffer = ctx.channel.allocator.buffer(capacity: 256)
|
var buffer = ctx.channel.allocator.buffer(capacity: 256)
|
||||||
writeStartLine(&buffer)
|
writeStartLine(&buffer)
|
||||||
|
@ -112,7 +112,7 @@ private func sanitizeTransportHeaders(hasBody: HTTPMethod.HasBody, headers: inou
|
||||||
///
|
///
|
||||||
/// This channel handler is used to translate messages from a series of
|
/// This channel handler is used to translate messages from a series of
|
||||||
/// `HTTPClientRequestPart` into the HTTP/1.1 wire format.
|
/// `HTTPClientRequestPart` into the HTTP/1.1 wire format.
|
||||||
public final class HTTPRequestEncoder : ChannelOutboundHandler {
|
public final class HTTPRequestEncoder: ChannelOutboundHandler {
|
||||||
public typealias OutboundIn = HTTPClientRequestPart
|
public typealias OutboundIn = HTTPClientRequestPart
|
||||||
public typealias OutboundOut = IOData
|
public typealias OutboundOut = IOData
|
||||||
|
|
||||||
|
@ -147,7 +147,7 @@ public final class HTTPRequestEncoder : ChannelOutboundHandler {
|
||||||
///
|
///
|
||||||
/// This channel handler is used to translate messages from a series of
|
/// This channel handler is used to translate messages from a series of
|
||||||
/// `HTTPServerResponsePart` into the HTTP/1.1 wire format.
|
/// `HTTPServerResponsePart` into the HTTP/1.1 wire format.
|
||||||
public final class HTTPResponseEncoder : ChannelOutboundHandler {
|
public final class HTTPResponseEncoder: ChannelOutboundHandler {
|
||||||
public typealias OutboundIn = HTTPServerResponsePart
|
public typealias OutboundIn = HTTPServerResponsePart
|
||||||
public typealias OutboundOut = IOData
|
public typealias OutboundOut = IOData
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ public enum HTTPPart<HeadT: Equatable, BodyT: Equatable> {
|
||||||
case end(HTTPHeaders?)
|
case end(HTTPHeaders?)
|
||||||
}
|
}
|
||||||
|
|
||||||
extension HTTPPart : Equatable {
|
extension HTTPPart: Equatable {
|
||||||
public static func ==(lhs: HTTPPart, rhs: HTTPPart) -> Bool {
|
public static func ==(lhs: HTTPPart, rhs: HTTPPart) -> Bool {
|
||||||
switch (lhs, rhs) {
|
switch (lhs, rhs) {
|
||||||
case (.head(let h1), .head(let h2)):
|
case (.head(let h1), .head(let h2)):
|
||||||
|
@ -119,7 +119,7 @@ extension HTTPRequestHead {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A representation of the status line and header fields of a HTTP response.
|
/// A representation of the status line and header fields of a HTTP response.
|
||||||
public struct HTTPResponseHead : Equatable {
|
public struct HTTPResponseHead: Equatable {
|
||||||
/// The HTTP response status.
|
/// The HTTP response status.
|
||||||
public let status: HTTPResponseStatus
|
public let status: HTTPResponseStatus
|
||||||
|
|
||||||
|
@ -145,14 +145,14 @@ public struct HTTPResponseHead : Equatable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fileprivate typealias HTTPHeadersStorage = [String:[(String, String)]] // [lowerCasedName: [(originalCaseName, value)]
|
fileprivate typealias HTTPHeadersStorage = [String: [(String, String)]] // [lowerCasedName: [(originalCaseName, value)]
|
||||||
|
|
||||||
|
|
||||||
/// An iterator of HTTP header fields.
|
/// An iterator of HTTP header fields.
|
||||||
///
|
///
|
||||||
/// This iterator will return each value for a given header name separately. That
|
/// This iterator will return each value for a given header name separately. That
|
||||||
/// means that `name` is not guaranteed to be unique in a given block of headers.
|
/// means that `name` is not guaranteed to be unique in a given block of headers.
|
||||||
struct HTTPHeadersIterator : IteratorProtocol {
|
struct HTTPHeadersIterator: IteratorProtocol {
|
||||||
fileprivate var storageIterator: HTTPHeadersStorage.Iterator
|
fileprivate var storageIterator: HTTPHeadersStorage.Iterator
|
||||||
fileprivate var valuesIterator: Array<(String, String)>.Iterator?
|
fileprivate var valuesIterator: Array<(String, String)>.Iterator?
|
||||||
|
|
||||||
|
@ -190,7 +190,7 @@ struct HTTPHeadersIterator : IteratorProtocol {
|
||||||
/// field when needed. It also supports recomposing headers to a maximally joined
|
/// field when needed. It also supports recomposing headers to a maximally joined
|
||||||
/// or split representation, such that header fields that are able to be repeated
|
/// or split representation, such that header fields that are able to be repeated
|
||||||
/// can be represented appropriately.
|
/// can be represented appropriately.
|
||||||
public struct HTTPHeaders : Sequence, CustomStringConvertible {
|
public struct HTTPHeaders: Sequence, CustomStringConvertible {
|
||||||
|
|
||||||
// [lowerCasedName: [(originalCaseName, value)]
|
// [lowerCasedName: [(originalCaseName, value)]
|
||||||
private var storage: HTTPHeadersStorage = HTTPHeadersStorage()
|
private var storage: HTTPHeadersStorage = HTTPHeadersStorage()
|
||||||
|
@ -366,7 +366,7 @@ private extension Substring {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension HTTPHeaders : Equatable {
|
extension HTTPHeaders: Equatable {
|
||||||
public static func ==(lhs: HTTPHeaders, rhs: HTTPHeaders) -> Bool {
|
public static func ==(lhs: HTTPHeaders, rhs: HTTPHeaders) -> Bool {
|
||||||
if lhs.storage.count != rhs.storage.count {
|
if lhs.storage.count != rhs.storage.count {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -254,8 +254,8 @@ struct HeapIterator<T: Comparable>: IteratorProtocol {
|
||||||
extension Heap: Sequence {
|
extension Heap: Sequence {
|
||||||
typealias Element = T
|
typealias Element = T
|
||||||
|
|
||||||
var startIndex : Int { return self.storage.startIndex }
|
var startIndex: Int { return self.storage.startIndex }
|
||||||
var endIndex : Int { return self.storage.endIndex }
|
var endIndex: Int { return self.storage.endIndex }
|
||||||
|
|
||||||
var underestimatedCount: Int {
|
var underestimatedCount: Int {
|
||||||
return self.storage.count
|
return self.storage.count
|
||||||
|
|
|
@ -55,20 +55,20 @@ private class PromiseOrderer {
|
||||||
|
|
||||||
private extension ByteBuffer {
|
private extension ByteBuffer {
|
||||||
@discardableResult
|
@discardableResult
|
||||||
mutating func withUnsafeMutableReadableUInt8Bytes<T>(fn: (UnsafeMutableBufferPointer<UInt8>) throws -> T) rethrows -> T {
|
mutating func withUnsafeMutableReadableUInt8Bytes<T>(_ body: (UnsafeMutableBufferPointer<UInt8>) throws -> T) rethrows -> T {
|
||||||
return try self.withUnsafeMutableReadableBytes { (ptr: UnsafeMutableRawBufferPointer) -> T in
|
return try self.withUnsafeMutableReadableBytes { (ptr: UnsafeMutableRawBufferPointer) -> T in
|
||||||
let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
|
let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
|
||||||
let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
|
let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
|
||||||
return try fn(inputBufferPointer)
|
return try body(inputBufferPointer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@discardableResult
|
@discardableResult
|
||||||
mutating func writeWithUnsafeMutableUInt8Bytes(fn: (UnsafeMutableBufferPointer<UInt8>) throws -> Int) rethrows -> Int {
|
mutating func writeWithUnsafeMutableUInt8Bytes(_ body: (UnsafeMutableBufferPointer<UInt8>) throws -> Int) rethrows -> Int {
|
||||||
return try self.writeWithUnsafeMutableBytes { (ptr: UnsafeMutableRawBufferPointer) -> Int in
|
return try self.writeWithUnsafeMutableBytes { (ptr: UnsafeMutableRawBufferPointer) -> Int in
|
||||||
let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
|
let baseInputPointer = ptr.baseAddress?.assumingMemoryBound(to: UInt8.self)
|
||||||
let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
|
let inputBufferPointer = UnsafeMutableBufferPointer(start: baseInputPointer, count: ptr.count)
|
||||||
return try fn(inputBufferPointer)
|
return try body(inputBufferPointer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,14 +20,14 @@ private final class TestChannelInboundHandler: ChannelInboundHandler {
|
||||||
public typealias InboundIn = HTTPServerRequestPart
|
public typealias InboundIn = HTTPServerRequestPart
|
||||||
public typealias InboundOut = HTTPServerRequestPart
|
public typealias InboundOut = HTTPServerRequestPart
|
||||||
|
|
||||||
private let fn: (HTTPServerRequestPart) -> HTTPServerRequestPart
|
private let body: (HTTPServerRequestPart) -> HTTPServerRequestPart
|
||||||
|
|
||||||
init(_ fn: @escaping (HTTPServerRequestPart) -> HTTPServerRequestPart) {
|
init(_ body: @escaping (HTTPServerRequestPart) -> HTTPServerRequestPart) {
|
||||||
self.fn = fn
|
self.body = body
|
||||||
}
|
}
|
||||||
|
|
||||||
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
||||||
ctx.fireChannelRead(self.wrapInboundOut(self.fn(self.unwrapInboundIn(data))))
|
ctx.fireChannelRead(self.wrapInboundOut(self.body(self.unwrapInboundIn(data))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -921,16 +921,16 @@ class ByteBufferTest: XCTestCase {
|
||||||
buf.changeCapacity(to: 1024)
|
buf.changeCapacity(to: 1024)
|
||||||
buf.write(staticString: "hello world, just some trap bytes here")
|
buf.write(staticString: "hello world, just some trap bytes here")
|
||||||
|
|
||||||
func testIndexAndLengthFunc<T>(_ fn: (Int, Int) -> T?, file: StaticString = #file, line: UInt = #line) {
|
func testIndexAndLengthFunc<T>(_ body: (Int, Int) -> T?, file: StaticString = #file, line: UInt = #line) {
|
||||||
XCTAssertNil(fn(Int.max, 1), file: file, line: line)
|
XCTAssertNil(body(Int.max, 1), file: file, line: line)
|
||||||
XCTAssertNil(fn(Int.max - 1, 2), file: file, line: line)
|
XCTAssertNil(body(Int.max - 1, 2), file: file, line: line)
|
||||||
XCTAssertNil(fn(1, Int.max), file: file, line: line)
|
XCTAssertNil(body(1, Int.max), file: file, line: line)
|
||||||
XCTAssertNil(fn(2, Int.max - 1), file: file, line: line)
|
XCTAssertNil(body(2, Int.max - 1), file: file, line: line)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testIndexOrLengthFunc<T>(_ fn: (Int) -> T?, file: StaticString = #file, line: UInt = #line) {
|
func testIndexOrLengthFunc<T>(_ body: (Int) -> T?, file: StaticString = #file, line: UInt = #line) {
|
||||||
XCTAssertNil(fn(Int.max))
|
XCTAssertNil(body(Int.max))
|
||||||
XCTAssertNil(fn(Int.max - 1))
|
XCTAssertNil(body(Int.max - 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
testIndexOrLengthFunc({ x in buf.getInteger(at: x) as UInt16? })
|
testIndexOrLengthFunc({ x in buf.getInteger(at: x) as UInt16? })
|
||||||
|
@ -1146,8 +1146,8 @@ class ByteBufferTest: XCTestCase {
|
||||||
return self.storage.index(after: i)
|
return self.storage.index(after: i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func withUnsafeBytes<R>(_ fn: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
|
||||||
return try self.storage.withUnsafeBytes(fn)
|
return try self.storage.withUnsafeBytes(body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buf.clear()
|
buf.clear()
|
||||||
|
|
|
@ -100,15 +100,15 @@ class ChannelPipelineTest: XCTestCase {
|
||||||
typealias OutboundIn = In
|
typealias OutboundIn = In
|
||||||
typealias OutboundOut = Out
|
typealias OutboundOut = Out
|
||||||
|
|
||||||
private let fn: (OutboundIn) throws -> OutboundOut
|
private let body: (OutboundIn) throws -> OutboundOut
|
||||||
|
|
||||||
init(_ fn: @escaping (OutboundIn) throws -> OutboundOut) {
|
init(_ body: @escaping (OutboundIn) throws -> OutboundOut) {
|
||||||
self.fn = fn
|
self.body = body
|
||||||
}
|
}
|
||||||
|
|
||||||
public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
|
public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
|
||||||
do {
|
do {
|
||||||
ctx.write(self.wrapOutboundOut(try fn(self.unwrapOutboundIn(data))), promise: promise)
|
ctx.write(self.wrapOutboundOut(try body(self.unwrapOutboundIn(data))), promise: promise)
|
||||||
} catch let err {
|
} catch let err {
|
||||||
promise!.fail(error: err)
|
promise!.fail(error: err)
|
||||||
}
|
}
|
||||||
|
@ -293,14 +293,14 @@ class ChannelPipelineTest: XCTestCase {
|
||||||
class SomeHandler: ChannelInboundHandler {
|
class SomeHandler: ChannelInboundHandler {
|
||||||
typealias InboundIn = Never
|
typealias InboundIn = Never
|
||||||
|
|
||||||
let fn: (ChannelHandlerContext) -> Void
|
let body: (ChannelHandlerContext) -> Void
|
||||||
|
|
||||||
init(_ fn: @escaping (ChannelHandlerContext) -> Void) {
|
init(_ body: @escaping (ChannelHandlerContext) -> Void) {
|
||||||
self.fn = fn
|
self.body = body
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlerAdded(ctx: ChannelHandlerContext) {
|
func handlerAdded(ctx: ChannelHandlerContext) {
|
||||||
self.fn(ctx)
|
self.body(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -198,7 +198,7 @@ public class ChannelTests: XCTestCase {
|
||||||
try clientChannel.close().wait()
|
try clientChannel.close().wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
private func withPendingStreamWritesManager(_ fn: (PendingStreamWritesManager) throws -> Void) rethrows {
|
private func withPendingStreamWritesManager(_ body: (PendingStreamWritesManager) throws -> Void) rethrows {
|
||||||
try withExtendedLifetime(NSObject()) { o in
|
try withExtendedLifetime(NSObject()) { o in
|
||||||
var iovecs: [IOVector] = Array(repeating: iovec(), count: Socket.writevLimitIOVectors + 1)
|
var iovecs: [IOVector] = Array(repeating: iovec(), count: Socket.writevLimitIOVectors + 1)
|
||||||
var managed: [Unmanaged<AnyObject>] = Array(repeating: Unmanaged.passUnretained(o), count: Socket.writevLimitIOVectors + 1)
|
var managed: [Unmanaged<AnyObject>] = Array(repeating: Unmanaged.passUnretained(o), count: Socket.writevLimitIOVectors + 1)
|
||||||
|
@ -212,7 +212,7 @@ public class ChannelTests: XCTestCase {
|
||||||
XCTAssertFalse(pwm.isFlushPending)
|
XCTAssertFalse(pwm.isFlushPending)
|
||||||
XCTAssertTrue(pwm.isWritable)
|
XCTAssertTrue(pwm.isWritable)
|
||||||
|
|
||||||
try fn(pwm)
|
try body(pwm)
|
||||||
|
|
||||||
XCTAssertTrue(pwm.isEmpty)
|
XCTAssertTrue(pwm.isEmpty)
|
||||||
XCTAssertFalse(pwm.isFlushPending)
|
XCTAssertFalse(pwm.isFlushPending)
|
||||||
|
|
|
@ -247,7 +247,7 @@ public class MessageToByteEncoderTest: XCTestCase {
|
||||||
|
|
||||||
public func encode(ctx: ChannelHandlerContext, data value: Int32, out: inout ByteBuffer) throws {
|
public func encode(ctx: ChannelHandlerContext, data value: Int32, out: inout ByteBuffer) throws {
|
||||||
XCTAssertEqual(MemoryLayout<Int32>.size, out.writableBytes)
|
XCTAssertEqual(MemoryLayout<Int32>.size, out.writableBytes)
|
||||||
out.write(integer: value);
|
out.write(integer: value)
|
||||||
}
|
}
|
||||||
|
|
||||||
public func allocateOutBuffer(ctx: ChannelHandlerContext, data: Int32) throws -> ByteBuffer {
|
public func allocateOutBuffer(ctx: ChannelHandlerContext, data: Int32) throws -> ByteBuffer {
|
||||||
|
@ -261,7 +261,7 @@ public class MessageToByteEncoderTest: XCTestCase {
|
||||||
|
|
||||||
public func encode(ctx: ChannelHandlerContext, data value: Int32, out: inout ByteBuffer) throws {
|
public func encode(ctx: ChannelHandlerContext, data value: Int32, out: inout ByteBuffer) throws {
|
||||||
XCTAssertEqual(MemoryLayout<Int32>.size, 256)
|
XCTAssertEqual(MemoryLayout<Int32>.size, 256)
|
||||||
out.write(integer: value);
|
out.write(integer: value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -190,10 +190,10 @@ public class EventLoopTest : XCTestCase {
|
||||||
|
|
||||||
public func testEventLoopThreads() throws {
|
public func testEventLoopThreads() throws {
|
||||||
var counter = 0
|
var counter = 0
|
||||||
let fn: ThreadInitializer = { t in
|
let body: ThreadInitializer = { t in
|
||||||
counter += 1
|
counter += 1
|
||||||
}
|
}
|
||||||
let threads: [ThreadInitializer] = [fn, fn]
|
let threads: [ThreadInitializer] = [body, body]
|
||||||
|
|
||||||
let group = MultiThreadedEventLoopGroup(threadInitializers: threads)
|
let group = MultiThreadedEventLoopGroup(threadInitializers: threads)
|
||||||
|
|
||||||
|
@ -203,12 +203,12 @@ public class EventLoopTest : XCTestCase {
|
||||||
|
|
||||||
public func testEventLoopPinned() throws {
|
public func testEventLoopPinned() throws {
|
||||||
#if os(Linux)
|
#if os(Linux)
|
||||||
let fn: ThreadInitializer = { t in
|
let body: ThreadInitializer = { t in
|
||||||
let set = LinuxCPUSet(0)
|
let set = LinuxCPUSet(0)
|
||||||
t.affinity = set
|
t.affinity = set
|
||||||
XCTAssertEqual(set, t.affinity)
|
XCTAssertEqual(set, t.affinity)
|
||||||
}
|
}
|
||||||
let threads: [ThreadInitializer] = [fn, fn]
|
let threads: [ThreadInitializer] = [fn, body]
|
||||||
|
|
||||||
let group = MultiThreadedEventLoopGroup(threadInitializers: threads)
|
let group = MultiThreadedEventLoopGroup(threadInitializers: threads)
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
|
||||||
case error(Error)
|
case error(Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
private func withPendingDatagramWritesManager(_ fn: (PendingDatagramWritesManager) throws -> Void) rethrows {
|
private func withPendingDatagramWritesManager(_ body: (PendingDatagramWritesManager) throws -> Void) rethrows {
|
||||||
try withExtendedLifetime(NSObject()) { o in
|
try withExtendedLifetime(NSObject()) { o in
|
||||||
var iovecs: [IOVector] = Array(repeating: iovec(), count: Socket.writevLimitIOVectors + 1)
|
var iovecs: [IOVector] = Array(repeating: iovec(), count: Socket.writevLimitIOVectors + 1)
|
||||||
var managed: [Unmanaged<AnyObject>] = Array(repeating: Unmanaged.passUnretained(o), count: Socket.writevLimitIOVectors + 1)
|
var managed: [Unmanaged<AnyObject>] = Array(repeating: Unmanaged.passUnretained(o), count: Socket.writevLimitIOVectors + 1)
|
||||||
|
@ -73,7 +73,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
|
||||||
XCTAssertFalse(pwm.isFlushPending)
|
XCTAssertFalse(pwm.isFlushPending)
|
||||||
XCTAssertTrue(pwm.isWritable)
|
XCTAssertTrue(pwm.isWritable)
|
||||||
|
|
||||||
try fn(pwm)
|
try body(pwm)
|
||||||
|
|
||||||
XCTAssertTrue(pwm.isEmpty)
|
XCTAssertTrue(pwm.isEmpty)
|
||||||
XCTAssertFalse(pwm.isFlushPending)
|
XCTAssertFalse(pwm.isFlushPending)
|
||||||
|
|
|
@ -17,27 +17,27 @@ import Foundation
|
||||||
@testable import NIO
|
@testable import NIO
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
public func measureRunTime(_ fn: () throws -> Int) rethrows -> TimeInterval {
|
public func measureRunTime(_ body: () throws -> Int) rethrows -> TimeInterval {
|
||||||
func measureOne(_ fn: () throws -> Int) rethrows -> TimeInterval {
|
func measureOne(_ body: () throws -> Int) rethrows -> TimeInterval {
|
||||||
let start = Date()
|
let start = Date()
|
||||||
_ = try fn()
|
_ = try body()
|
||||||
let end = Date()
|
let end = Date()
|
||||||
return end.timeIntervalSince(start)
|
return end.timeIntervalSince(start)
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = try measureOne(fn)
|
_ = try measureOne(body)
|
||||||
var measurements = Array(repeating: 0.0, count: 10)
|
var measurements = Array(repeating: 0.0, count: 10)
|
||||||
for i in 0..<10 {
|
for i in 0..<10 {
|
||||||
measurements[i] = try measureOne(fn)
|
measurements[i] = try measureOne(body)
|
||||||
}
|
}
|
||||||
|
|
||||||
//return measurements.reduce(0, +) / 10.0
|
//return measurements.reduce(0, +) / 10.0
|
||||||
return measurements.min()!
|
return measurements.min()!
|
||||||
}
|
}
|
||||||
|
|
||||||
public func measureRunTimeAndPrint(desc: String, fn: () throws -> Int) rethrows -> Void {
|
public func measureRunTimeAndPrint(desc: String, body: () throws -> Int) rethrows -> Void {
|
||||||
print("measuring: \(desc)")
|
print("measuring: \(desc)")
|
||||||
print("\(try measureRunTime(fn))s")
|
print("\(try measureRunTime(body))s")
|
||||||
}
|
}
|
||||||
|
|
||||||
enum TestError: Error {
|
enum TestError: Error {
|
||||||
|
|
|
@ -15,20 +15,20 @@
|
||||||
@testable import NIO
|
@testable import NIO
|
||||||
import XCTest
|
import XCTest
|
||||||
|
|
||||||
func withPipe(_ fn: (NIO.FileHandle, NIO.FileHandle) -> [NIO.FileHandle]) throws {
|
func withPipe(_ body: (NIO.FileHandle, NIO.FileHandle) -> [NIO.FileHandle]) throws {
|
||||||
var fds: [Int32] = [-1, -1]
|
var fds: [Int32] = [-1, -1]
|
||||||
fds.withUnsafeMutableBufferPointer { ptr in
|
fds.withUnsafeMutableBufferPointer { ptr in
|
||||||
XCTAssertEqual(0, pipe(ptr.baseAddress!))
|
XCTAssertEqual(0, pipe(ptr.baseAddress!))
|
||||||
}
|
}
|
||||||
let readFH = FileHandle(descriptor: fds[0])
|
let readFH = FileHandle(descriptor: fds[0])
|
||||||
let writeFH = FileHandle(descriptor: fds[1])
|
let writeFH = FileHandle(descriptor: fds[1])
|
||||||
let toClose = fn(readFH, writeFH)
|
let toClose = body(readFH, writeFH)
|
||||||
try toClose.forEach { fh in
|
try toClose.forEach { fh in
|
||||||
XCTAssertNoThrow(try fh.close())
|
XCTAssertNoThrow(try fh.close())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func withTemporaryFile<T>(content: String? = nil, _ fn: (NIO.FileHandle, String) throws -> T) rethrows -> T {
|
func withTemporaryFile<T>(content: String? = nil, _ body: (NIO.FileHandle, String) throws -> T) rethrows -> T {
|
||||||
let (fd, path) = openTemporaryFile()
|
let (fd, path) = openTemporaryFile()
|
||||||
let fileHandle = FileHandle(descriptor: fd)
|
let fileHandle = FileHandle(descriptor: fd)
|
||||||
defer {
|
defer {
|
||||||
|
@ -53,7 +53,7 @@ func withTemporaryFile<T>(content: String? = nil, _ fn: (NIO.FileHandle, String)
|
||||||
XCTAssertEqual(0, lseek(fd, 0, SEEK_SET))
|
XCTAssertEqual(0, lseek(fd, 0, SEEK_SET))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return try fn(fileHandle, path)
|
return try body(fileHandle, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func openTemporaryFile() -> (CInt, String) {
|
func openTemporaryFile() -> (CInt, String) {
|
||||||
|
|
Loading…
Reference in New Issue