From d9daac64cc959aebf4d6a863e079c5af0349d4b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Wei=C3=9F?= Date: Tue, 27 Jun 2017 15:58:22 +0100 Subject: [PATCH] make registration type-safe --- Sources/NIO/Channel.swift | 23 ++++++- Sources/NIO/Embedded.swift | 2 +- Sources/NIO/EventLoop.swift | 93 ++++++++++++++++++---------- Sources/Sockets/BaseSocket.swift | 3 + Sources/Sockets/Selectable.swift | 2 +- Sources/Sockets/Selector.swift | 56 +++++++---------- Sources/Sockets/Socket.swift | 1 - Sources/SocketsEchoServer/main.swift | 63 ++++++++++++------- 8 files changed, 149 insertions(+), 94 deletions(-) diff --git a/Sources/NIO/Channel.swift b/Sources/NIO/Channel.swift index 0cc0eb36..106f9d63 100644 --- a/Sources/NIO/Channel.swift +++ b/Sources/NIO/Channel.swift @@ -370,6 +370,10 @@ final class SocketChannel : BaseSocketChannel { try super.init(socket: socket, eventLoop: eventLoop) } + public override func registrationFor(interested: InterestedEvent) -> NIORegistration { + return .socketChannel(self, interested) + } + fileprivate override init(socket: Socket, eventLoop: SelectableEventLoop) throws { try socket.setNonBlocking() try super.init(socket: socket, eventLoop: eventLoop) @@ -467,6 +471,10 @@ final class ServerSocketChannel : BaseSocketChannel { try super.init(socket: serverSocket, eventLoop: eventLoop) } + public override func registrationFor(interested: InterestedEvent) -> NIORegistration { + return .serverSocketChannel(self, interested) + } + override fileprivate func setOption0(option: T, value: T.OptionType) throws { assert(eventLoop.inEventLoop) if option is BacklogOption { @@ -581,11 +589,15 @@ public protocol Channel : class, ChannelOutboundInvoker { } protocol SelectableChannel : Channel { - var selectable: Selectable { get } + associatedtype SelectableType: Selectable + + var selectable: SelectableType { get } var interestedEvent: InterestedEvent { get } func writable() func readable() + + func registrationFor(interested: InterestedEvent) -> NIORegistration } extension Channel { @@ -631,8 +643,15 @@ extension Channel { } class BaseSocketChannel : SelectableChannel, ChannelCore { + typealias SelectableType = T - public final var selectable: Selectable { return socket } + func registrationFor(interested: InterestedEvent) -> NIORegistration { + fatalError("must override") + } + + var selectable: T { + return self.socket + } public final var _unsafe: ChannelCore { return self } diff --git a/Sources/NIO/Embedded.swift b/Sources/NIO/Embedded.swift index 42b0d10a..07854456 100644 --- a/Sources/NIO/Embedded.swift +++ b/Sources/NIO/Embedded.swift @@ -125,7 +125,7 @@ class EmbeddedChannelCore : ChannelCore { public class EmbeddedChannel : Channel { public var closeFuture: Future { return channelcore.closePromise.futureResult } - private lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: _pipeline) + private lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: self._pipeline) public var _unsafe: ChannelCore { return channelcore diff --git a/Sources/NIO/EventLoop.swift b/Sources/NIO/EventLoop.swift index 15b82824..2a73b6ef 100644 --- a/Sources/NIO/EventLoop.swift +++ b/Sources/NIO/EventLoop.swift @@ -65,10 +65,34 @@ extension EventLoop { } } +enum NIORegistration: Registration { + case serverSocketChannel(ServerSocketChannel, InterestedEvent) + case socketChannel(SocketChannel, InterestedEvent) + + var interested: InterestedEvent { + set { + switch self { + case .serverSocketChannel(let c, _): + self = .serverSocketChannel(c, newValue) + case .socketChannel(let c, _): + self = .socketChannel(c, newValue) + } + } + get { + switch self { + case .serverSocketChannel(_, let i): + return i + case .socketChannel(_, let i): + return i + } + } + } + +} // TODO: Implement scheduling tasks in the future (a.k.a ScheduledExecutoreService final class SelectableEventLoop : EventLoop { - private let selector: Sockets.Selector + private let selector: Sockets.Selector private var thread: pthread_t? private var tasks: [() -> ()] private let tasksLock = Lock() @@ -94,17 +118,17 @@ final class SelectableEventLoop : EventLoop { _storageRefs.deallocate(capacity: Socket.writevLimit) } - func register(channel: SelectableChannel) throws { + func register(channel: C) throws { assert(inEventLoop) - try selector.register(selectable: channel.selectable, interested: channel.interestedEvent, attachment: channel) + try selector.register(selectable: channel.selectable, interested: channel.interestedEvent, makeRegistration: channel.registrationFor(interested:)) } - func deregister(channel: SelectableChannel) throws { + func deregister(channel: C) throws { assert(inEventLoop) try selector.deregister(selectable: channel.selectable) } - func reregister(channel: SelectableChannel) throws { + func reregister(channel: C) throws { assert(inEventLoop) try selector.reregister(selectable: channel.selectable, interested: channel.interestedEvent) } @@ -122,6 +146,32 @@ final class SelectableEventLoop : EventLoop { _ = try? selector.wakeup() } + private func handleEvent(_ ev: SelectorEvent, channel: C) { + guard handleEvents(channel) else { + return + } + + if ev.isWritable { + channel.writable() + + guard handleEvents(channel) else { + return + } + } + + if ev.isReadable { + channel.readable() + + guard handleEvents(channel) else { + return + } + } + + // Ensure we never reach here if the channel is not open anymore. + assert(channel.open) + + } + func run() throws { thread = pthread_self() defer { @@ -131,33 +181,12 @@ final class SelectableEventLoop : EventLoop { while !closed { // Block until there are events to handle or the selector was woken up try selector.whenReady(strategy: .block) { ev in - - guard let channel = ev.attachment as? SelectableChannel else { - fatalError("ev.attachment has type \(type(of: ev.attachment)), expected Channel") + switch ev.registration { + case .serverSocketChannel(let chan, _): + self.handleEvent(ev, channel: chan) + case .socketChannel(let chan, _): + self.handleEvent(ev, channel: chan) } - - guard handleEvents(channel) else { - return - } - - if ev.isWritable { - channel.writable() - - guard handleEvents(channel) else { - return - } - } - - if ev.isReadable { - channel.readable() - - guard handleEvents(channel) else { - return - } - } - - // Ensure we never reach here if the channel is not open anymore. - assert(channel.open) } // TODO: Better locking @@ -172,7 +201,7 @@ final class SelectableEventLoop : EventLoop { } } - private func handleEvents(_ channel: SelectableChannel) -> Bool { + private func handleEvents(_ channel: C) -> Bool { if channel.open { return true } diff --git a/Sources/Sockets/BaseSocket.swift b/Sources/Sockets/BaseSocket.swift index 75bdf7b7..2250c0ae 100644 --- a/Sources/Sockets/BaseSocket.swift +++ b/Sources/Sockets/BaseSocket.swift @@ -28,6 +28,9 @@ import Foundation let sysSOCK_STREAM = SOCK_STREAM #endif +public protocol Registration { + var interested: InterestedEvent { get set } +} public class BaseSocket : Selectable { public let descriptor: Int32 diff --git a/Sources/Sockets/Selectable.swift b/Sources/Sockets/Selectable.swift index b0f31a14..058ee9c5 100644 --- a/Sources/Sockets/Selectable.swift +++ b/Sources/Sockets/Selectable.swift @@ -19,5 +19,5 @@ public protocol Selectable { var open: Bool { get } - func close() throws; + func close() throws } diff --git a/Sources/Sockets/Selector.swift b/Sources/Sockets/Selector.swift index c87122d6..fe9bdfc5 100644 --- a/Sources/Sockets/Selector.swift +++ b/Sources/Sockets/Selector.swift @@ -22,22 +22,22 @@ import Foundation import Darwin #endif -public final class Selector { +public final class Selector { private var open: Bool -#if os(Linux) + #if os(Linux) private typealias EventType = epoll_event private let eventsCapacity = 2048 private let eventfd: Int32 -#else + #else private typealias EventType = kevent private let eventsCapacity = 2048 // TODO: Just reserve 0 is most likely not the best idea, need to think about a better way to handle this. - private static let EvUserIdent = UInt(0) -#endif + #endif + private let fd: Int32 private let events: UnsafeMutablePointer - private var registrations = [Int: Registration]() + private var registrations = [Int: R]() public init() throws { events = UnsafeMutablePointer.allocate(capacity: eventsCapacity) @@ -68,7 +68,7 @@ public final class Selector { self.open = true var event = kevent() - event.ident = Selector.EvUserIdent + event.ident = 0 event.filter = Int16(EVFILT_USER) event.fflags = UInt32(NOTE_FFNOP) event.data = 0 @@ -140,7 +140,7 @@ public final class Selector { } } - private func register_kqueue(selectable: Selectable, interested: InterestedEvent, oldInterested: InterestedEvent?) throws { + private func register_kqueue(selectable: S, interested: InterestedEvent, oldInterested: InterestedEvent?) throws { guard self.open else { throw IOError(errno: EBADF, reason: "can't register kqueue on selector as it's not open anymore.") } @@ -224,7 +224,7 @@ public final class Selector { } #endif - public func register(selectable: Selectable, interested: InterestedEvent = .read, attachment: AnyObject? = nil) throws { + public func register(selectable: S, interested: InterestedEvent = .read, makeRegistration: (InterestedEvent) -> R) throws { guard self.open else { throw IOError(errno: EBADF, reason: "can't register selector as it's not open anymore.") } @@ -241,10 +241,10 @@ public final class Selector { #else try register_kqueue(selectable: selectable, interested: interested, oldInterested: nil) #endif - registrations[Int(selectable.descriptor)] = Registration(selectable: selectable, interested: interested, attachment: attachment) + registrations[Int(selectable.descriptor)] = makeRegistration(interested) } - public func reregister(selectable: Selectable, interested: InterestedEvent) throws { + public func reregister(selectable: S, interested: InterestedEvent) throws { guard self.open else { throw IOError(errno: EBADF, reason: "can't re-register selector as it's not open anymore.") } @@ -266,7 +266,7 @@ public final class Selector { registrations[Int(selectable.descriptor)] = reg } - public func deregister(selectable: Selectable) throws { + public func deregister(selectable: S) throws { let reg = registrations.removeValue(forKey: Int(selectable.descriptor)) guard reg != nil else { return @@ -282,7 +282,7 @@ public final class Selector { } - public func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent) throws -> Void) throws -> Void { + public func whenReady(strategy: SelectorStrategy, _ fn: (SelectorEvent) throws -> Void) throws -> Void { guard self.open else { throw IOError(errno: EBADF, reason: "can't call whenReady for selector as it's not open anymore.") } @@ -304,7 +304,7 @@ public final class Selector { SelectorEvent( isReadable: (ev.events & EPOLLIN.rawValue) != 0 || (ev.events & EPOLLERR.rawValue) != 0 || (ev.events & EPOLLRDHUP.rawValue) != 0, isWritable: (ev.events & EPOLLOUT.rawValue) != 0 || (ev.events & EPOLLERR.rawValue) != 0 || (ev.events & EPOLLRDHUP.rawValue) != 0, - selectable: registration.selectable, attachment: registration.attachment)) + registration: registration)) } } #else @@ -319,12 +319,12 @@ public final class Selector { } for i in 0.. { + public let registration: R public fileprivate(set) var isReadable: Bool public fileprivate(set) var isWritable: Bool - public fileprivate(set) var selectable: Selectable - public fileprivate(set) var attachment: AnyObject? - init(isReadable: Bool, isWritable: Bool, selectable: Selectable, attachment: AnyObject?) { + init(isReadable: Bool, isWritable: Bool, registration: R) { self.isReadable = isReadable self.isWritable = isWritable - self.selectable = selectable - self.attachment = attachment + self.registration = registration } } diff --git a/Sources/Sockets/Socket.swift b/Sources/Sockets/Socket.swift index 77f3e9b4..8d3cae8b 100644 --- a/Sources/Sockets/Socket.swift +++ b/Sources/Sockets/Socket.swift @@ -34,7 +34,6 @@ public typealias IOVector = iovec // TODO: scattering support public final class Socket : BaseSocket { - public static var writevLimit: Int { // UIO_MAXIOV is only exported on linux atm #if os(Linux) diff --git a/Sources/SocketsEchoServer/main.swift b/Sources/SocketsEchoServer/main.swift index cab73b16..ae7f8eef 100644 --- a/Sources/SocketsEchoServer/main.swift +++ b/Sources/SocketsEchoServer/main.swift @@ -32,14 +32,36 @@ public class Buffer { } } -func deregisterAndClose(selector: Sockets.Selector, s: Selectable) { +func deregisterAndClose(selector: Sockets.Selector, s: S) { do { try selector.deregister(selectable: s) } catch {} do { try s.close() } catch {} } +enum SocketRegistration: Registration { + case socket(Socket, Buffer, InterestedEvent) + case serverSocket(ServerSocket, Buffer?, InterestedEvent) + var interested: InterestedEvent { + get { + switch self { + case .socket(_, _, let i): + return i + case .serverSocket(_, _, let i): + return i + } + } + set { + switch self { + case .socket(let s, let b, _): + self = .socket(s, b, newValue) + case .serverSocket(let s, let b, _): + self = .serverSocket(s, b, newValue) + } + } + } +} // Bootstrap the server and create the Selector on which we register our sockets. -let selector = try Sockets.Selector() +let selector = try Sockets.Selector() defer { do { try selector.close() } catch { } @@ -50,7 +72,7 @@ try server.setNonBlocking() // this will register with InterestedEvent.READ and no attachment -try selector.register(selectable: server) +try selector.register(selectable: server) { i in .serverSocket(server, nil, i) } // cleanup defer { @@ -66,41 +88,38 @@ while true { if ev.isReadable { // We can handle either read(...) or accept() - if ev.selectable is Socket { + switch ev.registration { + case .socket(let socket, let buffer, _): // We stored the Buffer before as attachment so get it and clear the limit / offset. - let buffer = ev.attachment as! Buffer buffer.clear() - let s = ev.selectable as! Socket do { - switch try s.read(data: &buffer.data) { + switch try socket.read(data: &buffer.data) { case .processed(let read): buffer.limit = Int(read) - switch try s.write(data: buffer.data.subdata(in: buffer.offset..