sanitise time interval (aka `TimeAmount`) APIs
This commit is contained in:
parent
9a5390495e
commit
34e0b1ac0a
|
@ -15,6 +15,8 @@
|
|||
//
|
||||
//
|
||||
|
||||
import struct Dispatch.DispatchTime
|
||||
|
||||
/**
|
||||
ChannelHandler implementation which enforces back-pressure by stopping to read from the remote peer when it cannot write back fast enough.
|
||||
It will start reading again once pending data was written.
|
||||
|
@ -98,8 +100,8 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
public let allTimeout: TimeAmount?
|
||||
|
||||
private var reading = false
|
||||
private var lastReadTime: TimeAmount?
|
||||
private var lastWriteCompleteTime: TimeAmount?
|
||||
private var lastReadTime: DispatchTime = DispatchTime(uptimeNanoseconds: 0)
|
||||
private var lastWriteCompleteTime: DispatchTime = DispatchTime(uptimeNanoseconds: 0)
|
||||
private var scheduledReaderTask: Scheduled<Void>?
|
||||
private var scheduledWriterTask: Scheduled<Void>?
|
||||
private var scheduledAllTask: Scheduled<Void>?
|
||||
|
@ -133,7 +135,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
|
||||
public func channelReadComplete(ctx: ChannelHandlerContext) {
|
||||
if (readTimeout != nil || allTimeout != nil) && reading {
|
||||
lastReadTime = TimeAmount.now()
|
||||
lastReadTime = DispatchTime.now()
|
||||
reading = false
|
||||
}
|
||||
ctx.fireChannelReadComplete()
|
||||
|
@ -147,7 +149,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
|
||||
let writePromise = promise ?? ctx.eventLoop.newPromise()
|
||||
writePromise.futureResult.whenComplete { _ in
|
||||
self.lastWriteCompleteTime = TimeAmount.now()
|
||||
self.lastWriteCompleteTime = DispatchTime.now()
|
||||
}
|
||||
ctx.write(data: data, promise: writePromise)
|
||||
}
|
||||
|
@ -170,7 +172,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
return
|
||||
}
|
||||
|
||||
let diff = TimeAmount.now().nanoseconds - (self.lastReadTime?.nanoseconds ?? 0)
|
||||
let diff = Int(DispatchTime.now().uptimeNanoseconds) - Int(self.lastReadTime.uptimeNanoseconds)
|
||||
if diff >= timeout.nanoseconds {
|
||||
// Reader is idle - set a new timeout and trigger an event through the pipeline
|
||||
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newReadTimeoutTask(ctx, timeout))
|
||||
|
@ -189,8 +191,8 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
return
|
||||
}
|
||||
|
||||
let lastWriteTime = self.lastWriteCompleteTime?.nanoseconds ?? 0
|
||||
let diff = TimeAmount.now().nanoseconds - lastWriteTime
|
||||
let lastWriteTime = self.lastWriteCompleteTime
|
||||
let diff = DispatchTime.now().uptimeNanoseconds - lastWriteTime.uptimeNanoseconds
|
||||
|
||||
if diff >= timeout.nanoseconds {
|
||||
// Writer is idle - set a new timeout and notify the callback.
|
||||
|
@ -199,7 +201,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
ctx.fireUserInboundEventTriggered(event: IdleStateEvent.write)
|
||||
} else {
|
||||
// Write occurred before the timeout - set a new timeout with shorter delay.
|
||||
self.scheduledWriterTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(timeout.nanoseconds - diff), self.newWriteTimeoutTask(ctx, timeout))
|
||||
self.scheduledWriterTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(Int(timeout.nanoseconds) - Int(diff)), self.newWriteTimeoutTask(ctx, timeout))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -214,10 +216,10 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newAllTimeoutTask(ctx, timeout))
|
||||
return
|
||||
}
|
||||
let lastRead = self.lastReadTime?.nanoseconds ?? 0
|
||||
let lastWrite = self.lastWriteCompleteTime?.nanoseconds ?? 0
|
||||
let lastRead = self.lastReadTime
|
||||
let lastWrite = self.lastWriteCompleteTime
|
||||
|
||||
let diff = TimeAmount.now().nanoseconds - (lastRead > lastWrite ? lastRead : lastWrite)
|
||||
let diff = Int(DispatchTime.now().uptimeNanoseconds) - Int((lastRead > lastWrite ? lastRead : lastWrite).uptimeNanoseconds)
|
||||
if diff >= timeout.nanoseconds {
|
||||
// Reader is idle - set a new timeout and trigger an event through the pipeline
|
||||
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newAllTimeoutTask(ctx, timeout))
|
||||
|
@ -225,7 +227,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
ctx.fireUserInboundEventTriggered(event: IdleStateEvent.all)
|
||||
} else {
|
||||
// Read occurred before the timeout - set a new timeout with shorter delay.
|
||||
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(timeout.nanoseconds - diff), self.newAllTimeoutTask(ctx, timeout))
|
||||
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(Int(timeout.nanoseconds) - diff), self.newAllTimeoutTask(ctx, timeout))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -238,7 +240,7 @@ public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
|
|||
}
|
||||
|
||||
private func initIdleTasks(_ ctx: ChannelHandlerContext) {
|
||||
let now = TimeAmount.now()
|
||||
let now = DispatchTime.now()
|
||||
lastReadTime = now
|
||||
lastWriteCompleteTime = now
|
||||
scheduledReaderTask = schedule(ctx, readTimeout, newReadTimeoutTask)
|
||||
|
|
|
@ -118,12 +118,14 @@ public protocol EventLoop: EventLoopGroup {
|
|||
func newSucceedFuture<T>(result: T) -> EventLoopFuture<T>
|
||||
}
|
||||
|
||||
/// Represent an amount of time since the start of the system.
|
||||
/// Represents a time _interval_.
|
||||
///
|
||||
/// - note: `TimeAmount` should not be used to represent a point in time.
|
||||
public struct TimeAmount {
|
||||
/// The nanoseconds representation of the `TimeAmount`.
|
||||
public let nanoseconds: UInt64
|
||||
public let nanoseconds: Int
|
||||
|
||||
private init(_ nanoseconds: UInt64) {
|
||||
private init(_ nanoseconds: Int) {
|
||||
self.nanoseconds = nanoseconds
|
||||
}
|
||||
|
||||
|
@ -132,7 +134,7 @@ public struct TimeAmount {
|
|||
/// - parameters:
|
||||
/// - amount: the amount of nanoseconds this `TimeAmount` represents.
|
||||
/// - returns: the `TimeAmount` for the given amount.
|
||||
public static func nanoseconds(_ amount: UInt64) -> TimeAmount {
|
||||
public static func nanoseconds(_ amount: Int) -> TimeAmount {
|
||||
return TimeAmount(amount)
|
||||
}
|
||||
|
||||
|
@ -141,7 +143,7 @@ public struct TimeAmount {
|
|||
/// - parameters:
|
||||
/// - amount: the amount of microseconds this `TimeAmount` represents.
|
||||
/// - returns: the `TimeAmount` for the given amount.
|
||||
public static func microseconds(_ amount: UInt64) -> TimeAmount {
|
||||
public static func microseconds(_ amount: Int) -> TimeAmount {
|
||||
return TimeAmount(amount * 1000)
|
||||
}
|
||||
|
||||
|
@ -151,7 +153,7 @@ public struct TimeAmount {
|
|||
/// - amount: the amount of milliseconds this `TimeAmount` represents.
|
||||
/// - returns: the `TimeAmount` for the given amount.
|
||||
public static func milliseconds(_ amount: Int) -> TimeAmount {
|
||||
return TimeAmount(UInt64(amount) * 1000 * 1000)
|
||||
return TimeAmount(amount * 1000 * 1000)
|
||||
}
|
||||
|
||||
/// Creates a new `TimeAmount` for the given amount of seconds.
|
||||
|
@ -160,7 +162,7 @@ public struct TimeAmount {
|
|||
/// - amount: the amount of seconds this `TimeAmount` represents.
|
||||
/// - returns: the `TimeAmount` for the given amount.
|
||||
public static func seconds(_ amount: Int) -> TimeAmount {
|
||||
return TimeAmount(UInt64(amount) * 1000 * 1000 * 1000)
|
||||
return TimeAmount(amount * 1000 * 1000 * 1000)
|
||||
}
|
||||
|
||||
/// Creates a new `TimeAmount` for the given amount of minutes.
|
||||
|
@ -169,7 +171,7 @@ public struct TimeAmount {
|
|||
/// - amount: the amount of minutes this `TimeAmount` represents.
|
||||
/// - returns: the `TimeAmount` for the given amount.
|
||||
public static func minutes(_ amount: Int) -> TimeAmount {
|
||||
return TimeAmount(UInt64(amount) * 1000 * 1000 * 1000 * 60)
|
||||
return TimeAmount(amount * 1000 * 1000 * 1000 * 60)
|
||||
}
|
||||
|
||||
/// Creates a new `TimeAmount` for the given amount of hours.
|
||||
|
@ -178,14 +180,7 @@ public struct TimeAmount {
|
|||
/// - amount: the amount of hours this `TimeAmount` represents.
|
||||
/// - returns: the `TimeAmount` for the given amount.
|
||||
public static func hours(_ amount: Int) -> TimeAmount {
|
||||
return TimeAmount(UInt64(amount) * 1000 * 1000 * 1000 * 60 * 60)
|
||||
}
|
||||
|
||||
/// Creates a new `TimeAmount` represent the current point in time.
|
||||
///
|
||||
/// - returns: the `TimeAmount` for the current time.
|
||||
public static func now() -> TimeAmount {
|
||||
return nanoseconds(DispatchTime.now().uptimeNanoseconds)
|
||||
return TimeAmount(amount * 1000 * 1000 * 1000 * 60 * 60)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -432,13 +427,13 @@ internal final class SelectableEventLoop : EventLoop {
|
|||
return .block
|
||||
}
|
||||
|
||||
let nanos: UInt64 = sched.readyIn(DispatchTime.now())
|
||||
let nextReady = sched.readyIn(DispatchTime.now())
|
||||
|
||||
if nanos == 0 {
|
||||
if nextReady <= .nanoseconds(0) {
|
||||
// Something is ready to be processed just do a non-blocking select of events.
|
||||
return .now
|
||||
} else {
|
||||
return .blockUntilTimeout(nanoseconds: nanos)
|
||||
return .blockUntilTimeout(nextReady)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -488,7 +483,7 @@ internal final class SelectableEventLoop : EventLoop {
|
|||
let now = DispatchTime.now()
|
||||
|
||||
// Make a copy of the tasks so we can execute these while not holding the lock anymore
|
||||
while let task = scheduledTasks.peek(), task.readyIn(now) == 0 {
|
||||
while let task = scheduledTasks.peek(), task.readyIn(now) <= .nanoseconds(0) {
|
||||
tasksCopy.append(task.task)
|
||||
|
||||
let _ = scheduledTasks.pop()
|
||||
|
@ -712,19 +707,19 @@ final public class MultiThreadedEventLoopGroup : EventLoopGroup {
|
|||
private final class ScheduledTask {
|
||||
let task: () -> ()
|
||||
private let failFn: (Error) ->()
|
||||
private let readyTime: UInt64
|
||||
private let readyTime: Int
|
||||
|
||||
init(_ task: @escaping () -> (), _ failFn: @escaping (Error) -> (), _ time: TimeAmount) {
|
||||
self.task = task
|
||||
self.failFn = failFn
|
||||
self.readyTime = time.nanoseconds + DispatchTime.now().uptimeNanoseconds
|
||||
self.readyTime = time.nanoseconds + Int(DispatchTime.now().uptimeNanoseconds)
|
||||
}
|
||||
|
||||
func readyIn(_ t: DispatchTime) -> UInt64 {
|
||||
func readyIn(_ t: DispatchTime) -> TimeAmount {
|
||||
if readyTime < t.uptimeNanoseconds {
|
||||
return 0
|
||||
return .nanoseconds(0)
|
||||
}
|
||||
return readyTime - t.uptimeNanoseconds
|
||||
return .nanoseconds(readyTime - Int(t.uptimeNanoseconds))
|
||||
}
|
||||
|
||||
func fail(error: Error) {
|
||||
|
|
|
@ -24,6 +24,15 @@ private enum SelectorLifecycleState {
|
|||
case closed
|
||||
}
|
||||
|
||||
private extension timespec {
|
||||
init(timeAmount amount: TimeAmount) {
|
||||
let nsecPerSec: Int = 1_000_000_000
|
||||
let ns = amount.nanoseconds
|
||||
self.tv_sec = ns / nsecPerSec
|
||||
self.tv_nsec = ns - self.tv_sec * nsecPerSec
|
||||
}
|
||||
}
|
||||
|
||||
/* this is deliberately not thread-safe, only the wakeup() function may be called unprotectedly */
|
||||
final class Selector<R: Registration> {
|
||||
private var lifecycleState: SelectorLifecycleState
|
||||
|
@ -142,7 +151,7 @@ final class Selector<R: Registration> {
|
|||
case .now:
|
||||
return timespec(tv_sec: 0, tv_nsec: 0)
|
||||
case .blockUntilTimeout(let nanoseconds):
|
||||
return toTimerspec(nanoseconds)
|
||||
return timespec(timeAmount: nanoseconds)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -308,9 +317,9 @@ final class Selector<R: Registration> {
|
|||
switch strategy {
|
||||
case .now:
|
||||
ready = Int(try Epoll.epoll_wait(epfd: self.fd, events: events, maxevents: Int32(eventsCapacity), timeout: 0))
|
||||
case .blockUntilTimeout(let nanoseconds):
|
||||
case .blockUntilTimeout(let timeAmount):
|
||||
var ts = itimerspec()
|
||||
ts.it_value = toTimerspec(nanoseconds)
|
||||
ts.it_value = timespec(timeAmount: timeAmount)
|
||||
try TimerFd.timerfd_settime(fd: timerfd, flags: 0, newValue: &ts, oldValue: nil)
|
||||
fallthrough
|
||||
case .block:
|
||||
|
@ -374,12 +383,6 @@ final class Selector<R: Registration> {
|
|||
#endif
|
||||
}
|
||||
|
||||
private func toTimerspec(_ nanoseconds: UInt64) -> timespec {
|
||||
let delaySeconds = nanoseconds / 1000000000
|
||||
let delayNanoSeconds = nanoseconds - delaySeconds * 1000000000
|
||||
return timespec(tv_sec: Int(delaySeconds), tv_nsec: Int(delayNanoSeconds))
|
||||
}
|
||||
|
||||
public func close() throws {
|
||||
guard self.lifecycleState == .open else {
|
||||
throw IOError(errnoCode: EBADF, reason: "can't close selector as it's \(self.lifecycleState).")
|
||||
|
@ -463,7 +466,7 @@ internal extension Selector where R == NIORegistration {
|
|||
|
||||
enum SelectorStrategy {
|
||||
case block
|
||||
case blockUntilTimeout(nanoseconds: UInt64)
|
||||
case blockUntilTimeout(TimeAmount)
|
||||
case now
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue