Rename InterestedEvent to IOEvent and reuse it in SelectorEvent to make things more consistent
This commit is contained in:
parent
55fad73cd6
commit
6e55eb0ade
|
@ -369,7 +369,7 @@ final class SocketChannel : BaseSocketChannel<Socket> {
|
|||
try super.init(socket: socket, eventLoop: eventLoop)
|
||||
}
|
||||
|
||||
public override func registrationFor(interested: InterestedEvent) -> NIORegistration {
|
||||
public override func registrationFor(interested: IOEvent) -> NIORegistration {
|
||||
return .socketChannel(self, interested)
|
||||
}
|
||||
|
||||
|
@ -470,7 +470,7 @@ final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
|
|||
try super.init(socket: serverSocket, eventLoop: eventLoop)
|
||||
}
|
||||
|
||||
public override func registrationFor(interested: InterestedEvent) -> NIORegistration {
|
||||
public override func registrationFor(interested: IOEvent) -> NIORegistration {
|
||||
return .serverSocketChannel(self, interested)
|
||||
}
|
||||
|
||||
|
@ -592,12 +592,12 @@ protocol SelectableChannel : Channel {
|
|||
associatedtype SelectableType: Selectable
|
||||
|
||||
var selectable: SelectableType { get }
|
||||
var interestedEvent: InterestedEvent { get }
|
||||
var interestedEvent: IOEvent { get }
|
||||
|
||||
func writable()
|
||||
func readable()
|
||||
|
||||
func registrationFor(interested: InterestedEvent) -> NIORegistration
|
||||
func registrationFor(interested: IOEvent) -> NIORegistration
|
||||
}
|
||||
|
||||
extension Channel {
|
||||
|
@ -648,7 +648,7 @@ extension Channel {
|
|||
class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
||||
typealias SelectableType = T
|
||||
|
||||
func registrationFor(interested: InterestedEvent) -> NIORegistration {
|
||||
func registrationFor(interested: IOEvent) -> NIORegistration {
|
||||
fatalError("must override")
|
||||
}
|
||||
|
||||
|
@ -660,7 +660,7 @@ class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
|||
|
||||
// Visible to access from EventLoop directly
|
||||
let socket: T
|
||||
public var interestedEvent: InterestedEvent = .none
|
||||
public var interestedEvent: IOEvent = .none
|
||||
|
||||
public final var closed: Bool {
|
||||
return pendingWrites.closed
|
||||
|
@ -1075,7 +1075,7 @@ class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
|||
}
|
||||
}
|
||||
|
||||
private func safeReregister(interested: InterestedEvent) {
|
||||
private func safeReregister(interested: IOEvent) {
|
||||
guard !closed else {
|
||||
interestedEvent = .none
|
||||
return
|
||||
|
@ -1093,7 +1093,7 @@ class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
|||
}
|
||||
}
|
||||
|
||||
private func safeRegister(interested: InterestedEvent) -> Bool {
|
||||
private func safeRegister(interested: IOEvent) -> Bool {
|
||||
guard !closed else {
|
||||
interestedEvent = .none
|
||||
return false
|
||||
|
|
|
@ -62,10 +62,10 @@ extension EventLoop {
|
|||
}
|
||||
|
||||
enum NIORegistration: Registration {
|
||||
case serverSocketChannel(ServerSocketChannel, InterestedEvent)
|
||||
case socketChannel(SocketChannel, InterestedEvent)
|
||||
case serverSocketChannel(ServerSocketChannel, IOEvent)
|
||||
case socketChannel(SocketChannel, IOEvent)
|
||||
|
||||
var interested: InterestedEvent {
|
||||
var interested: IOEvent {
|
||||
set {
|
||||
switch self {
|
||||
case .serverSocketChannel(let c, _):
|
||||
|
@ -142,25 +142,29 @@ final class SelectableEventLoop : EventLoop {
|
|||
_ = try? selector.wakeup()
|
||||
}
|
||||
|
||||
private func handleEvent<R: Registration, C: SelectableChannel>(_ ev: SelectorEvent<R>, channel: C) {
|
||||
private func handleEvent<C: SelectableChannel>(_ ev: IOEvent, channel: C) {
|
||||
guard handleEvents(channel) else {
|
||||
return
|
||||
}
|
||||
|
||||
if ev.isWritable {
|
||||
|
||||
switch ev {
|
||||
case .write:
|
||||
channel.writable()
|
||||
|
||||
guard handleEvents(channel) else {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if ev.isReadable {
|
||||
case .read:
|
||||
channel.readable()
|
||||
|
||||
case .all:
|
||||
channel.writable()
|
||||
guard handleEvents(channel) else {
|
||||
return
|
||||
}
|
||||
channel.readable()
|
||||
case .none:
|
||||
// spurious wakup
|
||||
break
|
||||
|
||||
}
|
||||
guard handleEvents(channel) else {
|
||||
return
|
||||
}
|
||||
|
||||
// Ensure we never reach here if the channel is not open anymore.
|
||||
|
@ -179,9 +183,9 @@ final class SelectableEventLoop : EventLoop {
|
|||
try selector.whenReady(strategy: .block) { ev in
|
||||
switch ev.registration {
|
||||
case .serverSocketChannel(let chan, _):
|
||||
self.handleEvent(ev, channel: chan)
|
||||
self.handleEvent(ev.io, channel: chan)
|
||||
case .socketChannel(let chan, _):
|
||||
self.handleEvent(ev, channel: chan)
|
||||
self.handleEvent(ev.io, channel: chan)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import Foundation
|
|||
#endif
|
||||
|
||||
public protocol Registration {
|
||||
var interested: InterestedEvent { get set }
|
||||
var interested: IOEvent { get set }
|
||||
}
|
||||
|
||||
public class BaseSocket : Selectable {
|
||||
|
|
|
@ -117,7 +117,7 @@ public final class Selector<R: Registration> {
|
|||
}
|
||||
}
|
||||
|
||||
private static func toEpollEvents(interested: InterestedEvent) -> UInt32 {
|
||||
private static func toEpollEvents(interested: IOEvent) -> UInt32 {
|
||||
// Also merge EPOLLRDHUP in so we can easily detect connection-reset
|
||||
switch interested {
|
||||
case .read:
|
||||
|
@ -159,7 +159,7 @@ public final class Selector<R: Registration> {
|
|||
}
|
||||
}
|
||||
|
||||
private func register_kqueue<S: Selectable>(selectable: S, interested: InterestedEvent, oldInterested: InterestedEvent?) throws {
|
||||
private func register_kqueue<S: Selectable>(selectable: S, interested: IOEvent, oldInterested: IOEvent?) throws {
|
||||
guard self.open else {
|
||||
throw IOError(errno: EBADF, reason: "can't register kqueue on selector as it's not open anymore.")
|
||||
}
|
||||
|
@ -243,7 +243,7 @@ public final class Selector<R: Registration> {
|
|||
}
|
||||
#endif
|
||||
|
||||
public func register<S: Selectable>(selectable: S, interested: InterestedEvent = .read, makeRegistration: (InterestedEvent) -> R) throws {
|
||||
public func register<S: Selectable>(selectable: S, interested: IOEvent = .read, makeRegistration: (IOEvent) -> R) throws {
|
||||
guard self.open else {
|
||||
throw IOError(errno: EBADF, reason: "can't register selector as it's not open anymore.")
|
||||
}
|
||||
|
@ -263,7 +263,7 @@ public final class Selector<R: Registration> {
|
|||
registrations[Int(selectable.descriptor)] = makeRegistration(interested)
|
||||
}
|
||||
|
||||
public func reregister<S: Selectable>(selectable: S, interested: InterestedEvent) throws {
|
||||
public func reregister<S: Selectable>(selectable: S, interested: IOEvent) throws {
|
||||
guard self.open else {
|
||||
throw IOError(errno: EBADF, reason: "can't re-register selector as it's not open anymore.")
|
||||
}
|
||||
|
@ -390,14 +390,17 @@ public final class Selector<R: Registration> {
|
|||
}
|
||||
|
||||
public struct SelectorEvent<R> {
|
||||
|
||||
public let registration: R
|
||||
public fileprivate(set) var isReadable: Bool
|
||||
public fileprivate(set) var isWritable: Bool
|
||||
|
||||
public let io: IOEvent
|
||||
|
||||
init(isReadable: Bool, isWritable: Bool, registration: R) {
|
||||
self.isReadable = isReadable
|
||||
self.isWritable = isWritable
|
||||
if isReadable {
|
||||
io = isWritable ? .all : .read
|
||||
} else if isWritable {
|
||||
io = .write
|
||||
} else {
|
||||
io = .none
|
||||
}
|
||||
self.registration = registration
|
||||
}
|
||||
}
|
||||
|
@ -408,7 +411,7 @@ public enum SelectorStrategy {
|
|||
case now
|
||||
}
|
||||
|
||||
public enum InterestedEvent {
|
||||
public enum IOEvent {
|
||||
case read
|
||||
case write
|
||||
case all
|
||||
|
|
|
@ -38,9 +38,9 @@ func deregisterAndClose<S: Selectable, R>(selector: Sockets.Selector<R>, s: S) {
|
|||
}
|
||||
|
||||
enum SocketRegistration: Registration {
|
||||
case socket(Socket, Buffer, InterestedEvent)
|
||||
case serverSocket(ServerSocket, Buffer?, InterestedEvent)
|
||||
var interested: InterestedEvent {
|
||||
case socket(Socket, Buffer, IOEvent)
|
||||
case serverSocket(ServerSocket, Buffer?, IOEvent)
|
||||
var interested: IOEvent {
|
||||
get {
|
||||
switch self {
|
||||
case .socket(_, _, let i):
|
||||
|
@ -85,8 +85,8 @@ try server.setOption(level: SOL_SOCKET, name: SO_REUSEADDR, value: 1)
|
|||
while true {
|
||||
// Block until there are events to handle
|
||||
try! selector.whenReady(strategy: .block) { ev in
|
||||
if ev.isReadable {
|
||||
|
||||
switch ev.io {
|
||||
case .read:
|
||||
// We can handle either read(...) or accept()
|
||||
switch ev.registration {
|
||||
case .socket(let socket, let buffer, _):
|
||||
|
@ -105,13 +105,13 @@ while true {
|
|||
// We could not write everything so we reregister with InterestedEvent.Write and so get woken up once the socket becomes writable again.
|
||||
// This also ensure we not read anymore until we were able to echo it back (backpressure FTW).
|
||||
if buffer.offset < buffer.limit {
|
||||
try selector.reregister(selectable: socket, interested: InterestedEvent.write)
|
||||
try selector.reregister(selectable: socket, interested: IOEvent.write)
|
||||
}
|
||||
|
||||
case .wouldBlock:
|
||||
// We could not write everything so we reregister with InterestedEvent.Write and so get woken up once the socket becomes writable again.
|
||||
// This also ensure we not read anymore until we were able to echo it back (backpressure FTW).
|
||||
try selector.reregister(selectable: socket, interested: InterestedEvent.write)
|
||||
try selector.reregister(selectable: socket, interested: IOEvent.write)
|
||||
}
|
||||
case .wouldBlock:
|
||||
()
|
||||
|
@ -131,7 +131,7 @@ while true {
|
|||
try selector.register(selectable: accepted) { i in .socket(accepted, buffer, i) }
|
||||
}
|
||||
}
|
||||
} else if ev.isWritable {
|
||||
case .write:
|
||||
switch ev.registration {
|
||||
case .socket(let socket, let buffer, _):
|
||||
do {
|
||||
|
@ -141,7 +141,7 @@ while true {
|
|||
|
||||
if buffer.offset == buffer.limit {
|
||||
// Everything was written, reregister again with InterestedEvent.Read so we are notified once there is more data on the socket to read.
|
||||
try selector.reregister(selectable: socket, interested: InterestedEvent.read)
|
||||
try selector.reregister(selectable: socket, interested: IOEvent.read)
|
||||
}
|
||||
case .wouldBlock:
|
||||
()
|
||||
|
@ -152,6 +152,12 @@ while true {
|
|||
default:
|
||||
fatalError("internal error: writable server socket")
|
||||
}
|
||||
|
||||
case .none:
|
||||
break
|
||||
case .all:
|
||||
fatalError("not expected")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue