make registration type-safe

This commit is contained in:
Johannes Weiß 2017-06-27 15:58:22 +01:00
parent f35fb80c7e
commit d9daac64cc
8 changed files with 149 additions and 94 deletions

View File

@ -370,6 +370,10 @@ final class SocketChannel : BaseSocketChannel<Socket> {
try super.init(socket: socket, eventLoop: eventLoop)
}
public override func registrationFor(interested: InterestedEvent) -> NIORegistration {
return .socketChannel(self, interested)
}
fileprivate override init(socket: Socket, eventLoop: SelectableEventLoop) throws {
try socket.setNonBlocking()
try super.init(socket: socket, eventLoop: eventLoop)
@ -467,6 +471,10 @@ final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
try super.init(socket: serverSocket, eventLoop: eventLoop)
}
public override func registrationFor(interested: InterestedEvent) -> NIORegistration {
return .serverSocketChannel(self, interested)
}
override fileprivate func setOption0<T: ChannelOption>(option: T, value: T.OptionType) throws {
assert(eventLoop.inEventLoop)
if option is BacklogOption {
@ -581,11 +589,15 @@ public protocol Channel : class, ChannelOutboundInvoker {
}
protocol SelectableChannel : Channel {
var selectable: Selectable { get }
associatedtype SelectableType: Selectable
var selectable: SelectableType { get }
var interestedEvent: InterestedEvent { get }
func writable()
func readable()
func registrationFor(interested: InterestedEvent) -> NIORegistration
}
extension Channel {
@ -631,8 +643,15 @@ extension Channel {
}
class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
typealias SelectableType = T
public final var selectable: Selectable { return socket }
func registrationFor(interested: InterestedEvent) -> NIORegistration {
fatalError("must override")
}
var selectable: T {
return self.socket
}
public final var _unsafe: ChannelCore { return self }

View File

@ -125,7 +125,7 @@ class EmbeddedChannelCore : ChannelCore {
public class EmbeddedChannel : Channel {
public var closeFuture: Future<Void> { return channelcore.closePromise.futureResult }
private lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: _pipeline)
private lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: self._pipeline)
public var _unsafe: ChannelCore {
return channelcore

View File

@ -65,10 +65,34 @@ extension EventLoop {
}
}
enum NIORegistration: Registration {
case serverSocketChannel(ServerSocketChannel, InterestedEvent)
case socketChannel(SocketChannel, InterestedEvent)
var interested: InterestedEvent {
set {
switch self {
case .serverSocketChannel(let c, _):
self = .serverSocketChannel(c, newValue)
case .socketChannel(let c, _):
self = .socketChannel(c, newValue)
}
}
get {
switch self {
case .serverSocketChannel(_, let i):
return i
case .socketChannel(_, let i):
return i
}
}
}
}
// TODO: Implement scheduling tasks in the future (a.k.a ScheduledExecutoreService
final class SelectableEventLoop : EventLoop {
private let selector: Sockets.Selector
private let selector: Sockets.Selector<NIORegistration>
private var thread: pthread_t?
private var tasks: [() -> ()]
private let tasksLock = Lock()
@ -94,17 +118,17 @@ final class SelectableEventLoop : EventLoop {
_storageRefs.deallocate(capacity: Socket.writevLimit)
}
func register(channel: SelectableChannel) throws {
func register<C: SelectableChannel>(channel: C) throws {
assert(inEventLoop)
try selector.register(selectable: channel.selectable, interested: channel.interestedEvent, attachment: channel)
try selector.register(selectable: channel.selectable, interested: channel.interestedEvent, makeRegistration: channel.registrationFor(interested:))
}
func deregister(channel: SelectableChannel) throws {
func deregister<C: SelectableChannel>(channel: C) throws {
assert(inEventLoop)
try selector.deregister(selectable: channel.selectable)
}
func reregister(channel: SelectableChannel) throws {
func reregister<C: SelectableChannel>(channel: C) throws {
assert(inEventLoop)
try selector.reregister(selectable: channel.selectable, interested: channel.interestedEvent)
}
@ -122,6 +146,32 @@ final class SelectableEventLoop : EventLoop {
_ = try? selector.wakeup()
}
private func handleEvent<R: Registration, C: SelectableChannel>(_ ev: SelectorEvent<R>, channel: C) {
guard handleEvents(channel) else {
return
}
if ev.isWritable {
channel.writable()
guard handleEvents(channel) else {
return
}
}
if ev.isReadable {
channel.readable()
guard handleEvents(channel) else {
return
}
}
// Ensure we never reach here if the channel is not open anymore.
assert(channel.open)
}
func run() throws {
thread = pthread_self()
defer {
@ -131,33 +181,12 @@ final class SelectableEventLoop : EventLoop {
while !closed {
// Block until there are events to handle or the selector was woken up
try selector.whenReady(strategy: .block) { ev in
guard let channel = ev.attachment as? SelectableChannel else {
fatalError("ev.attachment has type \(type(of: ev.attachment)), expected Channel")
switch ev.registration {
case .serverSocketChannel(let chan, _):
self.handleEvent(ev, channel: chan)
case .socketChannel(let chan, _):
self.handleEvent(ev, channel: chan)
}
guard handleEvents(channel) else {
return
}
if ev.isWritable {
channel.writable()
guard handleEvents(channel) else {
return
}
}
if ev.isReadable {
channel.readable()
guard handleEvents(channel) else {
return
}
}
// Ensure we never reach here if the channel is not open anymore.
assert(channel.open)
}
// TODO: Better locking
@ -172,7 +201,7 @@ final class SelectableEventLoop : EventLoop {
}
}
private func handleEvents(_ channel: SelectableChannel) -> Bool {
private func handleEvents<C: SelectableChannel>(_ channel: C) -> Bool {
if channel.open {
return true
}

View File

@ -28,6 +28,9 @@ import Foundation
let sysSOCK_STREAM = SOCK_STREAM
#endif
public protocol Registration {
var interested: InterestedEvent { get set }
}
public class BaseSocket : Selectable {
public let descriptor: Int32

View File

@ -19,5 +19,5 @@ public protocol Selectable {
var open: Bool { get }
func close() throws;
func close() throws
}

View File

@ -22,22 +22,22 @@ import Foundation
import Darwin
#endif
public final class Selector {
public final class Selector<R: Registration> {
private var open: Bool
#if os(Linux)
#if os(Linux)
private typealias EventType = epoll_event
private let eventsCapacity = 2048
private let eventfd: Int32
#else
#else
private typealias EventType = kevent
private let eventsCapacity = 2048
// TODO: Just reserve 0 is most likely not the best idea, need to think about a better way to handle this.
private static let EvUserIdent = UInt(0)
#endif
#endif
private let fd: Int32
private let events: UnsafeMutablePointer<EventType>
private var registrations = [Int: Registration]()
private var registrations = [Int: R]()
public init() throws {
events = UnsafeMutablePointer.allocate(capacity: eventsCapacity)
@ -68,7 +68,7 @@ public final class Selector {
self.open = true
var event = kevent()
event.ident = Selector.EvUserIdent
event.ident = 0
event.filter = Int16(EVFILT_USER)
event.fflags = UInt32(NOTE_FFNOP)
event.data = 0
@ -140,7 +140,7 @@ public final class Selector {
}
}
private func register_kqueue(selectable: Selectable, interested: InterestedEvent, oldInterested: InterestedEvent?) throws {
private func register_kqueue<S: Selectable>(selectable: S, interested: InterestedEvent, oldInterested: InterestedEvent?) throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't register kqueue on selector as it's not open anymore.")
}
@ -224,7 +224,7 @@ public final class Selector {
}
#endif
public func register(selectable: Selectable, interested: InterestedEvent = .read, attachment: AnyObject? = nil) throws {
public func register<S: Selectable>(selectable: S, interested: InterestedEvent = .read, makeRegistration: (InterestedEvent) -> R) throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't register selector as it's not open anymore.")
}
@ -241,10 +241,10 @@ public final class Selector {
#else
try register_kqueue(selectable: selectable, interested: interested, oldInterested: nil)
#endif
registrations[Int(selectable.descriptor)] = Registration(selectable: selectable, interested: interested, attachment: attachment)
registrations[Int(selectable.descriptor)] = makeRegistration(interested)
}
public func reregister(selectable: Selectable, interested: InterestedEvent) throws {
public func reregister<S: Selectable>(selectable: S, interested: InterestedEvent) throws {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't re-register selector as it's not open anymore.")
}
@ -266,7 +266,7 @@ public final class Selector {
registrations[Int(selectable.descriptor)] = reg
}
public func deregister(selectable: Selectable) throws {
public func deregister<S: Selectable>(selectable: S) throws {
let reg = registrations.removeValue(forKey: Int(selectable.descriptor))
guard reg != nil else {
return
@ -282,7 +282,7 @@ public final class Selector {
}
public func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent) throws -> Void) throws -> Void {
public func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent<R>) throws -> Void) throws -> Void {
guard self.open else {
throw IOError(errno: EBADF, reason: "can't call whenReady for selector as it's not open anymore.")
}
@ -304,7 +304,7 @@ public final class Selector {
SelectorEvent(
isReadable: (ev.events & EPOLLIN.rawValue) != 0 || (ev.events & EPOLLERR.rawValue) != 0 || (ev.events & EPOLLRDHUP.rawValue) != 0,
isWritable: (ev.events & EPOLLOUT.rawValue) != 0 || (ev.events & EPOLLERR.rawValue) != 0 || (ev.events & EPOLLRDHUP.rawValue) != 0,
selectable: registration.selectable, attachment: registration.attachment))
registration: registration))
}
}
#else
@ -319,12 +319,12 @@ public final class Selector {
}
for i in 0..<ready {
let ev = events[i]
if ev.ident != Selector.EvUserIdent {
if ev.ident != 0 {
guard let registration = registrations[Int(ev.ident)] else {
// Just ignore as this means the user deregistered already in between. This can happen as kevent returns two different events, one for EVFILT_READ and one for EVFILT_WRITE.
continue
}
try fn((SelectorEvent(isReadable: Int32(ev.filter) == EVFILT_READ, isWritable: Int32(ev.filter) == EVFILT_WRITE, selectable: registration.selectable, attachment: registration.attachment)))
try fn((SelectorEvent(isReadable: Int32(ev.filter) == EVFILT_READ, isWritable: Int32(ev.filter) == EVFILT_WRITE, registration: registration)))
}
}
#endif
@ -355,7 +355,7 @@ public final class Selector {
let _ = CEventfd.eventfd_write(eventfd, 1)
#else
var event = kevent()
event.ident = Selector.EvUserIdent
event.ident = 0
event.filter = Int16(EVFILT_USER)
event.fflags = UInt32(NOTE_TRIGGER | NOTE_FFNOP)
event.data = 0
@ -366,30 +366,16 @@ public final class Selector {
}
}
private struct Registration {
fileprivate(set) var interested: InterestedEvent
fileprivate(set) var selectable: Selectable
fileprivate(set) var attachment: AnyObject?
init(selectable: Selectable, interested: InterestedEvent, attachment: AnyObject?) {
self.selectable = selectable
self.interested = interested
self.attachment = attachment
}
}
public struct SelectorEvent {
public struct SelectorEvent<R> {
public let registration: R
public fileprivate(set) var isReadable: Bool
public fileprivate(set) var isWritable: Bool
public fileprivate(set) var selectable: Selectable
public fileprivate(set) var attachment: AnyObject?
init(isReadable: Bool, isWritable: Bool, selectable: Selectable, attachment: AnyObject?) {
init(isReadable: Bool, isWritable: Bool, registration: R) {
self.isReadable = isReadable
self.isWritable = isWritable
self.selectable = selectable
self.attachment = attachment
self.registration = registration
}
}

View File

@ -34,7 +34,6 @@ public typealias IOVector = iovec
// TODO: scattering support
public final class Socket : BaseSocket {
public static var writevLimit: Int {
// UIO_MAXIOV is only exported on linux atm
#if os(Linux)

View File

@ -32,14 +32,36 @@ public class Buffer {
}
}
func deregisterAndClose(selector: Sockets.Selector, s: Selectable) {
func deregisterAndClose<S: Selectable, R>(selector: Sockets.Selector<R>, s: S) {
do { try selector.deregister(selectable: s) } catch {}
do { try s.close() } catch {}
}
enum SocketRegistration: Registration {
case socket(Socket, Buffer, InterestedEvent)
case serverSocket(ServerSocket, Buffer?, InterestedEvent)
var interested: InterestedEvent {
get {
switch self {
case .socket(_, _, let i):
return i
case .serverSocket(_, _, let i):
return i
}
}
set {
switch self {
case .socket(let s, let b, _):
self = .socket(s, b, newValue)
case .serverSocket(let s, let b, _):
self = .serverSocket(s, b, newValue)
}
}
}
}
// Bootstrap the server and create the Selector on which we register our sockets.
let selector = try Sockets.Selector()
let selector = try Sockets.Selector<SocketRegistration>()
defer {
do { try selector.close() } catch { }
@ -50,7 +72,7 @@ try server.setNonBlocking()
// this will register with InterestedEvent.READ and no attachment
try selector.register(selectable: server)
try selector.register(selectable: server) { i in .serverSocket(server, nil, i) }
// cleanup
defer {
@ -66,41 +88,38 @@ while true {
if ev.isReadable {
// We can handle either read(...) or accept()
if ev.selectable is Socket {
switch ev.registration {
case .socket(let socket, let buffer, _):
// We stored the Buffer before as attachment so get it and clear the limit / offset.
let buffer = ev.attachment as! Buffer
buffer.clear()
let s = ev.selectable as! Socket
do {
switch try s.read(data: &buffer.data) {
switch try socket.read(data: &buffer.data) {
case .processed(let read):
buffer.limit = Int(read)
switch try s.write(data: buffer.data.subdata(in: buffer.offset..<buffer.limit)) {
switch try socket.write(data: buffer.data.subdata(in: buffer.offset..<buffer.limit)) {
case .processed(let written):
buffer.offset += Int(written)
// We could not write everything so we reregister with InterestedEvent.Write and so get woken up once the socket becomes writable again.
// This also ensure we not read anymore until we were able to echo it back (backpressure FTW).
if buffer.offset < buffer.limit {
try selector.reregister(selectable: s, interested: InterestedEvent.write)
try selector.reregister(selectable: socket, interested: InterestedEvent.write)
}
case .wouldBlock:
// We could not write everything so we reregister with InterestedEvent.Write and so get woken up once the socket becomes writable again.
// This also ensure we not read anymore until we were able to echo it back (backpressure FTW).
try selector.reregister(selectable: s, interested: InterestedEvent.write)
try selector.reregister(selectable: socket, interested: InterestedEvent.write)
}
case .wouldBlock:
()
}
} catch {
deregisterAndClose(selector: selector, s: s)
deregisterAndClose(selector: selector, s: socket)
}
} else if ev.selectable is ServerSocket {
let socket = ev.selectable as! ServerSocket
case .serverSocket(let socket, _, _):
// Accept new connections until there are no more in the backlog
while let accepted = try socket.accept() {
try accepted.setNonBlocking()
@ -109,29 +128,29 @@ while true {
// Allocate an 8kb buffer for reading and writing and register the socket with the selector
let buffer = Buffer(capacity: 8 * 1024)
try selector.register(selectable: accepted, attachment: buffer)
try selector.register(selectable: accepted) { i in .socket(accepted, buffer, i) }
}
}
} else if ev.isWritable {
if ev.selectable is Socket {
let buffer = ev.attachment as! Buffer
let s = ev.selectable as! Socket
switch ev.registration {
case .socket(let socket, let buffer, _):
do {
switch try s.write(data: buffer.data.subdata(in: buffer.offset..<buffer.limit)) {
switch try socket.write(data: buffer.data.subdata(in: buffer.offset..<buffer.limit)) {
case .processed(let written):
buffer.offset += Int(written)
if buffer.offset == buffer.limit {
// Everything was written, reregister again with InterestedEvent.Read so we are notified once there is more data on the socket to read.
try selector.reregister(selectable: s, interested: InterestedEvent.read)
try selector.reregister(selectable: socket, interested: InterestedEvent.read)
}
case .wouldBlock:
()
}
} catch {
deregisterAndClose(selector: selector, s: s)
deregisterAndClose(selector: selector, s: socket)
}
default:
fatalError("internal error: writable server socket")
}
}
}