diff --git a/Sources/NIO/EventLoop.swift b/Sources/NIO/EventLoop.swift index b5da90bc..72dfc3f4 100644 --- a/Sources/NIO/EventLoop.swift +++ b/Sources/NIO/EventLoop.swift @@ -56,12 +56,14 @@ public struct Scheduled { public final class RepeatedTask { private let delay: TimeAmount private let eventLoop: EventLoop + private let cancellationPromise: EventLoopPromise? private var scheduled: Scheduled>? private var task: ((RepeatedTask) -> EventLoopFuture)? - internal init(interval: TimeAmount, eventLoop: EventLoop, task: @escaping (RepeatedTask) -> EventLoopFuture) { + internal init(interval: TimeAmount, eventLoop: EventLoop, cancellationPromise: EventLoopPromise? = nil, task: @escaping (RepeatedTask) -> EventLoopFuture) { self.delay = interval self.eventLoop = eventLoop + self.cancellationPromise = cancellationPromise self.task = task } @@ -92,21 +94,45 @@ public final class RepeatedTask { /// This means immediate cancellation is not guaranteed. /// /// The safest way to cancel is by using the passed reference of `RepeatedTask` inside the task closure. - public func cancel() { + /// + /// If the promise parameter is not `nil`, the passed promise is fulfilled when cancellation is complete. + /// Passing a promise does not prevent fulfillment of any promise provided on original task creation. + public func cancel(promise: EventLoopPromise? = nil) { if self.eventLoop.inEventLoop { - self.cancel0() + self.cancel0(localCancellationPromise: promise) } else { self.eventLoop.execute { - self.cancel0() + self.cancel0(localCancellationPromise: promise) } } } - private func cancel0() { + private func cancel0(localCancellationPromise: EventLoopPromise?) { self.eventLoop.assertInEventLoop() self.scheduled?.cancel() self.scheduled = nil self.task = nil + + // Possible states at this time are: + // 1) Task is scheduled but has not yet executed. + // 2) Task is currently executing and invoked `cancel()` on itself. + // 3) Task is currently executing and `cancel0()` has been reentrantly invoked. + // 4) NOT VALID: Task is currently executing and has NOT invoked `cancel()` (`EventLoop` guarantees serial execution) + // 5) NOT VALID: Task has completed execution in a success state (`reschedule()` ensures state #2). + // 6) Task has completed execution in a failure state. + // 7) Task has been fully cancelled at a previous time. + // + // It is desirable that the task has fully completed any execution before any cancellation promise is + // fulfilled. States 2 and 3 occur during execution, so the requirement is implemented by deferring + // fulfillment to the next `EventLoop` cycle. The delay is harmless to other states and distinguishing + // them from 2 and 3 is not practical (or necessarily possible), so is used unconditionally. Check the + // promises for nil so as not to otherwise invoke `execute()` unnecessarily. + if self.cancellationPromise != nil || localCancellationPromise != nil { + self.eventLoop.execute { + self.cancellationPromise?.succeed(()) + localCancellationPromise?.succeed(()) + } + } } private func reschedule() { @@ -122,7 +148,7 @@ public final class RepeatedTask { } scheduled.futureResult.whenFailure { (_: Error) in - self.cancel0() + self.cancel0(localCancellationPromise: nil) } } @@ -441,10 +467,11 @@ extension EventLoop { /// - parameters: /// - initialDelay: The delay after which the first task is executed. /// - delay: The delay between the end of one task and the start of the next. + /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. /// - task: The closure that will be executed. /// - return: `RepeatedTask` @discardableResult - public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping (RepeatedTask) throws -> Void) -> RepeatedTask { + public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, notifying promise: EventLoopPromise? = nil, _ task: @escaping (RepeatedTask) throws -> Void) -> RepeatedTask { let futureTask: (RepeatedTask) -> EventLoopFuture = { repeatedTask in do { try task(repeatedTask) @@ -453,7 +480,7 @@ extension EventLoop { return self.makeFailedFuture(error) } } - return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, futureTask) + return self.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay, notifying: promise, futureTask) } /// Schedule a repeated task to be executed by the `EventLoop` with a fixed delay between the end and start of each task. @@ -461,11 +488,12 @@ extension EventLoop { /// - parameters: /// - initialDelay: The delay after which the first task is executed. /// - delay: The delay between the end of one task and the start of the next. + /// - promise: If non-nil, a promise to fulfill when the task is cancelled and all execution is complete. /// - task: The closure that will be executed. /// - return: `RepeatedTask` @discardableResult - public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, _ task: @escaping (RepeatedTask) -> EventLoopFuture) -> RepeatedTask { - let repeated = RepeatedTask(interval: delay, eventLoop: self, task: task) + public func scheduleRepeatedTask(initialDelay: TimeAmount, delay: TimeAmount, notifying promise: EventLoopPromise? = nil, _ task: @escaping (RepeatedTask) -> EventLoopFuture) -> RepeatedTask { + let repeated = RepeatedTask(interval: delay, eventLoop: self, cancellationPromise: promise, task: task) repeated.begin(in: initialDelay) return repeated } diff --git a/Tests/NIOTests/EventLoopTest+XCTest.swift b/Tests/NIOTests/EventLoopTest+XCTest.swift index 838490e8..34cfaaa9 100644 --- a/Tests/NIOTests/EventLoopTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopTest+XCTest.swift @@ -46,6 +46,9 @@ extension EventLoopTest { ("testShutdownWhileScheduledTasksNotReady", testShutdownWhileScheduledTasksNotReady), ("testCloseFutureNotifiedBeforeUnblock", testCloseFutureNotifiedBeforeUnblock), ("testScheduleMultipleTasks", testScheduleMultipleTasks), + ("testRepeatedTaskThatIsImmediatelyCancelledNotifies", testRepeatedTaskThatIsImmediatelyCancelledNotifies), + ("testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies", testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies), + ("testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished", testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished), ] } } diff --git a/Tests/NIOTests/EventLoopTest.swift b/Tests/NIOTests/EventLoopTest.swift index 4532ca61..f5f9f10b 100644 --- a/Tests/NIOTests/EventLoopTest.swift +++ b/Tests/NIOTests/EventLoopTest.swift @@ -570,4 +570,77 @@ public class EventLoopTest : XCTestCase { XCTAssertTrue(result.isEmpty) } + + public func testRepeatedTaskThatIsImmediatelyCancelledNotifies() throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let loop = eventLoopGroup.next() + let promise1: EventLoopPromise = loop.makePromise() + let promise2: EventLoopPromise = loop.makePromise() + let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled") + let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled") + promise1.futureResult.whenSuccess { expect1.fulfill() } + promise2.futureResult.whenSuccess { expect2.fulfill() } + loop.execute { + let task = loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(0), notifying: promise1) { task in + XCTFail() + } + task.cancel(promise: promise2) + } + Thread.sleep(until: .init(timeIntervalSinceNow: 0.1)) + let res = XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0) + XCTAssertEqual(res, .completed) + } + + public func testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies() throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let loop = eventLoopGroup.next() + let promise1: EventLoopPromise = loop.makePromise() + let promise2: EventLoopPromise = loop.makePromise() + let expectRuns = XCTestExpectation(description: "Repeated task has run") + expectRuns.expectedFulfillmentCount = 2 + let task = loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(10), notifying: promise1) { task in + expectRuns.fulfill() + } + XCTAssertEqual(XCTWaiter.wait(for: [expectRuns], timeout: 0.05), .completed) + let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled") + let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled") + promise1.futureResult.whenSuccess { expect1.fulfill() } + promise2.futureResult.whenSuccess { expect2.fulfill() } + task.cancel(promise: promise2) + XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0), .completed) + } + + public func testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished() throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + } + + let loop = eventLoopGroup.next() + let promise1: EventLoopPromise = loop.makePromise() + let promise2: EventLoopPromise = loop.makePromise() + let semaphore = DispatchSemaphore(value: 0) + loop.scheduleRepeatedTask(initialDelay: .milliseconds(0), delay: .milliseconds(0), notifying: promise1) { task -> Void in + task.cancel(promise: promise2) + semaphore.wait() + } + let expectFail1 = XCTestExpectation(description: "Initializer promise was wrongly fulfilled") + let expectFail2 = XCTestExpectation(description: "Cancellation-specific promise was wrongly fulfilled") + let expect1 = XCTestExpectation(description: "Initializer promise was fulfilled") + let expect2 = XCTestExpectation(description: "Cancellation-specific promise was fulfilled") + promise1.futureResult.whenSuccess { expectFail1.fulfill(); expect1.fulfill() } + promise2.futureResult.whenSuccess { expectFail2.fulfill(); expect2.fulfill() } + XCTAssertEqual(XCTWaiter.wait(for: [expectFail1, expectFail2], timeout: 0.5), .timedOut) + semaphore.signal() + XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 0.5), .completed) + } + }