Add cancel notification to RepeatedTask (#783)

* Add cancel notification to RepeatedTask

Motivation:

When cancelling a RepeatedTask, it is often desirable to
wait until any still in-progress execution of the task is
finished - for example, to know when a periodic state save
is done before shutting down. The current API provides no
easy way to perform such a wait. With this addition,
repeating tasks can report completion of a cancellation
request regardless of in what state the task was in when the
request was issued.

Modifications:

A default-nil parameter is added to EventLoop.scheduleRepeatedTask(),
to which a promise can be provided; the promise will be
fulfilled at the time of the task's cancellation, plus any
time required for the task to complete running if it was
cancelled during execution. The promise is signaled even if
the cancellation is the result of the task throwing an error.
A default-nil paramter is also added to RepeatedTask.cancel(),
to which a second promise can be passed. This promise is
fulfilled under the same conditions as the first, even if the
first was not given; neither of the promises is dependent
upon the other and both are fulfilled at the same time if
provided. Further calls to cancel() with different promises
will result in fulfillment of each of those promises as well.

Result:

The observed behavior of RepeatedTask.cancel() now optionally
includes deterministic knowledge of when the task is no longer
executing or scheduled to be executed. The API changes are
designed to be additive in form, but the function signatures
do change. Source compatibility is maintained; binary
compatibility is not.
This commit is contained in:
Gwynne Raskind 2019-01-29 07:19:12 -06:00 committed by Johannes Weiss
parent 2ad1e21512
commit 8222af4aad
3 changed files with 114 additions and 10 deletions

View File

@ -56,12 +56,14 @@ public struct Scheduled<T> {
public final class RepeatedTask {
private let delay: TimeAmount
private let eventLoop: EventLoop
private let cancellationPromise: EventLoopPromise<Void>?
private var scheduled: Scheduled<EventLoopFuture<Void>>?
private var task: ((RepeatedTask) -> EventLoopFuture<Void>)?
internal init(interval: TimeAmount, eventLoop: EventLoop, task: @escaping (RepeatedTask) -> EventLoopFuture<Void>) {
internal init(interval: TimeAmount, eventLoop: EventLoop, cancellationPromise: EventLoopPromise<Void>? = nil, task: @escaping (RepeatedTask) -> EventLoopFuture<Void>) {
self.delay = interval
self.eventLoop = eventLoop
self.cancellationPromise = cancellationPromise
self.task = task
}
@ -92,21 +94,45 @@ public final class RepeatedTask {
/// This means immediate cancellation is not guaranteed.
///
/// The safest way to cancel is by using the passed reference of `RepeatedTask` inside the task closure.
public func cancel() {
///
/// If the promise parameter is not `nil`, the passed promise is fulfilled when cancellation is complete.
/// Passing a promise does not prevent fulfillment of any promise provided on original task creation.
public func cancel(promise: EventLoopPromise<Void>? = nil) {
if self.eventLoop.inEventLoop {
self.cancel0()
self.cancel0(localCancellationPromise: promise)
} else {
self.eventLoop.execute {
self.cancel0()
self.cancel0(localCancellationPromise: promise)
}
}
}
private func cancel0() {
private func cancel0(localCancellationPromise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()
self.scheduled?.cancel()
self.scheduled = nil
self.task = nil
// Possible states at this time are:
// 1) Task is scheduled but has not yet executed.
// 2) Task is currently executing and invoked `cancel()` on itself.
// 3) Task is currently executing and `cancel0()` has been reentrantly invoked.
// 4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution)
// 5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2).
// 6) Task has completed execution in a failure state.
// 7) Task has been fully cancelled at a previous time.
//
// It is desirable that the task has fully completed any execution before any cancellation promise is
// fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring
// fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing
// them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the
// promises for nil so as not to otherwise invoke `execute()` unnecessarily.
if self.cancellationPromise != nil || localCancellationPromise != nil {
self.eventLoop.execute {
self.cancellationPromise?.succeed(())
localCancellationPromise?.succeed(())
}
}
}
private func reschedule() {
@ -122,7 +148,7 @@ public final class RepeatedTask {
}
scheduled.futureResult.whenFailure { (_: Error) in
self.cancel0()
self.cancel0(localCancellationPromise: nil)
}
}
@ -441,10 +467,11 @@ extension EventLoop {
/// - parameters:
/// - initialDelay: The delay after which the first task is executed.
/// - delay: The delay between the end of one task and the start of the next.
/// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete.
/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping (RepeatedTask) throws -> Void) -> RepeatedTask {
public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, notifying promise: EventLoopPromise<Void>? = nil, _ task: @escaping (RepeatedTask) throws -> Void) -> RepeatedTask {
let futureTask: (RepeatedTask) -> EventLoopFuture<Void> = { repeatedTask in
do {
try task(repeatedTask)
@ -453,7 +480,7 @@ extension EventLoop {
return self.makeFailedFuture(error)
}
}
return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, futureTask)
return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, notifying: promise, futureTask)
}
/// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each task.
@ -461,11 +488,12 @@ extension EventLoop {
/// - parameters:
/// - initialDelay: The delay after which the first task is executed.
/// - delay: The delay between the end of one task and the start of the next.
/// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete.
/// - task: The closure that will be executed.
/// - return: `RepeatedTask`
@discardableResult
public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping (RepeatedTask) -> EventLoopFuture<Void>) -> RepeatedTask {
let repeated = RepeatedTask(interval: delay, eventLoop: self, task: task)
public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, notifying promise: EventLoopPromise<Void>? = nil, _ task: @escaping (RepeatedTask) -> EventLoopFuture<Void>) -> RepeatedTask {
let repeated = RepeatedTask(interval: delay, eventLoop: self, cancellationPromise: promise, task: task)
repeated.begin(in: initialDelay)
return repeated
}

View File

@ -46,6 +46,9 @@ extension EventLoopTest {
("testShutdownWhileScheduledTasksNotReady", testShutdownWhileScheduledTasksNotReady),
("testCloseFutureNotifiedBeforeUnblock", testCloseFutureNotifiedBeforeUnblock),
("testScheduleMultipleTasks", testScheduleMultipleTasks),
("testRepeatedTaskThatIsImmediatelyCancelledNotifies", testRepeatedTaskThatIsImmediatelyCancelledNotifies),
("testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies", testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies),
("testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished", testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished),
]
}
}

View File

@ -570,4 +570,77 @@ public class EventLoopTest : XCTestCase {
XCTAssertTrue(result.isEmpty)
}
public func testRepeatedTaskThatIsImmediatelyCancelledNotifies() throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}
let loop = eventLoopGroup.next()
let promise1: EventLoopPromise<Void> = loop.makePromise()
let promise2: EventLoopPromise<Void> = loop.makePromise()
let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled")
let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled")
promise1.futureResult.whenSuccess { expect1.fulfill() }
promise2.futureResult.whenSuccess { expect2.fulfill() }
loop.execute {
let task = loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(0), notifying: promise1) { task in
XCTFail()
}
task.cancel(promise: promise2)
}
Thread.sleep(until: .init(timeIntervalSinceNow: 0.1))
let res = XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0)
XCTAssertEqual(res, .completed)
}
public func testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies() throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}
let loop = eventLoopGroup.next()
let promise1: EventLoopPromise<Void> = loop.makePromise()
let promise2: EventLoopPromise<Void> = loop.makePromise()
let expectRuns = XCTestExpectation(description: "Repeated task has run")
expectRuns.expectedFulfillmentCount = 2
let task = loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(10), notifying: promise1) { task in
expectRuns.fulfill()
}
XCTAssertEqual(XCTWaiter.wait(for: [expectRuns], timeout: 0.05), .completed)
let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled")
let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled")
promise1.futureResult.whenSuccess { expect1.fulfill() }
promise2.futureResult.whenSuccess { expect2.fulfill() }
task.cancel(promise: promise2)
XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0), .completed)
}
public func testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished() throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}
let loop = eventLoopGroup.next()
let promise1: EventLoopPromise<Void> = loop.makePromise()
let promise2: EventLoopPromise<Void> = loop.makePromise()
let semaphore = DispatchSemaphore(value: 0)
loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(0), notifying: promise1) { task -> Void in
task.cancel(promise: promise2)
semaphore.wait()
}
let expectFail1 = XCTestExpectation(description: "Initializer promise was wrongly fulfilled")
let expectFail2 = XCTestExpectation(description: "Cancellation-specific promise was wrongly fulfilled")
let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled")
let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled")
promise1.futureResult.whenSuccess { expectFail1.fulfill(); expect1.fulfill() }
promise2.futureResult.whenSuccess { expectFail2.fulfill(); expect2.fulfill() }
XCTAssertEqual(XCTWaiter.wait(for: [expectFail1, expectFail2], timeout: 0.5), .timedOut)
semaphore.signal()
XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 0.5), .completed)
}
}