From 8222af4aad3c3b29c4422e8b446f34b44ba365a6 Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Tue, 29 Jan 2019 07:19:12 -0600 Subject: [PATCH] Add cancel notification to RepeatedTask (#783) * Add cancel notification to RepeatedTask Motivation: When cancelling a RepeatedTask, it is often desirable to wait until any still in-progress execution of the task is finished - for example, to know when a periodic state save is done before shutting down. The current API provides no easy way to perform such a wait. With this addition, repeating tasks can report completion of a cancellation request regardless of in what state the task was in when the request was issued. Modifications: A default-nil parameter is added to EventLoop.scheduleRepeatedTask(), to which a promise can be provided; the promise will be fulfilled at the time of the task's cancellation, plus any time required for the task to complete running if it was cancelled during execution. The promise is signaled even if the cancellation is the result of the task throwing an error. A default-nil paramter is also added to RepeatedTask.cancel(), to which a second promise can be passed. This promise is fulfilled under the same conditions as the first, even if the first was not given; neither of the promises is dependent upon the other and both are fulfilled at the same time if provided. Further calls to cancel() with different promises will result in fulfillment of each of those promises as well. Result: The observed behavior of RepeatedTask.cancel() now optionally includes deterministic knowledge of when the task is no longer executing or scheduled to be executed. The API changes are designed to be additive in form, but the function signatures do change. Source compatibility is maintained; binary compatibility is not. --- Sources/NIO/EventLoop.swift | 48 +++++++++++---- Tests/NIOTests/EventLoopTest+XCTest.swift | 3 + Tests/NIOTests/EventLoopTest.swift | 73 +++++++++++++++++++++++ 3 files changed, 114 insertions(+), 10 deletions(-) diff --git a/Sources/NIO/EventLoop.swift b/Sources/NIO/EventLoop.swift index b5da90bc..72dfc3f4 100644 --- a/Sources/NIO/EventLoop.swift +++ b/Sources/NIO/EventLoop.swift @@ -56,12 +56,14 @@ public struct Scheduled { public final class RepeatedTask { private let delay: TimeAmount private let eventLoop: EventLoop + private let cancellationPromise: EventLoopPromise? private var scheduled: Scheduled>? private var task: ((RepeatedTask) -> EventLoopFuture)? - internal init(interval: TimeAmount, eventLoop: EventLoop, task: @escaping (RepeatedTask) -> EventLoopFuture) { + internal init(interval: TimeAmount, eventLoop: EventLoop, cancellationPromise: EventLoopPromise? = nil, task: @escaping (RepeatedTask) -> EventLoopFuture) { self.delay = interval self.eventLoop = eventLoop + self.cancellationPromise = cancellationPromise self.task = task } @@ -92,21 +94,45 @@ public final class RepeatedTask { /// This means immediate cancellation is not guaranteed. /// /// The safest way to cancel is by using the passed reference of `RepeatedTask` inside the task closure. - public func cancel() { + /// + /// If the promise parameter is not `nil`, the passed promise is fulfilled when cancellation is complete. + /// Passing a promise does not prevent fulfillment of any promise provided on original task creation. + public func cancel(promise: EventLoopPromise? = nil) { if self.eventLoop.inEventLoop { - self.cancel0() + self.cancel0(localCancellationPromise: promise) } else { self.eventLoop.execute { - self.cancel0() + self.cancel0(localCancellationPromise: promise) } } } - private func cancel0() { + private func cancel0(localCancellationPromise: EventLoopPromise?) { self.eventLoop.assertInEventLoop() self.scheduled?.cancel() self.scheduled = nil self.task = nil + + // Possible states at this time are: + // 1) Task is scheduled but has not yet executed. + // 2) Task is currently executing and invoked `cancel()` on itself. + // 3) Task is currently executing and `cancel0()` has been reentrantly invoked. + // 4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution) + // 5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2). + // 6) Task has completed execution in a failure state. + // 7) Task has been fully cancelled at a previous time. + // + // It is desirable that the task has fully completed any execution before any cancellation promise is + // fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring + // fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing + // them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the + // promises for nil so as not to otherwise invoke `execute()` unnecessarily. + if self.cancellationPromise != nil || localCancellationPromise != nil { + self.eventLoop.execute { + self.cancellationPromise?.succeed(()) + localCancellationPromise?.succeed(()) + } + } } private func reschedule() { @@ -122,7 +148,7 @@ public final class RepeatedTask { } scheduled.futureResult.whenFailure { (_: Error) in - self.cancel0() + self.cancel0(localCancellationPromise: nil) } } @@ -441,10 +467,11 @@ extension EventLoop { /// - parameters: /// - initialDelay: The delay after which the first task is executed. /// - delay: The delay between the end of one task and the start of the next. + /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. /// - task: The closure that will be executed. /// - return: `RepeatedTask` @discardableResult - public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping (RepeatedTask) throws -> Void) -> RepeatedTask { + public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, notifying promise: EventLoopPromise? = nil, _ task: @escaping (RepeatedTask) throws -> Void) -> RepeatedTask { let futureTask: (RepeatedTask) -> EventLoopFuture = { repeatedTask in do { try task(repeatedTask) @@ -453,7 +480,7 @@ extension EventLoop { return self.makeFailedFuture(error) } } - return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, futureTask) + return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, notifying: promise, futureTask) } /// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each task. @@ -461,11 +488,12 @@ extension EventLoop { /// - parameters: /// - initialDelay: The delay after which the first task is executed. /// - delay: The delay between the end of one task and the start of the next. + /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. /// - task: The closure that will be executed. /// - return: `RepeatedTask` @discardableResult - public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping (RepeatedTask) -> EventLoopFuture) -> RepeatedTask { - let repeated = RepeatedTask(interval: delay, eventLoop: self, task: task) + public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, notifying promise: EventLoopPromise? = nil, _ task: @escaping (RepeatedTask) -> EventLoopFuture) -> RepeatedTask { + let repeated = RepeatedTask(interval: delay, eventLoop: self, cancellationPromise: promise, task: task) repeated.begin(in: initialDelay) return repeated } diff --git a/Tests/NIOTests/EventLoopTest+XCTest.swift b/Tests/NIOTests/EventLoopTest+XCTest.swift index 838490e8..34cfaaa9 100644 --- a/Tests/NIOTests/EventLoopTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopTest+XCTest.swift @@ -46,6 +46,9 @@ extension EventLoopTest { ("testShutdownWhileScheduledTasksNotReady", testShutdownWhileScheduledTasksNotReady), ("testCloseFutureNotifiedBeforeUnblock", testCloseFutureNotifiedBeforeUnblock), ("testScheduleMultipleTasks", testScheduleMultipleTasks), + ("testRepeatedTaskThatIsImmediatelyCancelledNotifies", testRepeatedTaskThatIsImmediatelyCancelledNotifies), + ("testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies", testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies), + ("testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished", testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished), ] } } diff --git a/Tests/NIOTests/EventLoopTest.swift b/Tests/NIOTests/EventLoopTest.swift index 4532ca61..f5f9f10b 100644 --- a/Tests/NIOTests/EventLoopTest.swift +++ b/Tests/NIOTests/EventLoopTest.swift @@ -570,4 +570,77 @@ public class EventLoopTest : XCTestCase { XCTAssertTrue(result.isEmpty) } + + public func testRepeatedTaskThatIsImmediatelyCancelledNotifies() throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let loop = eventLoopGroup.next() + let promise1: EventLoopPromise = loop.makePromise() + let promise2: EventLoopPromise = loop.makePromise() + let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled") + let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled") + promise1.futureResult.whenSuccess { expect1.fulfill() } + promise2.futureResult.whenSuccess { expect2.fulfill() } + loop.execute { + let task = loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(0), notifying: promise1) { task in + XCTFail() + } + task.cancel(promise: promise2) + } + Thread.sleep(until: .init(timeIntervalSinceNow: 0.1)) + let res = XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0) + XCTAssertEqual(res, .completed) + } + + public func testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies() throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let loop = eventLoopGroup.next() + let promise1: EventLoopPromise = loop.makePromise() + let promise2: EventLoopPromise = loop.makePromise() + let expectRuns = XCTestExpectation(description: "Repeated task has run") + expectRuns.expectedFulfillmentCount = 2 + let task = loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(10), notifying: promise1) { task in + expectRuns.fulfill() + } + XCTAssertEqual(XCTWaiter.wait(for: [expectRuns], timeout: 0.05), .completed) + let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled") + let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled") + promise1.futureResult.whenSuccess { expect1.fulfill() } + promise2.futureResult.whenSuccess { expect2.fulfill() } + task.cancel(promise: promise2) + XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0), .completed) + } + + public func testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished() throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let loop = eventLoopGroup.next() + let promise1: EventLoopPromise = loop.makePromise() + let promise2: EventLoopPromise = loop.makePromise() + let semaphore = DispatchSemaphore(value: 0) + loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(0), notifying: promise1) { task -> Void in + task.cancel(promise: promise2) + semaphore.wait() + } + let expectFail1 = XCTestExpectation(description: "Initializer promise was wrongly fulfilled") + let expectFail2 = XCTestExpectation(description: "Cancellation-specific promise was wrongly fulfilled") + let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled") + let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled") + promise1.futureResult.whenSuccess { expectFail1.fulfill(); expect1.fulfill() } + promise2.futureResult.whenSuccess { expectFail2.fulfill(); expect2.fulfill() } + XCTAssertEqual(XCTWaiter.wait(for: [expectFail1, expectFail2], timeout: 0.5), .timedOut) + semaphore.signal() + XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 0.5), .completed) + } + }