Move Socket/* into NIO/ for max performance

Before:

`Bandwidth per channel: 18870.319⇅ Mbps (2358789.8 kBps)`

After:

`Bandwidth per channel: 21330.617⇅ Mbps (2666327.1 kBps)`
This commit is contained in:
Norman Maurer 2017-07-03 14:52:09 +02:00
parent 904150913d
commit fd936557c4
15 changed files with 38 additions and 47 deletions

View File

@ -28,20 +28,21 @@ import Foundation
let sysSOCK_STREAM = SOCK_STREAM
#endif
public protocol Registration {
protocol Registration {
var interested: IOEvent { get set }
}
public class BaseSocket : Selectable {
class BaseSocket : Selectable {
public let descriptor: Int32
public private(set) var open: Bool
public final var localAddress: SocketAddress? {
final var localAddress: SocketAddress? {
get {
return nil
}
}
public final var remoteAddress: SocketAddress? {
final var remoteAddress: SocketAddress? {
get {
return nil
}
@ -60,7 +61,7 @@ public class BaseSocket : Selectable {
self.open = true
}
public final func setNonBlocking() throws {
final func setNonBlocking() throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't control file descriptor as it's not open anymore.")
}
@ -70,7 +71,7 @@ public class BaseSocket : Selectable {
}
}
public final func setOption<T>(level: Int32, name: Int32, value: T) throws {
final func setOption<T>(level: Int32, name: Int32, value: T) throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't set socket options as it's not open anymore.")
}
@ -87,7 +88,7 @@ public class BaseSocket : Selectable {
}
}
public final func getOption<T>(level: Int32, name: Int32) throws -> T {
final func getOption<T>(level: Int32, name: Int32) throws -> T {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't get socket options as it's not open anymore.")
}
@ -105,7 +106,7 @@ public class BaseSocket : Selectable {
return val.pointee
}
public final func bind(to address: SocketAddress) throws {
final func bind(to address: SocketAddress) throws {
switch address {
case .v4(address: let addr):
try bindSocket(addr: addr)
@ -129,7 +130,7 @@ public class BaseSocket : Selectable {
}
}
public final func close() throws {
final func close() throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't close socket (as it's not open anymore.")
}

View File

@ -13,7 +13,6 @@
//===----------------------------------------------------------------------===//
import Foundation
import Sockets
public final class ServerBootstrap {

View File

@ -13,7 +13,6 @@
//===----------------------------------------------------------------------===//
import Foundation
import Sockets
import ConcurrencyHelpers
#if os(Linux)

View File

@ -13,7 +13,6 @@
//===----------------------------------------------------------------------===//
import Foundation
import Sockets
public protocol ChannelHandler : class {
func handlerAdded(ctx: ChannelHandlerContext) throws

View File

@ -11,8 +11,6 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Sockets
public protocol ChannelOutboundInvoker {
func register() -> Future<Void>
func register(promise: Promise<Void>?)

View File

@ -13,7 +13,6 @@
//===----------------------------------------------------------------------===//
import Foundation
import Sockets
/*

View File

@ -14,7 +14,6 @@
import Foundation
import Dispatch
import Sockets
class EmbeddedEventLoop : EventLoop {

View File

@ -13,7 +13,6 @@
//===----------------------------------------------------------------------===//
import Foundation
import Sockets
import ConcurrencyHelpers
public protocol EventLoop: EventLoopGroup {
@ -88,7 +87,7 @@ enum NIORegistration: Registration {
// TODO: Implement scheduling tasks in the future (a.k.a ScheduledExecutoreService
final class SelectableEventLoop : EventLoop {
private let selector: Sockets.Selector<NIORegistration>
private let selector: NIO.Selector<NIORegistration>
private var thread: pthread_t?
private var tasks: [() -> ()]
private let tasksLock = Lock()
@ -101,7 +100,7 @@ final class SelectableEventLoop : EventLoop {
let storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>
init() throws {
self.selector = try Sockets.Selector()
self.selector = try NIO.Selector()
self.tasks = Array()
self._iovecs = UnsafeMutablePointer.allocate(capacity: Socket.writevLimit)
self._storageRefs = UnsafeMutablePointer.allocate(capacity: Socket.writevLimit)

View File

@ -14,7 +14,7 @@
import Foundation
public protocol Selectable {
protocol Selectable {
var descriptor: Int32 { get }
var open: Bool { get }

View File

@ -22,7 +22,7 @@ import Foundation
import Darwin
#endif
public final class Selector<R: Registration> {
final class Selector<R: Registration> {
private var open: Bool
#if os(Linux)
@ -60,7 +60,7 @@ public final class Selector<R: Registration> {
events = Selector.allocateEventsArray(capacity: eventsCapacity)
}
public init() throws {
init() throws {
events = Selector.allocateEventsArray(capacity: eventsCapacity)
self.open = false
@ -243,7 +243,7 @@ public final class Selector<R: Registration> {
}
#endif
public func register<S: Selectable>(selectable: S, interested: IOEvent = .read, makeRegistration: (IOEvent) -> R) throws {
func register<S: Selectable>(selectable: S, interested: IOEvent = .read, makeRegistration: (IOEvent) -> R) throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't register selector as it's not open anymore.")
}
@ -263,7 +263,7 @@ public final class Selector<R: Registration> {
registrations[Int(selectable.descriptor)] = makeRegistration(interested)
}
public func reregister<S: Selectable>(selectable: S, interested: IOEvent) throws {
func reregister<S: Selectable>(selectable: S, interested: IOEvent) throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't re-register selector as it's not open anymore.")
}
@ -285,7 +285,7 @@ public final class Selector<R: Registration> {
registrations[Int(selectable.descriptor)] = reg
}
public func deregister<S: Selectable>(selectable: S) throws {
func deregister<S: Selectable>(selectable: S) throws {
guard let reg = registrations.removeValue(forKey: Int(selectable.descriptor)) else {
return
}
@ -300,7 +300,7 @@ public final class Selector<R: Registration> {
#endif
}
public func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent<R>) throws -> Void) throws -> Void {
func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent<R>) throws -> Void) throws -> Void {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't call whenReady for selector as it's not open anymore.")
}
@ -361,7 +361,7 @@ public final class Selector<R: Registration> {
#endif
}
public func close() throws {
func close() throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't close selector as it's not open anymore.")
}
@ -377,7 +377,7 @@ public final class Selector<R: Registration> {
}
}
public func wakeup() throws {
func wakeup() throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't wakeup selector as it's not open anymore.")
}
@ -397,7 +397,7 @@ public final class Selector<R: Registration> {
}
}
public struct SelectorEvent<R> {
struct SelectorEvent<R> {
public let registration: R
public let io: IOEvent
@ -413,13 +413,13 @@ public struct SelectorEvent<R> {
}
}
public enum SelectorStrategy {
enum SelectorStrategy {
case block
case blockUntilTimeout(ms: Int)
case now
}
public enum IOEvent {
enum IOEvent {
case read
case write
case all

View File

@ -27,7 +27,7 @@ import Foundation
// TODO: Handle AF_INET6 as well
public final class ServerSocket: BaseSocket {
final class ServerSocket: BaseSocket {
public class func bootstrap(host: String, port: Int32) throws -> ServerSocket {
let socket = try ServerSocket()
try socket.bind(to: try SocketAddresses.newAddress(for: host, on: port))
@ -35,18 +35,18 @@ public final class ServerSocket: BaseSocket {
return socket
}
public init() throws {
init() throws {
let sock = try BaseSocket.newSocket()
super.init(descriptor: sock)
}
public func listen(backlog: Int32 = 128) throws {
func listen(backlog: Int32 = 128) throws {
_ = try wrapSyscall({ $0 >= 0 }, function: "listen") { () -> Int32 in
sysListen(self.descriptor, backlog)
}
}
public func accept() throws -> Socket? {
func accept() throws -> Socket? {
var acceptAddr = sockaddr_in()
var addrSize = socklen_t(MemoryLayout<sockaddr_in>.size)

View File

@ -33,8 +33,8 @@ let sysConnect = Darwin.connect
public typealias IOVector = iovec
// TODO: scattering support
public final class Socket : BaseSocket {
public static var writevLimit: Int {
final class Socket : BaseSocket {
static var writevLimit: Int {
// UIO_MAXIOV is only exported on linux atm
#if os(Linux)
return Int(UIO_MAXIOV)
@ -43,7 +43,7 @@ public final class Socket : BaseSocket {
#endif
}
public init() throws {
init() throws {
let sock = try BaseSocket.newSocket()
super.init(descriptor: sock)
}
@ -52,7 +52,7 @@ public final class Socket : BaseSocket {
super.init(descriptor: descriptor)
}
public func connect(to address: SocketAddress) throws -> Bool {
func connect(to address: SocketAddress) throws -> Bool {
switch address {
case .v4(address: let addr):
return try connectSocket(addr: addr)
@ -83,18 +83,18 @@ public final class Socket : BaseSocket {
}
}
public func finishConnect() throws {
func finishConnect() throws {
let result: Int32 = try getOption(level: SOL_SOCKET, name: SO_ERROR)
if result != 0 {
throw ioError(errno: result, function: "getsockopt")
}
}
public func write(data: Data) throws -> IOResult<Int> {
func write(data: Data) throws -> IOResult<Int> {
return try data.withUnsafeBytes({ try write(pointer: $0, size: data.count) })
}
public func write(pointer: UnsafePointer<UInt8>, size: Int) throws -> IOResult<Int> {
func write(pointer: UnsafePointer<UInt8>, size: Int) throws -> IOResult<Int> {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't write to socket as it's not open anymore.")
}
@ -103,7 +103,7 @@ public final class Socket : BaseSocket {
}
}
public func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int> {
func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int> {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't writev to socket as it's not open anymore.")
}
@ -113,11 +113,11 @@ public final class Socket : BaseSocket {
}
}
public func read(data: inout Data) throws -> IOResult<Int> {
func read(data: inout Data) throws -> IOResult<Int> {
return try data.withUnsafeMutableBytes({ try read(pointer: $0, size: data.count) })
}
public func read(pointer: UnsafeMutablePointer<UInt8>, size: Int) throws -> IOResult<Int> {
func read(pointer: UnsafeMutablePointer<UInt8>, size: Int) throws -> IOResult<Int> {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't read from socket as it's not open anymore.")
}

View File

@ -61,7 +61,6 @@ public enum SocketAddressError: Error {
case unsupported
}
public enum SocketAddress {
case v4(address: sockaddr_in)
case v6(address: sockaddr_in6)

View File

@ -13,7 +13,6 @@
//===----------------------------------------------------------------------===//
import Foundation
import Sockets
public class Buffer {
var data: Data