add PipeChannel (#1138)

Motivation:

There are use-cases for SwiftNIO where there are no actual sockets but
rather two pipe file descriptors - one for input, one for output.
There's no real reason why SwiftNIO shouldn't work for those.

Modifications:

Add a PipeChannel.

Result:

More use-cases for SwiftNIO.
This commit is contained in:
Johannes Weiss 2019-10-16 22:06:55 -07:00 committed by GitHub
parent a796af1a96
commit e102aa9ae9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1579 additions and 320 deletions

View File

@ -0,0 +1,44 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##
source defines.sh
swift build
echo -ne "::: HELLO\n::: WORLD\n:::\n" > "$tmp/file"
lines_in_file="$(cat "$tmp/file" | wc -l | tr -d '\t ')"
function echo_request_close() {
echo -e 'GET /fileio/file HTTP/1.1\r\nconnection: close\r\nhost: stdio\r\n\r\n'
}
function echo_request_keep_alive() {
echo -e 'GET /fileio/file HTTP/1.1\r\nconnection: keep-alive\r\nhost: stdio\r\n\r\n'
}
echo_request_close | "$(swift build --show-bin-path)/NIOHTTP1Server" - "$tmp" | cat > "$tmp/output"
tail -n "$lines_in_file" "$tmp/output" > "$tmp/output-just-file"
assert_equal_files "$tmp/file" "$tmp/output-just-file"
how_many=100
{
for f in $(seq "$(( how_many - 1 ))" ); do
echo_request_keep_alive
done
echo_request_close
} | "$(swift build --show-bin-path)/NIOHTTP1Server" - "$tmp" | grep ^::: > "$tmp/multi-actual"
set +o pipefail # we know that 'yes' will fail with SIGPIPE
yes "$(cat "$tmp/file")" | head -n $(( lines_in_file * how_many )) > "$tmp/multi-expected"
assert_equal_files "$tmp/multi-expected" "$tmp/multi-actual"

View File

@ -12,6 +12,8 @@
//
//===----------------------------------------------------------------------===//
import NIOConcurrencyHelpers
/// A Registration on a `Selector`, which is interested in an `SelectorEventSet`.
protocol Registration {
/// The `SelectorEventSet` in which the `Registration` is interested.
@ -204,7 +206,8 @@ extension sockaddr_storage {
/// Base class for sockets.
///
/// This should not be created directly but one of its sub-classes should be used, like `ServerSocket` or `Socket`.
class BaseSocket: Selectable {
class BaseSocket: Selectable, BaseSocketProtocol {
typealias SelectableType = BaseSocket
private var descriptor: CInt
public var isOpen: Bool {
@ -248,20 +251,6 @@ class BaseSocket: Selectable {
return addr.convert()
}
private static func setNonBlocking(fileDescriptor: CInt) throws {
let flags = try Posix.fcntl(descriptor: fileDescriptor, command: F_GETFL, value: 0)
do {
let ret = try Posix.fcntl(descriptor: fileDescriptor, command: F_SETFL, value: flags | O_NONBLOCK)
assert(ret == 0, "unexpectedly, fcntl(\(fileDescriptor), F_SETFL, \(flags) | O_NONBLOCK) returned \(ret)")
} catch let error as IOError {
if error.errnoCode == EINVAL {
// Darwin seems to sometimes do this despite the docs claiming it can't happen
throw NIOFailedToSetSocketNonBlockingError()
}
throw error
}
}
/// Create a new socket and return the file descriptor of it.
///
/// - parameters:
@ -270,7 +259,7 @@ class BaseSocket: Selectable {
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - returns: the file descriptor of the socket that was created.
/// - throws: An `IOError` if creation of the socket failed.
static func makeSocket(protocolFamily: Int32, type: CInt, setNonBlocking: Bool = false) throws -> Int32 {
static func makeSocket(protocolFamily: Int32, type: CInt, setNonBlocking: Bool = false) throws -> CInt {
var sockType = type
#if os(Linux)
if setNonBlocking {
@ -316,9 +305,10 @@ class BaseSocket: Selectable {
///
/// - parameters:
/// - descriptor: The file descriptor to wrap.
init(descriptor: CInt) {
init(descriptor: CInt) throws {
precondition(descriptor >= 0, "invalid file descriptor")
self.descriptor = descriptor
try self.ignoreSIGPIPE(descriptor: descriptor)
}
deinit {
@ -431,6 +421,6 @@ class BaseSocket: Selectable {
extension BaseSocket: CustomStringConvertible {
var description: String {
return "BaseSocket { fd=\(self.descriptor) } "
return "BaseSocket { fd=\(self.descriptor) }"
}
}

View File

@ -202,15 +202,15 @@ private struct SocketChannelLifecycleManager {
/// For this reason, `BaseSocketChannel` exists to provide a common core implementation of
/// the `SelectableChannel` protocol. It uses a number of private functions to provide hooks
/// for subclasses to implement the specific logic to handle their writes and reads.
class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
typealias SelectableType = T
class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore {
typealias SelectableType = SocketType.SelectableType
// MARK: Stored Properties
// Visible to access from EventLoop directly
public let parent: Channel?
internal let socket: T
internal let socket: SocketType
private let closePromise: EventLoopPromise<Void>
private let selectableEventLoop: SelectableEventLoop
internal let selectableEventLoop: SelectableEventLoop
private let addressesCached: AtomicBox<Box<(local:SocketAddress?, remote:SocketAddress?)>> = AtomicBox(value: Box((local: nil, remote: nil)))
private let bufferAllocatorCached: AtomicBox<Box<ByteBufferAllocator>>
private let isActiveAtomic: Atomic<Bool> = Atomic(value: false)
@ -302,10 +302,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return self.lifecycleManager.isPreRegistered
}
internal var selectable: T {
return self.socket
}
// This is `Channel` API so must be thread-safe.
public var isActive: Bool {
return self.isActiveAtomic.load()
@ -344,11 +340,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
fatalError("must be overridden")
}
/// Provides the registration for this selector. Must be implemented by subclasses.
func registrationFor(interested: SelectorEventSet) -> NIORegistration {
fatalError("must override")
}
/// Read data from the underlying socket and dispatch it to the `ChannelPipeline`
///
/// - returns: `true` if any data was read, `false` otherwise.
@ -388,7 +379,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
// MARK: Common base socket logic.
init(socket: T, parent: Channel? = nil, eventLoop: SelectableEventLoop, recvAllocator: RecvByteBufferAllocator) throws {
init(socket: SocketType, parent: Channel?, eventLoop: SelectableEventLoop, recvAllocator: RecvByteBufferAllocator) throws {
self.bufferAllocatorCached = AtomicBox(value: Box(self.bufferAllocator))
self.socket = socket
self.selectableEventLoop = eventLoop
@ -398,7 +389,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.lifecycleManager = SocketChannelLifecycleManager(eventLoop: eventLoop, isActiveAtomic: self.isActiveAtomic)
// As the socket may already be connected we should ensure we start with the correct addresses cached.
self.addressesCached.store(Box((local: try? socket.localAddress(), remote: try? socket.remoteAddress())))
self.socketDescription = (try? socket.withUnsafeFileDescriptor { "socket fd: \($0)" }) ?? "[closed socket]"
self.socketDescription = socket.description
self._pipeline = ChannelPipeline(channel: self)
}
@ -884,7 +875,11 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}
final func readEOF() {
func writeEOF() {
fatalError("\(self) received writeEOF which is unexpected")
}
func readEOF() {
assert(!self.lifecycleManager.hasSeenEOFNotification)
self.lifecycleManager.hasSeenEOFNotification = true
@ -1111,7 +1106,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
assert(self.lifecycleManager.isRegisteredFully)
guard self.isOpen else {
assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) event though we're closed")
assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) even though we're closed")
return
}
if interested == interestedEvent {
@ -1177,4 +1172,16 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.registerForReadEOF()
self.readIfNeeded0()
}
func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
fatalError("must override")
}
func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
fatalError("must override")
}
func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
fatalError("must override")
}
}

View File

@ -0,0 +1,243 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket> {
internal var connectTimeoutScheduled: Scheduled<Void>?
private var allowRemoteHalfClosure: Bool = false
private var inputShutdown: Bool = false
private var outputShutdown: Bool = false
private let pendingWrites: PendingStreamWritesManager
override init(socket: Socket,
parent: Channel?,
eventLoop: SelectableEventLoop,
recvAllocator: RecvByteBufferAllocator) throws {
self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
try super.init(socket: socket, parent: parent, eventLoop: eventLoop, recvAllocator: recvAllocator)
}
deinit {
// We should never have any pending writes left as otherwise we may leak callbacks
assert(self.pendingWrites.isEmpty)
}
// MARK: BaseSocketChannel's must override API that might be further refined by subclasses
override func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
self.eventLoop.assertInEventLoop()
guard self.isOpen else {
throw ChannelError.ioOnClosedChannel
}
switch option {
case _ as AllowRemoteHalfClosureOption:
self.allowRemoteHalfClosure = value as! Bool
case _ as WriteSpinOption:
self.pendingWrites.writeSpinCount = value as! UInt
case _ as WriteBufferWaterMarkOption:
self.pendingWrites.waterMark = value as! WriteBufferWaterMark
default:
try super.setOption0(option, value: value)
}
}
override func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
self.eventLoop.assertInEventLoop()
guard self.isOpen else {
throw ChannelError.ioOnClosedChannel
}
switch option {
case _ as AllowRemoteHalfClosureOption:
return self.allowRemoteHalfClosure as! Option.Value
case _ as WriteSpinOption:
return self.pendingWrites.writeSpinCount as! Option.Value
case _ as WriteBufferWaterMarkOption:
return self.pendingWrites.waterMark as! Option.Value
default:
return try super.getOption0(option)
}
}
// MARK: BaseSocketChannel's must override API that cannot be further refined by subclasses
// This is `Channel` API so must be thread-safe.
final override public var isWritable: Bool {
return self.pendingWrites.isWritable
}
final override var isOpen: Bool {
self.eventLoop.assertInEventLoop()
assert(super.isOpen == self.pendingWrites.isOpen)
return super.isOpen
}
final override func readFromSocket() throws -> ReadResult {
self.eventLoop.assertInEventLoop()
// Just allocate one time for the while read loop. This is fine as ByteBuffer is a struct and uses COW.
var buffer = self.recvAllocator.buffer(allocator: allocator)
var result = ReadResult.none
for i in 1...self.maxMessagesPerRead {
guard self.isOpen && !self.inputShutdown else {
throw ChannelError.eof
}
// Reset reader and writerIndex and so allow to have the buffer filled again. This is better here than at
// the end of the loop to not do an allocation when the loop exits.
buffer.clear()
switch try buffer.withMutableWritePointer(body: self.socket.read(pointer:)) {
case .processed(let bytesRead):
if bytesRead > 0 {
let mayGrow = recvAllocator.record(actualReadBytes: bytesRead)
self.readPending = false
assert(self.isActive)
self.pipeline.fireChannelRead0(NIOAny(buffer))
result = .some
if buffer.writableBytes > 0 {
// If we did not fill the whole buffer with read(...) we should stop reading and wait until we get notified again.
// Otherwise chances are good that the next read(...) call will either read nothing or only a very small amount of data.
// Also this will allow us to call fireChannelReadComplete() which may give the user the chance to flush out all pending
// writes.
return result
} else if mayGrow && i < self.maxMessagesPerRead {
// if the ByteBuffer may grow on the next allocation due we used all the writable bytes we should allocate a new `ByteBuffer` to allow ramping up how much data
// we are able to read on the next read operation.
buffer = self.recvAllocator.buffer(allocator: allocator)
}
} else {
if self.inputShutdown {
// We received a EOF because we called shutdown on the fd by ourself, unregister from the Selector and return
self.readPending = false
self.unregisterForReadable()
return result
}
// end-of-file
throw ChannelError.eof
}
case .wouldBlock(let bytesRead):
assert(bytesRead == 0)
return result
}
}
return result
}
final override func writeToSocket() throws -> OverallWriteResult {
let result = try self.pendingWrites.triggerAppropriateWriteOperations(scalarBufferWriteOperation: { ptr in
guard ptr.count > 0 else {
// No need to call write if the buffer is empty.
return .processed(0)
}
// normal write
return try self.socket.write(pointer: ptr)
}, vectorBufferWriteOperation: { ptrs in
// Gathering write
try self.socket.writev(iovecs: ptrs)
}, scalarFileWriteOperation: { descriptor, index, endIndex in
try self.socket.sendFile(fd: descriptor, offset: index, count: endIndex - index)
})
if result.writable {
// writable again
self.pipeline.fireChannelWritabilityChanged0()
}
return result.writeResult
}
final override func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
do {
switch mode {
case .output:
if self.outputShutdown {
promise?.fail(ChannelError.outputClosed)
return
}
try self.socket.shutdown(how: .WR)
self.outputShutdown = true
// Fail all pending writes and so ensure all pending promises are notified
self.pendingWrites.failAll(error: error, close: false)
self.unregisterForWritable()
promise?.succeed(())
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
case .input:
if self.inputShutdown {
promise?.fail(ChannelError.inputClosed)
return
}
switch error {
case ChannelError.eof:
// No need to explicit call socket.shutdown(...) as we received an EOF and the call would only cause
// ENOTCON
break
default:
try socket.shutdown(how: .RD)
}
self.inputShutdown = true
self.unregisterForReadable()
promise?.succeed(())
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
case .all:
if let timeout = self.connectTimeoutScheduled {
self.connectTimeoutScheduled = nil
timeout.cancel()
}
super.close0(error: error, mode: mode, promise: promise)
}
} catch let err {
promise?.fail(err)
}
}
final override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
}
final override func cancelWritesOnClose(error: Error) {
self.pendingWrites.failAll(error: error, close: true)
}
@discardableResult
final override func readIfNeeded0() -> Bool {
if self.inputShutdown {
return false
}
return super.readIfNeeded0()
}
final override public func read0() {
if self.inputShutdown {
return
}
super.read0()
}
final override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
if self.outputShutdown {
promise?.fail(ChannelError.outputClosed)
return
}
let data = data.forceAsIOData()
if !self.pendingWrites.add(data: data, promise: promise) {
self.pipeline.fireChannelWritabilityChanged0()
}
}
}

View File

@ -475,7 +475,7 @@ public final class ClientBootstrap {
///
/// - parameters:
/// - descriptor: The _Unix file descriptor_ representing the connected stream socket.
/// - returns: an `EventLoopFuture<Channel>` to deliver the `Channel` immediately.
/// - returns: an `EventLoopFuture<Channel>` to deliver the `Channel`.
public func withConnectedSocket(descriptor: CInt) -> EventLoopFuture<Channel> {
let eventLoop = group.next()
let channelInitializer = self.channelInitializer ?? { _ in eventLoop.makeSucceededFuture(()) }
@ -705,3 +705,116 @@ public final class DatagramBootstrap {
}
}
}
/// A `NIOPipeBootstrap` is an easy way to bootstrap a `PipeChannel` which uses two (uni-directional) UNIX pipes
/// and makes a `Channel` out of them.
///
/// Example bootstrapping a `Channel` using `stdin` and `stdout`:
///
/// let channel = try NIOPipeBootstrap(group: group)
/// .channelInitializer { channel in
/// channel.pipeline.addHandler(MyChannelHandler())
/// }
/// .withPipes(inputDescriptor: STDIN_FILENO, outputDescriptor: STDOUT_FILENO)
///
public final class NIOPipeBootstrap {
private let group: EventLoopGroup
private var channelInitializer: ((Channel) -> EventLoopFuture<Void>)?
@usableFromInline
internal var _channelOptions = ChannelOptions.Storage()
/// Create a `NIOPipeBootstrap` on the `EventLoopGroup` `group`.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use.
public init(group: EventLoopGroup) {
self.group = group
}
/// Initialize the connected `PipeChannel` with `initializer`. The most common task in initializer is to add
/// `ChannelHandler`s to the `ChannelPipeline`.
///
/// The connected `Channel` will operate on `ByteBuffer` as inbound and outbound messages. Please note that
/// `IOData.fileRegion` is _not_ supported for `PipeChannel`s because `sendfile` only works on sockets.
///
/// - parameters:
/// - handler: A closure that initializes the provided `Channel`.
public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture<Void>) -> Self {
self.channelInitializer = handler
return self
}
/// Specifies a `ChannelOption` to be applied to the `PipeChannel`.
///
/// - parameters:
/// - option: The option to be applied.
/// - value: The value for the option.
@inlinable
public func channelOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> Self {
self._channelOptions.append(key: option, value: value)
return self
}
private func validateFileDescriptorIsNotAFile(_ descriptor: CInt) throws {
precondition(MultiThreadedEventLoopGroup.currentEventLoop == nil,
"limitation in SwiftNIO: cannot bootstrap PipeChannel on EventLoop")
var s: stat = .init()
try withUnsafeMutablePointer(to: &s) { ptr in
try Posix.fstat(descriptor: descriptor, outStat: ptr)
}
if (s.st_mode & S_IFREG) != 0 || (s.st_mode & S_IFDIR) != 0 {
throw ChannelError.operationUnsupported
}
}
/// Create the `PipeChannel` with the provided input and output file descriptors.
///
/// - parameters:
/// - inputDescriptor: The _Unix file descriptor_ for the input (ie. the read side).
/// - outputDescriptor: The _Unix file descriptor_ for the output (ie. the write side).
/// - returns: an `EventLoopFuture<Channel>` to deliver the `Channel`.
public func withPipes(inputDescriptor: CInt, outputDescriptor: CInt) -> EventLoopFuture<Channel> {
let eventLoop = group.next()
do {
try self.validateFileDescriptorIsNotAFile(inputDescriptor)
try self.validateFileDescriptorIsNotAFile(outputDescriptor)
} catch {
return eventLoop.makeFailedFuture(error)
}
let channelInitializer = self.channelInitializer ?? { _ in eventLoop.makeSucceededFuture(()) }
let channel: PipeChannel
do {
let inputFH = NIOFileHandle(descriptor: inputDescriptor)
let outputFH = NIOFileHandle(descriptor: outputDescriptor)
channel = try PipeChannel(eventLoop: eventLoop as! SelectableEventLoop,
inputPipe: inputFH,
outputPipe: outputFH)
} catch {
return eventLoop.makeFailedFuture(error)
}
func setupChannel() -> EventLoopFuture<Channel> {
eventLoop.assertInEventLoop()
// We need to hop to `eventLoop` as the user might have returned a future from a different `EventLoop`.
return channelInitializer(channel).hop(to: eventLoop).flatMap {
self._channelOptions.applyAllChannelOptions(to: channel)
}.flatMap {
let promise = eventLoop.makePromise(of: Void.self)
channel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult
}.map {
channel
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error)
}
}
if eventLoop.inEventLoop {
return setupChannel()
} else {
return eventLoop.submit{ setupChannel() }.flatMap { $0 }
}
}
}

View File

@ -151,8 +151,7 @@ internal protocol SelectableChannel: Channel {
/// `Selector`.
associatedtype SelectableType: Selectable
/// Returns the `Selectable` which usually contains the file descriptor for the socket.
var selectable: SelectableType { get }
var isOpen: Bool { get }
/// The event(s) of interest.
var interestedEvent: SelectorEventSet { get }
@ -166,15 +165,17 @@ internal protocol SelectableChannel: Channel {
/// Called when the read side of the `SelectableChannel` hit EOF.
func readEOF()
/// Called when the read side of the `SelectableChannel` hit EOF.
func writeEOF()
/// Called when the `SelectableChannel` was reset (ie. is now unusable)
func reset()
/// Creates a registration for the `interested` `SelectorEventSet` suitable for this `Channel`.
///
/// - parameters:
/// - interested: The event(s) of interest.
/// - returns: A suitable registration for the `SelectorEventSet` of interest.
func registrationFor(interested: SelectorEventSet) -> NIORegistration
func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws
func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws
func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws
}
/// Default implementations which will start on the head of the `ChannelPipeline`.

View File

@ -580,6 +580,7 @@ enum NIORegistration: Registration {
case serverSocketChannel(ServerSocketChannel, SelectorEventSet)
case socketChannel(SocketChannel, SelectorEventSet)
case datagramChannel(DatagramChannel, SelectorEventSet)
case pipeChannel(PipeChannel, PipeChannel.Direction, SelectorEventSet)
/// The `SelectorEventSet` in which this `NIORegistration` is interested in.
var interested: SelectorEventSet {
@ -591,6 +592,8 @@ enum NIORegistration: Registration {
self = .socketChannel(c, newValue)
case .datagramChannel(let c, _):
self = .datagramChannel(c, newValue)
case .pipeChannel(let c, let d, _):
self = .pipeChannel(c, d, newValue)
}
}
get {
@ -601,6 +604,8 @@ enum NIORegistration: Registration {
return i
case .datagramChannel(_, let i):
return i
case .pipeChannel(_, _, let i):
return i
}
}
}
@ -710,24 +715,26 @@ internal final class SelectableEventLoop: EventLoop {
throw EventLoopError.shutdown
}
try selector.register(selectable: channel.selectable, interested: channel.interestedEvent, makeRegistration: channel.registrationFor(interested:))
try channel.register(selector: self.selector, interested: channel.interestedEvent)
}
/// Deregister the given `SelectableChannel` from this `SelectableEventLoop`.
public func deregister<C: SelectableChannel>(channel: C) throws {
public func deregister<C: SelectableChannel>(channel: C, mode: CloseMode = .all) throws {
self.assertInEventLoop()
guard lifecycleState == .open else {
// It's possible the EventLoop was closed before we were able to call deregister, so just return in this case as there is no harm.
return
}
try selector.deregister(selectable: channel.selectable)
try channel.deregister(selector: self.selector, mode: mode)
}
/// Register the given `SelectableChannel` with this `SelectableEventLoop`. This should be done whenever `channel.interestedEvents` has changed and it should be taken into account when
/// waiting for new I/O for the given `SelectableChannel`.
public func reregister<C: SelectableChannel>(channel: C) throws {
self.assertInEventLoop()
try selector.reregister(selectable: channel.selectable, interested: channel.interestedEvent)
try channel.reregister(selector: self.selector, interested: channel.interestedEvent)
}
/// - see: `EventLoop.inEventLoop`
@ -789,8 +796,8 @@ internal final class SelectableEventLoop: EventLoop {
}
/// Handle the given `SelectorEventSet` for the `SelectableChannel`.
private func handleEvent<C: SelectableChannel>(_ ev: SelectorEventSet, channel: C) {
guard channel.selectable.isOpen else {
internal final func handleEvent<C: SelectableChannel>(_ ev: SelectorEventSet, channel: C) {
guard channel.isOpen else {
return
}
@ -798,10 +805,16 @@ internal final class SelectableEventLoop: EventLoop {
if ev.contains(.reset) {
channel.reset()
} else {
if ev.contains(.write) {
if ev.contains(.writeEOF) {
channel.writeEOF()
guard channel.isOpen else {
return
}
} else if ev.contains(.write) {
channel.writable()
guard channel.selectable.isOpen else {
guard channel.isOpen else {
return
}
}
@ -861,6 +874,15 @@ internal final class SelectableEventLoop: EventLoop {
self.handleEvent(ev.io, channel: chan)
case .datagramChannel(let chan, _):
self.handleEvent(ev.io, channel: chan)
case .pipeChannel(let chan, let direction, _):
var ev = ev
if ev.io.contains(.reset) {
// .reset needs special treatment here because we're dealing with two separate pipes instead
// of one socket. So we turn .reset input .readEOF/.writeEOF.
ev.io.subtract([.reset])
ev.io.formUnion([direction == .input ? .readEOF : .writeEOF])
}
self.handleEvent(ev.io, channel: chan)
}
}
}

View File

@ -31,3 +31,19 @@ public protocol FileDescriptor {
/// Close this `FileDescriptor`.
func close() throws
}
extension FileDescriptor {
internal static func setNonBlocking(fileDescriptor: CInt) throws {
let flags = try Posix.fcntl(descriptor: fileDescriptor, command: F_GETFL, value: 0)
do {
let ret = try Posix.fcntl(descriptor: fileDescriptor, command: F_SETFL, value: flags | O_NONBLOCK)
assert(ret == 0, "unexpectedly, fcntl(\(fileDescriptor), F_SETFL, \(flags) | O_NONBLOCK) returned \(ret)")
} catch let error as IOError {
if error.errnoCode == EINVAL {
// Darwin seems to sometimes do this despite the docs claiming it can't happen
throw NIOFailedToSetSocketNonBlockingError()
}
throw error
}
}
}

View File

@ -158,6 +158,6 @@ extension NIOFileHandle {
extension NIOFileHandle: CustomStringConvertible {
public var description: String {
return "FileHandle { descriptor: \(self.descriptor), isOpen: \(self.isOpen) }"
return "FileHandle { descriptor: \(self.descriptor) }"
}
}

View File

@ -0,0 +1,101 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
final class PipeChannel: BaseStreamSocketChannel<PipePair> {
private let pipePair: PipePair
internal enum Direction {
case input
case output
}
init(eventLoop: SelectableEventLoop,
inputPipe: NIOFileHandle,
outputPipe: NIOFileHandle) throws {
self.pipePair = try PipePair(inputFD: inputPipe, outputFD: outputPipe)
try super.init(socket: self.pipePair,
parent: nil,
eventLoop: eventLoop,
recvAllocator: AdaptiveRecvByteBufferAllocator())
}
func registrationForInput(interested: SelectorEventSet) -> NIORegistration {
return .pipeChannel(self, .input, interested)
}
func registrationForOutput(interested: SelectorEventSet) -> NIORegistration {
return .pipeChannel(self, .output, interested)
}
override func connectSocket(to address: SocketAddress) throws -> Bool {
throw ChannelError.operationUnsupported
}
override func finishConnectSocket() throws {
throw ChannelError.inappropriateOperationForState
}
override func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.register(selectable: self.pipePair.inputFD,
interested: interested.intersection([.read, .reset]),
makeRegistration: self.registrationForInput(interested:))
try selector.register(selectable: self.pipePair.outputFD,
interested: interested.intersection([.write, .reset]),
makeRegistration: self.registrationForOutput(interested:))
}
override func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
if (mode == .all || mode == .input) && self.pipePair.inputFD.isOpen {
try selector.deregister(selectable: self.pipePair.inputFD)
}
if (mode == .all || mode == .output) && self.pipePair.outputFD.isOpen {
try selector.deregister(selectable: self.pipePair.outputFD)
}
}
override func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
if self.pipePair.inputFD.isOpen {
try selector.reregister(selectable: self.pipePair.inputFD,
interested: interested.intersection([.read, .reset]))
}
if self.pipePair.outputFD.isOpen {
try selector.reregister(selectable: self.pipePair.outputFD,
interested: interested.intersection([.write, .reset]))
}
}
override func readEOF() {
super.readEOF()
guard self.pipePair.inputFD.isOpen else {
return
}
try! self.selectableEventLoop.deregister(channel: self, mode: .input)
try! self.pipePair.inputFD.close()
}
override func writeEOF() {
guard self.pipePair.outputFD.isOpen else {
return
}
try! self.selectableEventLoop.deregister(channel: self, mode: .output)
try! self.pipePair.outputFD.close()
}
}
extension PipeChannel: CustomStringConvertible {
var description: String {
return "PipeChannel { \(self.socketDescription), active = \(self.isActive), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
}
}

137
Sources/NIO/PipePair.swift Normal file
View File

@ -0,0 +1,137 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
extension NIOFileHandle: Selectable {
}
final class PipePair: SocketProtocol {
typealias SelectableType = NIOFileHandle
let inputFD: NIOFileHandle
let outputFD: NIOFileHandle
init(inputFD: NIOFileHandle, outputFD: NIOFileHandle) throws {
self.inputFD = inputFD
self.outputFD = outputFD
for fileHandle in [inputFD, outputFD] {
try fileHandle.withUnsafeFileDescriptor {
try NIOFileHandle.setNonBlocking(fileDescriptor: $0)
try ignoreSIGPIPE(descriptor: $0)
}
}
}
var description: String {
return "PipePair { in=\(inputFD), out=\(outputFD) }"
}
func connect(to address: SocketAddress) throws -> Bool {
throw ChannelError.operationUnsupported
}
func finishConnect() throws {
throw ChannelError.operationUnsupported
}
func write(pointer: UnsafeRawBufferPointer) throws -> IOResult<Int> {
return try self.outputFD.withUnsafeFileDescriptor { fd in
try Posix.write(descriptor: fd, pointer: pointer.baseAddress!, size: pointer.count)
}
}
func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int> {
return try self.outputFD.withUnsafeFileDescriptor { fd in
try Posix.writev(descriptor: fd, iovecs: iovecs)
}
}
func sendto(pointer: UnsafeRawBufferPointer, destinationPtr: UnsafePointer<sockaddr>, destinationSize: socklen_t) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func read(pointer: UnsafeMutableRawBufferPointer) throws -> IOResult<Int> {
return try self.inputFD.withUnsafeFileDescriptor { fd in
try Posix.read(descriptor: fd, pointer: pointer.baseAddress!, size: pointer.count)
}
}
func recvfrom(pointer: UnsafeMutableRawBufferPointer, storage: inout sockaddr_storage, storageLen: inout socklen_t) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func sendFile(fd: Int32, offset: Int, count: Int) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func recvmmsg(msgs: UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func sendmmsg(msgs: UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int> {
throw ChannelError.operationUnsupported
}
func shutdown(how: Shutdown) throws {
switch how {
case .RD:
try self.inputFD.close()
case .WR:
try self.outputFD.close()
case .RDWR:
try self.close()
}
}
var isOpen: Bool {
return self.inputFD.isOpen || self.outputFD.isOpen
}
func close() throws {
guard self.inputFD.isOpen || self.outputFD.isOpen else {
throw ChannelError.alreadyClosed
}
let r1 = Result {
if self.inputFD.isOpen {
try self.inputFD.close()
}
}
let r2 = Result {
if self.outputFD.isOpen {
try self.outputFD.close()
}
}
try r1.get()
try r2.get()
}
func bind(to address: SocketAddress) throws {
throw ChannelError.operationUnsupported
}
func localAddress() throws -> SocketAddress {
throw ChannelError.operationUnsupported
}
func remoteAddress() throws -> SocketAddress {
throw ChannelError.operationUnsupported
}
func setOption<T>(level: Int32, name: Int32, value: T) throws {
throw ChannelError.operationUnsupported
}
func getOption<T>(level: Int32, name: Int32) throws -> T {
throw ChannelError.operationUnsupported
}
}

View File

@ -69,6 +69,11 @@ struct SelectorEventSet: OptionSet, Equatable {
/// Interest in/availability of data to be written
static let write = SelectorEventSet(rawValue: 1 << 3)
/// EOF at the write/output end of a `Selectable`.
///
/// - note: This is rarely used because in many cases, there is no signal that this happened.
static let writeEOF = SelectorEventSet(rawValue: 1 << 4)
init(rawValue: SelectorEventSet.RawValue) {
self.rawValue = rawValue
}
@ -319,6 +324,7 @@ final class Selector<R: Registration> {
}
deinit {
assert(self.registrations.count == 0, "left-over registrations: \(self.registrations)")
assert(self.lifecycleState == .closed, "Selector \(self.lifecycleState) (expected .closed) on deinit")
Selector.deallocateEventsArray(events: events, capacity: eventsCapacity)
@ -644,7 +650,7 @@ extension Selector: CustomStringConvertible {
/// An event that is triggered once the `Selector` was able to select something.
struct SelectorEvent<R> {
public let registration: R
public let io: SelectorEventSet
public var io: SelectorEventSet
/// Create new instance
///
@ -681,6 +687,8 @@ extension Selector where R == NIORegistration {
return closeChannel(chan)
case .datagramChannel(let chan, _):
return closeChannel(chan)
case .pipeChannel(let chan, _, _):
return closeChannel(chan)
}
}.map { future in
future.flatMapErrorThrowing { error in

View File

@ -13,7 +13,9 @@
//===----------------------------------------------------------------------===//
/// A server socket that can accept new connections.
/* final but tests */ class ServerSocket: BaseSocket {
/* final but tests */ class ServerSocket: BaseSocket, ServerSocketProtocol {
typealias SocketType = ServerSocket
public final class func bootstrap(protocolFamily: Int32, host: String, port: Int) throws -> ServerSocket {
let socket = try ServerSocket(protocolFamily: protocolFamily)
try socket.bind(to: SocketAddress.makeAddressResolvingHost(host, port: port))
@ -29,7 +31,7 @@
/// - throws: An `IOError` if creation of the socket failed.
init(protocolFamily: Int32, setNonBlocking: Bool = false) throws {
let sock = try BaseSocket.makeSocket(protocolFamily: protocolFamily, type: Posix.SOCK_STREAM, setNonBlocking: setNonBlocking)
super.init(descriptor: sock)
try super.init(descriptor: sock)
}
/// Create a new instance.
@ -39,7 +41,7 @@
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - throws: An `IOError` if socket is invalid.
init(descriptor: CInt, setNonBlocking: Bool = false) throws {
super.init(descriptor: descriptor)
try super.init(descriptor: descriptor)
if setNonBlocking {
try self.setNonBlocking()
}
@ -86,7 +88,7 @@
guard let fd = result else {
return nil
}
let sock = Socket(descriptor: fd)
let sock = try Socket(descriptor: fd)
#if !os(Linux)
if setNonBlocking {
do {

View File

@ -16,7 +16,8 @@
typealias IOVector = iovec
// TODO: scattering support
/* final but tests */ class Socket: BaseSocket {
/* final but tests */ class Socket: BaseSocket, SocketProtocol {
typealias SocketType = Socket
/// The maximum number of bytes to write per `writev` call.
static var writevLimitBytes = Int(Int32.max)
@ -33,7 +34,7 @@ typealias IOVector = iovec
/// - throws: An `IOError` if creation of the socket failed.
init(protocolFamily: CInt, type: CInt, setNonBlocking: Bool = false) throws {
let sock = try BaseSocket.makeSocket(protocolFamily: protocolFamily, type: type, setNonBlocking: setNonBlocking)
super.init(descriptor: sock)
try super.init(descriptor: sock)
}
/// Create a new instance out of an already established socket.
@ -43,7 +44,7 @@ typealias IOVector = iovec
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - throws: An `IOError` if could not change the socket into non-blocking
init(descriptor: CInt, setNonBlocking: Bool) throws {
super.init(descriptor: descriptor)
try super.init(descriptor: descriptor)
if setNonBlocking {
try self.setNonBlocking()
}
@ -56,8 +57,8 @@ typealias IOVector = iovec
///
/// - parameters:
/// - descriptor: The file descriptor to wrap.
override init(descriptor: CInt) {
super.init(descriptor: descriptor)
override init(descriptor: CInt) throws {
try super.init(descriptor: descriptor)
}
/// Connect to the `SocketAddress`.

View File

@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
private extension ByteBuffer {
extension ByteBuffer {
mutating func withMutableWritePointer(body: (UnsafeMutableRawBufferPointer) throws -> IOResult<Int>) rethrows -> IOResult<Int> {
var singleResult: IOResult<Int>!
_ = try self.writeWithUnsafeMutableBytes { ptr in
@ -32,42 +32,21 @@ private extension ByteBuffer {
/// A `Channel` for a client socket.
///
/// - note: All operations on `SocketChannel` are thread-safe.
final class SocketChannel: BaseSocketChannel<Socket> {
final class SocketChannel: BaseStreamSocketChannel<Socket> {
private var connectTimeout: TimeAmount? = nil
private var connectTimeoutScheduled: Scheduled<Void>?
private var allowRemoteHalfClosure: Bool = false
private var inputShutdown: Bool = false
private var outputShutdown: Bool = false
private let pendingWrites: PendingStreamWritesManager
// This is `Channel` API so must be thread-safe.
override public var isWritable: Bool {
return pendingWrites.isWritable
}
override var isOpen: Bool {
self.eventLoop.assertInEventLoop()
assert(super.isOpen == self.pendingWrites.isOpen)
return super.isOpen
}
init(eventLoop: SelectableEventLoop, protocolFamily: Int32) throws {
let socket = try Socket(protocolFamily: protocolFamily, type: Posix.SOCK_STREAM, setNonBlocking: true)
self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
try super.init(socket: socket, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
try super.init(socket: socket, parent: nil, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
}
init(eventLoop: SelectableEventLoop, descriptor: CInt) throws {
let socket = try Socket(descriptor: descriptor, setNonBlocking: true)
self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
try super.init(socket: socket, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
try super.init(socket: socket, parent: nil, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
}
deinit {
// We should never have any pending writes left as otherwise we may leak callbacks
assert(pendingWrites.isEmpty)
init(socket: Socket, parent: Channel? = nil, eventLoop: SelectableEventLoop) throws {
try super.init(socket: socket, parent: parent, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
}
override func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
@ -80,12 +59,6 @@ final class SocketChannel: BaseSocketChannel<Socket> {
switch option {
case _ as ConnectTimeoutOption:
connectTimeout = value as? TimeAmount
case _ as AllowRemoteHalfClosureOption:
allowRemoteHalfClosure = value as! Bool
case _ as WriteSpinOption:
pendingWrites.writeSpinCount = value as! UInt
case _ as WriteBufferWaterMarkOption:
pendingWrites.waterMark = value as! WriteBufferWaterMark
default:
try super.setOption0(option, value: value)
}
@ -101,99 +74,15 @@ final class SocketChannel: BaseSocketChannel<Socket> {
switch option {
case _ as ConnectTimeoutOption:
return connectTimeout as! Option.Value
case _ as AllowRemoteHalfClosureOption:
return allowRemoteHalfClosure as! Option.Value
case _ as WriteSpinOption:
return pendingWrites.writeSpinCount as! Option.Value
case _ as WriteBufferWaterMarkOption:
return pendingWrites.waterMark as! Option.Value
default:
return try super.getOption0(option)
}
}
override func registrationFor(interested: SelectorEventSet) -> NIORegistration {
func registrationFor(interested: SelectorEventSet) -> NIORegistration {
return .socketChannel(self, interested)
}
init(socket: Socket, parent: Channel? = nil, eventLoop: SelectableEventLoop) throws {
self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
try super.init(socket: socket, parent: parent, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
}
override func readFromSocket() throws -> ReadResult {
self.eventLoop.assertInEventLoop()
// Just allocate one time for the while read loop. This is fine as ByteBuffer is a struct and uses COW.
var buffer = recvAllocator.buffer(allocator: allocator)
var result = ReadResult.none
for i in 1...maxMessagesPerRead {
guard self.isOpen && !self.inputShutdown else {
throw ChannelError.eof
}
// Reset reader and writerIndex and so allow to have the buffer filled again. This is better here than at
// the end of the loop to not do an allocation when the loop exits.
buffer.clear()
switch try buffer.withMutableWritePointer(body: self.socket.read(pointer:)) {
case .processed(let bytesRead):
if bytesRead > 0 {
let mayGrow = recvAllocator.record(actualReadBytes: bytesRead)
readPending = false
assert(self.isActive)
pipeline.fireChannelRead0(NIOAny(buffer))
result = .some
if buffer.writableBytes > 0 {
// If we did not fill the whole buffer with read(...) we should stop reading and wait until we get notified again.
// Otherwise chances are good that the next read(...) call will either read nothing or only a very small amount of data.
// Also this will allow us to call fireChannelReadComplete() which may give the user the chance to flush out all pending
// writes.
return result
} else if mayGrow && i < maxMessagesPerRead {
// if the ByteBuffer may grow on the next allocation due we used all the writable bytes we should allocate a new `ByteBuffer` to allow ramping up how much data
// we are able to read on the next read operation.
buffer = recvAllocator.buffer(allocator: allocator)
}
} else {
if inputShutdown {
// We received a EOF because we called shutdown on the fd by ourself, unregister from the Selector and return
readPending = false
unregisterForReadable()
return result
}
// end-of-file
throw ChannelError.eof
}
case .wouldBlock(let bytesRead):
assert(bytesRead == 0)
return result
}
}
return result
}
override func writeToSocket() throws -> OverallWriteResult {
let result = try self.pendingWrites.triggerAppropriateWriteOperations(scalarBufferWriteOperation: { ptr in
guard ptr.count > 0 else {
// No need to call write if the buffer is empty.
return .processed(0)
}
// normal write
return try self.socket.write(pointer: ptr)
}, vectorBufferWriteOperation: { ptrs in
// Gathering write
try self.socket.writev(iovecs: ptrs)
}, scalarFileWriteOperation: { descriptor, index, endIndex in
try self.socket.sendFile(fd: descriptor, offset: index, count: endIndex - index)
})
if result.writable {
// writable again
self.pipeline.fireChannelWritabilityChanged0()
}
return result.writeResult
}
override func connectSocket(to address: SocketAddress) throws -> Bool {
if try self.socket.connect(to: address) {
return true
@ -219,88 +108,18 @@ final class SocketChannel: BaseSocketChannel<Socket> {
try self.socket.finishConnect()
}
override func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
do {
switch mode {
case .output:
if outputShutdown {
promise?.fail(ChannelError.outputClosed)
return
}
try socket.shutdown(how: .WR)
outputShutdown = true
// Fail all pending writes and so ensure all pending promises are notified
pendingWrites.failAll(error: error, close: false)
unregisterForWritable()
promise?.succeed(())
pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
case .input:
if inputShutdown {
promise?.fail(ChannelError.inputClosed)
return
}
switch error {
case ChannelError.eof:
// No need to explicit call socket.shutdown(...) as we received an EOF and the call would only cause
// ENOTCON
break
default:
try socket.shutdown(how: .RD)
}
inputShutdown = true
unregisterForReadable()
promise?.succeed(())
pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
case .all:
if let timeout = connectTimeoutScheduled {
connectTimeoutScheduled = nil
timeout.cancel()
}
super.close0(error: error, mode: mode, promise: promise)
}
} catch let err {
promise?.fail(err)
}
override func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.register(selectable: self.socket, interested: interested, makeRegistration: self.registrationFor(interested:))
}
override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
override func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
assert(mode == .all)
try selector.deregister(selectable: self.socket)
}
override func cancelWritesOnClose(error: Error) {
self.pendingWrites.failAll(error: error, close: true)
}
@discardableResult override func readIfNeeded0() -> Bool {
if inputShutdown {
return false
}
return super.readIfNeeded0()
}
override public func read0() {
if inputShutdown {
return
}
super.read0()
}
override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
if outputShutdown {
promise?.fail(ChannelError.outputClosed)
return
}
let data = data.forceAsIOData()
if !self.pendingWrites.add(data: data, promise: promise) {
pipeline.fireChannelWritabilityChanged0()
}
override func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.reregister(selectable: self.socket, interested: interested)
}
}
@ -322,7 +141,10 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
init(serverSocket: ServerSocket, eventLoop: SelectableEventLoop, group: EventLoopGroup) throws {
self.group = group
try super.init(socket: serverSocket, eventLoop: eventLoop, recvAllocator: AdaptiveRecvByteBufferAllocator())
try super.init(socket: serverSocket,
parent: nil,
eventLoop: eventLoop,
recvAllocator: AdaptiveRecvByteBufferAllocator())
}
convenience init(descriptor: CInt, eventLoop: SelectableEventLoop, group: EventLoopGroup) throws {
@ -331,7 +153,7 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
try self.socket.listen(backlog: backlog)
}
override func registrationFor(interested: SelectorEventSet) -> NIORegistration {
func registrationFor(interested: SelectorEventSet) -> NIORegistration {
return .serverSocketChannel(self, interested)
}
@ -480,6 +302,19 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
override func flushNow() -> IONotificationState {
return IONotificationState.unregister
}
override func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.register(selectable: self.socket, interested: interested, makeRegistration: self.registrationFor(interested:))
}
override func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
assert(mode == .all)
try selector.deregister(selectable: self.socket)
}
override func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.reregister(selectable: self.socket, interested: interested)
}
}
/// A channel used with datagram sockets.
@ -513,7 +348,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}
convenience init(eventLoop: SelectableEventLoop, descriptor: CInt) throws {
let socket = Socket(descriptor: descriptor)
let socket = try Socket(descriptor: descriptor)
do {
try self.init(socket: socket, eventLoop: eventLoop)
@ -543,7 +378,10 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
addresses: eventLoop.addresses,
storageRefs: eventLoop.storageRefs)
try super.init(socket: socket, eventLoop: eventLoop, recvAllocator: FixedSizeRecvByteBufferAllocator(capacity: 2048))
try super.init(socket: socket,
parent: nil,
eventLoop: eventLoop,
recvAllocator: FixedSizeRecvByteBufferAllocator(capacity: 2048))
}
init(socket: Socket, parent: Channel? = nil, eventLoop: SelectableEventLoop) throws {
@ -600,7 +438,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}
}
override func registrationFor(interested: SelectorEventSet) -> NIORegistration {
func registrationFor(interested: SelectorEventSet) -> NIORegistration {
return .datagramChannel(self, interested)
}
@ -782,23 +620,36 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
promise?.fail(err)
}
}
override func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.register(selectable: self.socket, interested: interested, makeRegistration: self.registrationFor(interested:))
}
override func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws {
assert(mode == .all)
try selector.deregister(selectable: self.socket)
}
override func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
try selector.reregister(selectable: self.socket, interested: interested)
}
}
extension SocketChannel: CustomStringConvertible {
var description: String {
return "SocketChannel { \(self.socketDescription), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
return "SocketChannel { \(self.socketDescription), active = \(self.isActive), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
}
}
extension ServerSocketChannel: CustomStringConvertible {
var description: String {
return "ServerSocketChannel { \(self.socketDescription), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
return "ServerSocketChannel { \(self.socketDescription), active = \(self.isActive), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
}
}
extension DatagramChannel: CustomStringConvertible {
var description: String {
return "DatagramChannel { \(self.socketDescription), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
return "DatagramChannel { \(self.socketDescription), active = \(self.isActive), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
}
}

View File

@ -0,0 +1,90 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
protocol BaseSocketProtocol: CustomStringConvertible {
associatedtype SelectableType: Selectable
var isOpen: Bool { get }
func close() throws
func bind(to address: SocketAddress) throws
func localAddress() throws -> SocketAddress
func remoteAddress() throws -> SocketAddress
func setOption<T>(level: Int32, name: Int32, value: T) throws
func getOption<T>(level: Int32, name: Int32) throws -> T
}
protocol ServerSocketProtocol: BaseSocketProtocol {
func listen(backlog: Int32) throws
func accept(setNonBlocking: Bool) throws -> Socket?
}
protocol SocketProtocol: BaseSocketProtocol {
func connect(to address: SocketAddress) throws -> Bool
func finishConnect() throws
func write(pointer: UnsafeRawBufferPointer) throws -> IOResult<Int>
func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>
func sendto(pointer: UnsafeRawBufferPointer, destinationPtr: UnsafePointer<sockaddr>, destinationSize: socklen_t) throws -> IOResult<Int>
func read(pointer: UnsafeMutableRawBufferPointer) throws -> IOResult<Int>
func recvfrom(pointer: UnsafeMutableRawBufferPointer, storage: inout sockaddr_storage, storageLen: inout socklen_t) throws -> IOResult<Int>
func sendFile(fd: Int32, offset: Int, count: Int) throws -> IOResult<Int>
func recvmmsg(msgs: UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>
func sendmmsg(msgs: UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>
func shutdown(how: Shutdown) throws
}
#if os(Linux)
// This is a lazily initialised global variable that when read for the first time, will ignore SIGPIPE.
private let globallyIgnoredSIGPIPE: Bool = {
/* no F_SETNOSIGPIPE on Linux :( */
_ = Glibc.signal(SIGPIPE, SIG_IGN)
return true
}()
#endif
extension BaseSocketProtocol {
// used by `BaseSocket` and `PipePair`.
internal func ignoreSIGPIPE(descriptor fd: CInt) throws {
#if os(Linux)
let haveWeIgnoredSIGPIEThisIsHereToTriggerIgnoringIt = globallyIgnoredSIGPIPE
guard haveWeIgnoredSIGPIEThisIsHereToTriggerIgnoringIt else {
fatalError("BUG in NIO. We did not ignore SIGPIPE, this code path should definitely not be reachable.")
}
#else
assert(fd >= 0, "illegal file descriptor \(fd)")
do {
try Posix.fcntl(descriptor: fd, command: F_SETNOSIGPIPE, value: 1)
} catch {
_ = try? Posix.close(descriptor: fd) // don't care about failure here
throw error
}
#endif
}
}

View File

@ -87,9 +87,11 @@ private let sysAF_UNIX = AF_UNIX
private let sysInet_ntop: @convention(c) (CInt, UnsafeRawPointer?, UnsafeMutablePointer<CChar>?, socklen_t) -> UnsafePointer<CChar>? = inet_ntop
#if os(Linux)
private let sysFstat: @convention(c) (CInt, UnsafeMutablePointer<stat>) -> CInt = fstat
private let sysSendMmsg: @convention(c) (CInt, UnsafeMutablePointer<CNIOLinux_mmsghdr>?, CUnsignedInt, CInt) -> CInt = CNIOLinux_sendmmsg
private let sysRecvMmsg: @convention(c) (CInt, UnsafeMutablePointer<CNIOLinux_mmsghdr>?, CUnsignedInt, CInt, UnsafeMutablePointer<timespec>?) -> CInt = CNIOLinux_recvmmsg
#else
private let sysFstat: @convention(c) (CInt, UnsafeMutablePointer<stat>?) -> CInt = fstat
private let sysKevent = kevent
private let sysSendMmsg: @convention(c) (CInt, UnsafeMutablePointer<CNIODarwin_mmsghdr>?, CUnsignedInt, CInt) -> CInt = CNIODarwin_sendmmsg
private let sysRecvMmsg: @convention(c) (CInt, UnsafeMutablePointer<CNIODarwin_mmsghdr>?, CUnsignedInt, CInt, UnsafeMutablePointer<timespec>?) -> CInt = CNIODarwin_recvmmsg
@ -266,22 +268,7 @@ internal enum Posix {
@inline(never)
public static func socket(domain: CInt, type: CInt, `protocol`: CInt) throws -> CInt {
return try wrapSyscall {
let fd = sysSocket(domain, type, `protocol`)
#if os(Linux)
/* no SO_NOSIGPIPE on Linux :( */
_ = unsafeBitCast(Glibc.signal(SIGPIPE, SIG_IGN) as sighandler_t?, to: Int.self)
#else
if fd != -1 {
do {
try Posix.fcntl(descriptor: fd, command: F_SETNOSIGPIPE, value: 1)
} catch {
_ = sysClose(fd) // don't care about failure here
throw error
}
}
#endif
return fd
return sysSocket(domain, type, `protocol`)
}
}
@ -503,6 +490,13 @@ internal enum Posix {
sysPoll(fds, nfds, timeout)
}
}
@inline(never)
public static func fstat(descriptor: CInt, outStat: UnsafeMutablePointer<stat>) throws {
_ = try wrapSyscall {
sysFstat(descriptor, outStat)
}
}
}
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)

View File

@ -485,6 +485,7 @@ let defaultHtdocs = "/dev/null/"
enum BindTo {
case ip(host: String, port: Int)
case unixDomainSocket(path: String)
case stdio
}
let htdocs: String
@ -500,8 +501,12 @@ case (_, .some(let p), let maybeHtdocs, _, _):
bindTarget = .ip(host: defaultHost, port: p)
htdocs = maybeHtdocs ?? defaultHtdocs
case (.some(let portString), .none, let maybeHtdocs, .none, .none):
/* couldn't parse as number --> uds-path [htdocs] */
bindTarget = .unixDomainSocket(path: portString)
/* couldn't parse as number --> uds-path-or-stdio [htdocs] */
if portString == "-" {
bindTarget = .stdio
} else {
bindTarget = .unixDomainSocket(path: portString)
}
htdocs = maybeHtdocs ?? defaultHtdocs
default:
htdocs = defaultHtdocs
@ -512,23 +517,31 @@ let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let threadPool = NIOThreadPool(numberOfThreads: 6)
threadPool.start()
func childChannelInitializer(channel: Channel) -> EventLoopFuture<Void> {
return channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
channel.pipeline.addHandler(HTTPHandler(fileIO: fileIO, htdocsPath: htdocs))
}
}
let fileIO = NonBlockingFileIO(threadPool: threadPool)
let bootstrap = ServerBootstrap(group: group)
let socketBootstrap = ServerBootstrap(group: group)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
// Set the handlers that are applied to the accepted Channels
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
channel.pipeline.addHandler(HTTPHandler(fileIO: fileIO, htdocsPath: htdocs))
}
}
.childChannelInitializer(childChannelInitializer(channel:))
// Enable SO_REUSEADDR for the accepted Channels
.childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: allowHalfClosure)
let pipeBootstrap = NIOPipeBootstrap(group: group)
// Set the handlers that are applied to the accepted Channels
.channelInitializer(childChannelInitializer(channel:))
.channelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: allowHalfClosure)
defer {
try! group.syncShutdownGracefully()
@ -540,14 +553,23 @@ print("htdocs = \(htdocs)")
let channel = try { () -> Channel in
switch bindTarget {
case .ip(let host, let port):
return try bootstrap.bind(host: host, port: port).wait()
return try socketBootstrap.bind(host: host, port: port).wait()
case .unixDomainSocket(let path):
return try bootstrap.bind(unixDomainSocketPath: path).wait()
print("hi \(path)")
return try socketBootstrap.bind(unixDomainSocketPath: path).wait()
case .stdio:
return try pipeBootstrap.withPipes(inputDescriptor: STDIN_FILENO, outputDescriptor: STDOUT_FILENO).wait()
}
}()
guard let localAddress = channel.localAddress else {
fatalError("Address was unable to bind. Please check that the socket was not closed or that the address family was understood.")
let localAddress: String
if case .stdio = bindTarget {
localAddress = "STDIO"
} else {
guard let channelLocalAddress = channel.localAddress else {
fatalError("Address was unable to bind. Please check that the socket was not closed or that the address family was understood.")
}
localAddress = "\(channelLocalAddress)"
}
print("Server started and listening on \(localAddress), htdocs path \(htdocs)")

View File

@ -84,12 +84,14 @@ import XCTest
testCase(NIOHTTP1TestServerTest.allTests),
testCase(NonBlockingFileIOTest.allTests),
testCase(PendingDatagramWritesManagerTests.allTests),
testCase(PipeChannelTest.allTests),
testCase(PriorityQueueTest.allTests),
testCase(SNIHandlerTest.allTests),
testCase(SelectorTest.allTests),
testCase(SocketAddressTest.allTests),
testCase(SocketChannelTest.allTests),
testCase(SocketOptionProviderTest.allTests),
testCase(StreamChannelTest.allTests),
testCase(SystemTest.allTests),
testCase(ThreadTest.allTests),
testCase(TypeAssistedChannelHandlerTest.allTests),

View File

@ -1933,7 +1933,7 @@ public final class ChannelTests: XCTestCase {
// In here what we're doing is that we flip the order around and connect it first, make sure the server
// has written something and then on registration something is available to be read. We then 'fake connect'
// again which our special `Socket` subclass will let succeed.
_ = try sc.selectable.connect(to: bootstrap.localAddress!)
_ = try sc.socket.connect(to: bootstrap.localAddress!)
try serverWriteHappenedPromise.futureResult.wait()
try sc.pipeline.addHandler(ReadDoesNotHappen(hasRegisteredPromise: clientHasRegistered,
hasUnregisteredPromise: clientHasUnregistered,

View File

@ -34,8 +34,7 @@ class NIOAnyDebugTest: XCTestCase {
FileRegion { \
handle: \
FileHandle \
{ descriptor: 1, \
isOpen: \(fileHandle.isOpen) \
{ descriptor: 1 \
}, \
readerIndex: \(fileRegion.readerIndex), \
endIndex: \(fileRegion.endIndex) }

View File

@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// PipeChannelTest+XCTest.swift
//
import XCTest
///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension PipeChannelTest {
static var allTests : [(String, (PipeChannelTest) -> () throws -> Void)] {
return [
("testBasicIO", testBasicIO),
("testWriteErrorsCloseChannel", testWriteErrorsCloseChannel),
("testWeDontAcceptRegularFiles", testWeDontAcceptRegularFiles),
]
}
}

View File

@ -0,0 +1,164 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import XCTest
import Foundation
@testable import NIO
import NIOTestUtils
final class PipeChannelTest: XCTestCase {
var group: MultiThreadedEventLoopGroup! = nil
var channel: Channel! = nil
var toChannel: FileHandle! = nil
var fromChannel: FileHandle! = nil
var buffer: ByteBuffer! = nil
var eventLoop: SelectableEventLoop {
return self.group.next() as! SelectableEventLoop
}
override func setUp() {
self.group = .init(numberOfThreads: 1)
XCTAssertNoThrow(try withPipe { pipe1Read, pipe1Write in
try withPipe { pipe2Read, pipe2Write in
self.toChannel = try pipe1Write.withUnsafeFileDescriptor { fd in
FileHandle(fileDescriptor: fd, closeOnDealloc: false)
}
self.fromChannel = try pipe2Read.withUnsafeFileDescriptor { fd in
FileHandle(fileDescriptor: fd, closeOnDealloc: false)
}
try pipe1Read.withUnsafeFileDescriptor { channelIn in
try pipe2Write.withUnsafeFileDescriptor { channelOut in
let channel = NIOPipeBootstrap(group: self.group)
.withPipes(inputDescriptor: channelIn,
outputDescriptor: channelOut)
XCTAssertNoThrow(self.channel = try channel.wait())
}
}
for pipe in [pipe1Read, pipe1Write, pipe2Read, pipe2Write] {
XCTAssertNoThrow(try pipe.takeDescriptorOwnership())
}
return [] // we may leak the file handles because we take care of closing
}
return [] // we may leak the file handles because we take care of closing
})
self.buffer = self.channel.allocator.buffer(capacity: 128)
}
override func tearDown() {
self.buffer = nil
self.toChannel.closeFile()
self.fromChannel.closeFile()
self.toChannel = nil
self.fromChannel = nil
XCTAssertNoThrow(try self.channel.syncCloseAcceptingAlreadyClosed())
XCTAssertNoThrow(try self.group.syncShutdownGracefully())
}
func testBasicIO() throws {
class Handler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
context.writeAndFlush(data).whenFailure { error in
XCTFail("unexpected error: \(error)")
}
}
}
XCTAssertTrue(self.channel.isActive)
XCTAssertNoThrow(try self.channel.pipeline.addHandler(Handler()).wait())
let longArray = Array(repeating: UInt8(ascii: "x"), count: 200_000)
for length in [1, 10_000, 100_000, 200_000] {
let fromChannel = self.fromChannel!
XCTAssertNoThrow(try self.toChannel.writeBytes(longArray[0 ..< length]))
let data = try? fromChannel.readBytes(ofExactLength: length)
XCTAssertEqual(Array(longArray[0 ..< length]), data)
}
XCTAssertNoThrow(try self.channel.close().wait())
}
func testWriteErrorsCloseChannel() {
XCTAssertNoThrow(try self.channel.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
self.fromChannel.closeFile()
var buffer = self.channel.allocator.buffer(capacity: 1)
buffer.writeString("X")
XCTAssertThrowsError(try self.channel.writeAndFlush(buffer).wait()) { error in
if let error = error as? IOError {
XCTAssert([EPIPE, EBADF].contains(error.errnoCode), "unexpected errno: \(error)")
} else {
XCTFail("unexpected error: \(error)")
}
}
XCTAssertNoThrow(try self.channel.closeFuture.wait())
}
func testWeDontAcceptRegularFiles() throws {
try withPipe { pipeIn, pipeOut in
try withTemporaryFile { fileFH, path in
try fileFH.withUnsafeFileDescriptor { fileFHDescriptor in
try pipeIn.withUnsafeFileDescriptor { pipeInDescriptor in
try pipeOut.withUnsafeFileDescriptor { pipeOutDescriptor in
XCTAssertThrowsError(try NIOPipeBootstrap(group: self.group)
.withPipes(inputDescriptor: fileFHDescriptor,
outputDescriptor: pipeOutDescriptor).wait()) { error in
XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError)
}
XCTAssertThrowsError(try NIOPipeBootstrap(group: self.group)
.withPipes(inputDescriptor: pipeInDescriptor,
outputDescriptor: fileFHDescriptor).wait()) { error in
XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError)
}
XCTAssertThrowsError(try NIOPipeBootstrap(group: self.group)
.withPipes(inputDescriptor: fileFHDescriptor,
outputDescriptor: fileFHDescriptor).wait()) { error in
XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError)
}
}
}
}
}
return [pipeIn, pipeOut]
}
}
}
extension FileHandle {
func writeBytes(_ bytes: ByteBuffer) throws {
try self.writeBytes(Array(bytes.readableBytesView))
}
func writeBytes(_ bytes: ArraySlice<UInt8>) throws {
bytes.withUnsafeBytes {
self.write(Data(bytesNoCopy: .init(mutating: $0.baseAddress!), count: $0.count, deallocator: .none))
}
}
func writeBytes(_ bytes: [UInt8]) throws {
try self.writeBytes(bytes[...])
}
func readBytes(ofExactLength completeLength: Int) throws -> [UInt8] {
var buffer: [UInt8] = []
buffer.reserveCapacity(completeLength)
var remaining = completeLength
while remaining > 0 {
buffer.append(contentsOf: self.readData(ofLength: remaining))
remaining = completeLength - buffer.count
}
return buffer
}
}

View File

@ -376,9 +376,9 @@ class SelectorTest: XCTestCase {
}
class FakeSocket: Socket {
private let hasBeenClosedPromise: EventLoopPromise<Void>
init(hasBeenClosedPromise: EventLoopPromise<Void>, descriptor: CInt) {
init(hasBeenClosedPromise: EventLoopPromise<Void>, descriptor: CInt) throws {
self.hasBeenClosedPromise = hasBeenClosedPromise
super.init(descriptor: descriptor)
try super.init(descriptor: descriptor)
}
override func close() throws {
self.hasBeenClosedPromise.succeed(())

View File

@ -0,0 +1,40 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// StreamChannelsTest+XCTest.swift
//
import XCTest
///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension StreamChannelTest {
static var allTests : [(String, (StreamChannelTest) -> () throws -> Void)] {
return [
("testEchoBasic", testEchoBasic),
("testWritabilityStartsTrueGoesFalseAndBackToTrue", testWritabilityStartsTrueGoesFalseAndBackToTrue),
("testHalfCloseOwnOutput", testHalfCloseOwnOutput),
("testHalfCloseOwnInput", testHalfCloseOwnInput),
("testDoubleShutdownInput", testDoubleShutdownInput),
("testDoubleShutdownOutput", testDoubleShutdownOutput),
("testWriteFailsAfterOutputClosed", testWriteFailsAfterOutputClosed),
("testVectorWrites", testVectorWrites),
]
}
}

View File

@ -0,0 +1,278 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import XCTest
@testable import NIO
class StreamChannelTest: XCTestCase {
var buffer: ByteBuffer! = nil
override func setUp() {
self.buffer = ByteBufferAllocator().buffer(capacity: 128)
}
override func tearDown() {
self.buffer = nil
}
func testEchoBasic() throws {
class EchoHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
context.write(data, promise: nil)
}
func channelReadComplete(context: ChannelHandlerContext) {
context.flush()
}
}
func runTest(chan1: Channel, chan2: Channel) throws {
var everythingBuffer = chan1.allocator.buffer(capacity: 300000)
let allDonePromise = chan1.eventLoop.makePromise(of: ByteBuffer.self)
XCTAssertNoThrow(try chan1.pipeline.addHandler(EchoHandler()).wait())
XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait())
for f in [1, 10, 100, 1_000, 10_000, 300_000] {
let from = everythingBuffer.writerIndex
everythingBuffer.writeString("\(f)")
everythingBuffer.writeBytes(repeatElement(UInt8(ascii: "x"), count: f))
XCTAssertNoThrow(chan2.writeAndFlush(everythingBuffer.getSlice(at: from,
length: everythingBuffer.writerIndex - from)!))
}
let from = everythingBuffer.writerIndex
everythingBuffer.writeString("$") // magic end marker that will cause the channel to close
XCTAssertNoThrow(chan2.writeAndFlush(everythingBuffer.getSlice(at: from, length: 1)!))
XCTAssertNoThrow(XCTAssertEqual(everythingBuffer, try allDonePromise.futureResult.wait()))
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testWritabilityStartsTrueGoesFalseAndBackToTrue() throws {
class WritabilityTrackerStateMachine: ChannelInboundHandler {
typealias InboundIn = Never
typealias OutboundOut = ByteBuffer
enum State: Int {
case beginsTrue = 0
case thenFalse = 1
case thenTrueAgain = 2
}
var channelWritabilityChangedCalls = 0
var state = State.beginsTrue
let writabilityNowFalsePromise: EventLoopPromise<Void>
let writeFullyDonePromise: EventLoopPromise<Void>
init(writabilityNowFalsePromise: EventLoopPromise<Void>,
writeFullyDonePromise: EventLoopPromise<Void>) {
self.writabilityNowFalsePromise = writabilityNowFalsePromise
self.writeFullyDonePromise = writeFullyDonePromise
}
func handlerAdded(context: ChannelHandlerContext) {
// 5 MB, this must be safely more than send buffer + receive buffer. The reason is that we don't want
// the overall write to complete before we make the other end of the channel readable.
let totalAmount = 5 * 1024 * 1024
let chunkSize = 10 * 1024
XCTAssertEqual(.beginsTrue, self.state)
self.state = .thenFalse
XCTAssertEqual(true, context.channel.isWritable)
var buffer = context.channel.allocator.buffer(capacity: chunkSize)
buffer.writeBytes(repeatElement(UInt8(ascii: "x"), count: chunkSize))
for _ in 0 ..< (totalAmount / chunkSize) {
context.write(self.wrapOutboundOut(buffer)).whenFailure { error in
XCTFail("unexpected error \(error)")
}
}
context.write(self.wrapOutboundOut(buffer)).map {
XCTAssertEqual(self.state, .thenTrueAgain)
}.recover { error in
XCTFail("unexpected error \(error)")
}.cascade(to: self.writeFullyDonePromise)
context.flush()
}
func channelWritabilityChanged(context: ChannelHandlerContext) {
self.channelWritabilityChangedCalls += 1
XCTAssertEqual(self.state.rawValue % 2 == 0, context.channel.isWritable)
XCTAssertEqual(State(rawValue: self.channelWritabilityChangedCalls), self.state)
if let newState = State(rawValue: self.channelWritabilityChangedCalls + 1) {
if self.state == .thenFalse {
context.eventLoop.scheduleTask(in: .microseconds(100)) {
// Let's delay this a tiny little bit just so we get a higher chance to actually exhaust all
// the buffers. The delay is not necessary for this test to work but it makes the tests a
// little bit harder.
self.writabilityNowFalsePromise.succeed(())
}
}
self.state = newState
}
}
}
func runTest(chan1: Channel, chan2: Channel) throws {
let allDonePromise = chan1.eventLoop.makePromise(of: ByteBuffer.self)
let writabilityFalsePromise = chan1.eventLoop.makePromise(of: Void.self)
let writeFullyDonePromise = chan1.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(try chan2.setOption(ChannelOptions.autoRead, value: false).wait())
XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait())
XCTAssertNoThrow(try chan1.pipeline.addHandler(WritabilityTrackerStateMachine(writabilityNowFalsePromise: writabilityFalsePromise,
writeFullyDonePromise: writeFullyDonePromise)).wait())
// Writability should turn false because we're writing lots of data and we aren't reading.
XCTAssertNoThrow(try writabilityFalsePromise.futureResult.wait())
// Ok, let's read.
XCTAssertNoThrow(try chan2.setOption(ChannelOptions.autoRead, value: true).wait())
// Which should lead to the write to complete.
XCTAssertNoThrow(try writeFullyDonePromise.futureResult.wait())
// To finish up, let's just tear this down.
XCTAssertNoThrow(try chan2.close().wait())
XCTAssertNoThrow(try chan1.closeFuture.wait())
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testHalfCloseOwnOutput() throws {
func runTest(chan1: Channel, chan2: Channel) throws {
let readPromise = chan2.eventLoop.makePromise(of: Void.self)
let eofPromise = chan1.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(try chan1.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
XCTAssertNoThrow(try chan1.pipeline.addHandler(FulfillOnFirstEventHandler(userInboundEventTriggeredPromise: eofPromise)).wait())
// let's close chan2's output
XCTAssertNoThrow(try chan2.close(mode: .output).wait())
XCTAssertNoThrow(try eofPromise.futureResult.wait())
self.buffer.writeString("X")
XCTAssertNoThrow(try chan2.pipeline.addHandler(FulfillOnFirstEventHandler(channelReadPromise: readPromise)).wait())
// let's write a byte from chan1 to chan2.
XCTAssertNoThrow(try chan1.writeAndFlush(self.buffer).wait(), "write on \(chan1) failed")
// and wait for it to arrive
XCTAssertNoThrow(try readPromise.futureResult.wait())
XCTAssertNoThrow(try chan1.syncCloseAcceptingAlreadyClosed())
XCTAssertNoThrow(try chan2.syncCloseAcceptingAlreadyClosed())
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testHalfCloseOwnInput() {
func runTest(chan1: Channel, chan2: Channel) throws {
let readPromise = chan1.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(try chan2.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
// let's close chan2's input
XCTAssertNoThrow(try chan2.close(mode: .input).wait())
self.buffer.writeString("X")
XCTAssertNoThrow(try chan1.pipeline.addHandler(FulfillOnFirstEventHandler(channelReadPromise: readPromise)).wait())
// let's write a byte from chan2 to chan1.
XCTAssertNoThrow(try chan2.writeAndFlush(self.buffer).wait())
// and wait for it to arrive
XCTAssertNoThrow(try readPromise.futureResult.wait())
XCTAssertNoThrow(try chan1.syncCloseAcceptingAlreadyClosed())
XCTAssertNoThrow(try chan2.syncCloseAcceptingAlreadyClosed())
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testDoubleShutdownInput() {
func runTest(chan1: Channel, chan2: Channel) throws {
XCTAssertNoThrow(try chan1.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
XCTAssertNoThrow(try chan1.close(mode: .input).wait())
XCTAssertThrowsError(try chan1.close(mode: .input).wait()) { error in
XCTAssertEqual(ChannelError.inputClosed, error as? ChannelError, "\(chan1)")
}
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testDoubleShutdownOutput() {
func runTest(chan1: Channel, chan2: Channel) throws {
XCTAssertNoThrow(try chan2.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
XCTAssertNoThrow(try chan1.close(mode: .output).wait())
XCTAssertThrowsError(try chan1.close(mode: .output).wait()) { error in
XCTAssertEqual(ChannelError.outputClosed, error as? ChannelError, "\(chan1)")
}
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testWriteFailsAfterOutputClosed() {
func runTest(chan1: Channel, chan2: Channel) throws {
XCTAssertNoThrow(try chan2.setOption(ChannelOptions.allowRemoteHalfClosure, value: true).wait())
XCTAssertNoThrow(try chan1.close(mode: .output).wait())
var buffer = chan1.allocator.buffer(capacity: 10)
buffer.writeString("helloworld")
XCTAssertThrowsError(try chan1.writeAndFlush(buffer).wait()) { error in
XCTAssertEqual(ChannelError.outputClosed, error as? ChannelError, "\(chan1)")
}
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
func testVectorWrites() {
func runTest(chan1: Channel, chan2: Channel) throws {
let readPromise = chan2.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(chan2.pipeline.addHandler(FulfillOnFirstEventHandler(channelReadPromise: readPromise)))
var buffer = chan1.allocator.buffer(capacity: 1)
buffer.writeString("X")
for _ in 0..<100 {
chan1.write(buffer, promise: nil)
}
chan1.flush()
XCTAssertNoThrow(try readPromise.futureResult.wait())
}
XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(runTest))
}
}
final class AccumulateAllReads: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
var accumulator: ByteBuffer!
let allDonePromise: EventLoopPromise<ByteBuffer>
init(allDonePromise: EventLoopPromise<ByteBuffer>) {
self.allDonePromise = allDonePromise
}
func handlerAdded(context: ChannelHandlerContext) {
self.accumulator = context.channel.allocator.buffer(capacity: 1024)
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var buffer = self.unwrapInboundIn(data)
let closeAfter = buffer.readableBytesView.last == UInt8(ascii: "$")
self.accumulator.writeBuffer(&buffer)
if closeAfter {
context.close(promise: nil)
}
}
func channelInactive(context: ChannelHandlerContext) {
self.allDonePromise.succeed(self.accumulator)
self.accumulator = nil
}
}

View File

@ -16,17 +16,26 @@ import XCTest
@testable import NIO
import NIOConcurrencyHelpers
func withPipe(_ body: (NIO.NIOFileHandle, NIO.NIOFileHandle) -> [NIO.NIOFileHandle]) throws {
func withPipe(_ body: (NIO.NIOFileHandle, NIO.NIOFileHandle) throws -> [NIO.NIOFileHandle]) throws {
var fds: [Int32] = [-1, -1]
fds.withUnsafeMutableBufferPointer { ptr in
XCTAssertEqual(0, pipe(ptr.baseAddress!))
}
let readFH = NIOFileHandle(descriptor: fds[0])
let writeFH = NIOFileHandle(descriptor: fds[1])
let toClose = body(readFH, writeFH)
var toClose: [NIOFileHandle] = [readFH, writeFH]
var error: Error? = nil
do {
toClose = try body(readFH, writeFH)
} catch let err {
error = err
}
try toClose.forEach { fh in
XCTAssertNoThrow(try fh.close())
}
if let error = error {
throw error
}
}
func withTemporaryDirectory<T>(_ body: (String) throws -> T) rethrows -> T {
@ -474,7 +483,45 @@ func forEachActiveChannelType<T>(file: StaticString = #file,
}
let channelEL = group.next()
// TCP
let lock = Lock()
var ret: [T] = []
_ = try forEachCrossConnectedStreamChannelPair(file: file, line: line) { (chan1: Channel, chan2: Channel) throws -> Void in
var innerRet: [T] = [try body(chan1)]
if let parent = chan1.parent {
innerRet.append(try body(parent))
}
lock.withLock {
ret.append(contentsOf: innerRet)
}
}
// UDP
let udpChannel = DatagramBootstrap(group: channelEL)
.channelInitializer { channel in
XCTAssert(channel.eventLoop.inEventLoop)
return channelEL.makeSucceededFuture(())
}
.bind(host: "127.0.0.1", port: 0)
defer {
XCTAssertNoThrow(try udpChannel.wait().syncCloseAcceptingAlreadyClosed())
}
return try lock.withLock {
ret.append(try body(udpChannel.wait()))
return ret
}
}
func withCrossConnectedSockAddrChannels<R>(bindTarget: SocketAddress,
file: StaticString = #file,
line: UInt = #line,
_ body: (Channel, Channel) throws -> R) throws -> R {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let channelEL = group.next()
let tcpAcceptedChannel = channelEL.makePromise(of: Channel.self)
let tcpServerChannel = try assertNoThrowWithValue(ServerBootstrap(group: channelEL)
.childChannelInitializer { channel in
@ -484,7 +531,7 @@ func forEachActiveChannelType<T>(file: StaticString = #file,
}.cascade(to: tcpAcceptedChannel)
return channel.pipeline.addHandler(FulfillOnFirstEventHandler(channelActivePromise: accepted))
}
.bind(host: "127.0.0.1", port: 0)
.bind(to: bindTarget)
.wait(), file: file, line: line)
defer {
XCTAssertNoThrow(try tcpServerChannel.syncCloseAcceptingAlreadyClosed())
@ -501,21 +548,73 @@ func forEachActiveChannelType<T>(file: StaticString = #file,
XCTAssertNoThrow(try tcpClientChannel.syncCloseAcceptingAlreadyClosed())
}
// UDP
let udpChannel = DatagramBootstrap(group: channelEL)
.channelInitializer { channel in
XCTAssert(channel.eventLoop.inEventLoop)
return channelEL.makeSucceededFuture(())
}
.bind(host: "127.0.0.1", port: 0)
defer {
XCTAssertNoThrow(try udpChannel.wait().syncCloseAcceptingAlreadyClosed())
}
return try body(try tcpAcceptedChannel.futureResult.wait(), tcpClientChannel)
}
return try [tcpServerChannel,
tcpAcceptedChannel.futureResult.wait(),
tcpClientChannel,
udpChannel.wait()].map {
try body($0)
func withCrossConnectedTCPChannels<R>(file: StaticString = #file,
line: UInt = #line,
_ body: (Channel, Channel) throws -> R) throws -> R {
return try withCrossConnectedSockAddrChannels(bindTarget: .init(ipAddress: "127.0.0.1", port: 0), body)
}
func withCrossConnectedUnixDomainSocketChannels<R>(file: StaticString = #file,
line: UInt = #line,
_ body: (Channel, Channel) throws -> R) throws -> R {
return try withTemporaryDirectory { tempDir in
let bindTarget = try SocketAddress(unixDomainSocketPath: tempDir + "/server.sock")
return try withCrossConnectedSockAddrChannels(bindTarget: bindTarget, body)
}
}
func withCrossConnectedPipeChannels<R>(file: StaticString = #file,
line: UInt = #line,
_ body: (Channel, Channel) throws -> R) throws -> R {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully(), file: file, line: line)
}
var result: R? = nil
XCTAssertNoThrow(try withPipe { pipe1Read, pipe1Write -> [NIOFileHandle] in
try withPipe { pipe2Read, pipe2Write -> [NIOFileHandle] in
try pipe1Read.withUnsafeFileDescriptor { pipe1Read in
try pipe1Write.withUnsafeFileDescriptor { pipe1Write in
try pipe2Read.withUnsafeFileDescriptor { pipe2Read in
try pipe2Write.withUnsafeFileDescriptor { pipe2Write in
let channel1 = try NIOPipeBootstrap(group: group)
.withPipes(inputDescriptor: pipe1Read, outputDescriptor: pipe2Write)
.wait()
defer {
XCTAssertNoThrow(try channel1.syncCloseAcceptingAlreadyClosed())
}
let channel2 = try NIOPipeBootstrap(group: group)
.withPipes(inputDescriptor: pipe2Read, outputDescriptor: pipe1Write)
.wait()
defer {
XCTAssertNoThrow(try channel2.syncCloseAcceptingAlreadyClosed())
}
result = try body(channel1, channel2)
}
}
}
}
XCTAssertNoThrow(try pipe1Read.takeDescriptorOwnership(), file: file, line: line)
XCTAssertNoThrow(try pipe1Write.takeDescriptorOwnership(), file: file, line: line)
XCTAssertNoThrow(try pipe2Read.takeDescriptorOwnership(), file: file, line: line)
XCTAssertNoThrow(try pipe2Write.takeDescriptorOwnership(), file: file, line: line)
return []
}
return [] // the channels are closing the pipes
}, file: file, line: line)
return result!
}
func forEachCrossConnectedStreamChannelPair<R>(file: StaticString = #file,
line: UInt = #line,
_ body: (Channel, Channel) throws -> R) throws -> [R] {
let r1 = try withCrossConnectedTCPChannels(body)
let r2 = try withCrossConnectedPipeChannels(body)
let r3 = try withCrossConnectedUnixDomainSocketChannels(body)
return [r1, r2, r3]
}

View File

@ -19,7 +19,7 @@ services:
image: swift-nio:16.04-5.1
environment:
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30600
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=592100
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=594100
- MAX_ALLOCS_ALLOWED_ping_pong_1000_reqs_1_conn=4600
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=99100

View File

@ -19,7 +19,7 @@ services:
image: swift-nio:18.04-5.0
environment:
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=31200
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=1062050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=1055050
- MAX_ALLOCS_ALLOWED_ping_pong_1000_reqs_1_conn=4600
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=99100