From 6431296d6b820b6d99069d2f640704130d3a01cd Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 2 Sep 2022 10:01:11 +0100 Subject: [PATCH] Call finish once the Source is deinited (#2258) * Call finish once the Source is deinited # Motivation We **MUST** call `finish()` when the `Source` deinits otherwise we can have a suspended continuation that never gets resumed. # Modification Introduce an internal class to both `Source`s and call `finish()` in their `deinit`s. # Result We are now resuming all continuations. * Remove @unchecked --- .../NIOAsyncSequenceProducer.swift | 71 ++++++++++++------- .../NIOThrowingAsyncSequenceProducer.swift | 42 ++++++++--- .../NIOAsyncSequenceTests.swift | 68 ++++++++++++++++++ .../NIOThrowingAsyncSequenceTests.swift | 68 ++++++++++++++++++ 4 files changed, 215 insertions(+), 34 deletions(-) diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift index 0528a661..fb109bdb 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift @@ -90,7 +90,7 @@ public struct NIOAsyncSequenceProducer< Strategy: NIOAsyncSequenceProducerBackPressureStrategy, Delegate: NIOAsyncSequenceProducerDelegate >: Sendable { - /// Simple struct for the return type of ``NIOAsyncSequenceProducer/makeSequence(of:backPressureStrategy:delegate:)``. + /// Simple struct for the return type of ``NIOAsyncSequenceProducer/makeSequence(elementType:backPressureStrategy:delegate:)``. /// /// This struct contains two properties: /// 1. The ``source`` which should be retained by the producer and is used @@ -198,30 +198,51 @@ extension NIOAsyncSequenceProducer { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOAsyncSequenceProducer { /// A struct to interface between the synchronous code of the producer and the asynchronous consumer. - /// This type allows the producer to synchronously `yield` new elements to the ``NIOThrowingAsyncSequenceProducer`` + /// This type allows the producer to synchronously `yield` new elements to the ``NIOAsyncSequenceProducer`` /// and to `finish` the sequence. public struct Source { + /// This class is needed to hook the deinit to observe once all references to the ``NIOAsyncSequenceProducer/Source`` are dropped. + /// + /// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`. @usableFromInline - /* fileprivate */ internal let _throwingSource: NIOThrowingAsyncSequenceProducer< - Element, - Never, - Strategy, - Delegate - >.Source - - @usableFromInline - /* fileprivate */ internal init( - throwingSource: NIOThrowingAsyncSequenceProducer< + /* fileprivate */ internal final class InternalClass: Sendable { + @usableFromInline + typealias ThrowingSource = NIOThrowingAsyncSequenceProducer< Element, Never, Strategy, Delegate >.Source - ) { - self._throwingSource = throwingSource + + @usableFromInline + /* fileprivate */ internal let _throwingSource: ThrowingSource + + @inlinable + init(throwingSource: ThrowingSource) { + self._throwingSource = throwingSource + } + + @inlinable + deinit { + // We need to call finish here to resume any suspended continuation. + self._throwingSource.finish() + } } - /// The result of a call to ``NIOThrowingAsyncSequenceProducer/Source/yield(_:)``. + @usableFromInline + /* private */ internal let _internalClass: InternalClass + + @usableFromInline + /* private */ internal var _throwingSource: InternalClass.ThrowingSource { + self._internalClass._throwingSource + } + + @usableFromInline + /* fileprivate */ internal init(throwingSource: InternalClass.ThrowingSource) { + self._internalClass = .init(throwingSource: throwingSource) + } + + /// The result of a call to ``NIOAsyncSequenceProducer/Source/yield(_:)``. public enum YieldResult: Hashable { /// Indicates that the caller should produce more elements. case produceMore @@ -231,19 +252,19 @@ extension NIOAsyncSequenceProducer { case dropped } - /// Yields a sequence of new elements to the ``NIOThrowingAsyncSequenceProducer``. + /// Yields a sequence of new elements to the ``NIOAsyncSequenceProducer``. /// - /// If there is an ``NIOThrowingAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away. + /// If there is an ``NIOAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away. /// Otherwise, the element will get buffered. /// - /// If the ``NIOThrowingAsyncSequenceProducer`` is terminated this will drop the elements + /// If the ``NIOAsyncSequenceProducer`` is terminated this will drop the elements /// and return ``YieldResult/dropped``. /// /// This can be called more than once and returns to the caller immediately /// without blocking for any awaiting consumption from the iteration. /// /// - Parameter contentsOf: The sequence to yield. - /// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful + /// - Returns: A ``NIOAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful /// and if more elements should be produced. @inlinable public func yield(contentsOf sequence: S) -> YieldResult where S.Element == Element { @@ -257,19 +278,19 @@ extension NIOAsyncSequenceProducer { } } - /// Yields a new elements to the ``NIOThrowingAsyncSequenceProducer``. + /// Yields a new elements to the ``NIOAsyncSequenceProducer``. /// - /// If there is an ``NIOThrowingAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away. + /// If there is an ``NIOAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away. /// Otherwise, the element will get buffered. /// - /// If the ``NIOThrowingAsyncSequenceProducer`` is terminated this will drop the elements + /// If the ``NIOAsyncSequenceProducer`` is terminated this will drop the elements /// and return ``YieldResult/dropped``. /// /// This can be called more than once and returns to the caller immediately /// without blocking for any awaiting consumption from the iteration. /// /// - Parameter element: The element to yield. - /// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful + /// - Returns: A ``NIOAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful /// and if more elements should be produced. @inlinable public func yield(_ element: Element) -> YieldResult { @@ -280,8 +301,8 @@ extension NIOAsyncSequenceProducer { /// /// Calling this function signals the sequence that there won't be any subsequent elements yielded. /// - /// If there are still buffered elements and there is an ``NIOThrowingAsyncSequenceProducer/AsyncIterator`` consuming the sequence, - /// then termination of the sequence only happens once all elements have been consumed by the ``NIOThrowingAsyncSequenceProducer/AsyncIterator``. + /// If there are still buffered elements and there is an ``NIOAsyncSequenceProducer/AsyncIterator`` consuming the sequence, + /// then termination of the sequence only happens once all elements have been consumed by the ``NIOAsyncSequenceProducer/AsyncIterator``. /// Otherwise, the buffered elements will be dropped. /// /// - Note: Calling this function more than once has no effect. diff --git a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift index 2c43cdd9..2bea7738 100644 --- a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift @@ -62,10 +62,8 @@ public struct NIOThrowingAsyncSequenceProducer< /// This class is needed to hook the deinit to observe once all references to the ``NIOThrowingAsyncSequenceProducer`` are dropped. /// /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself. - /// - /// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`. @usableFromInline - /* fileprivate */ internal final class InternalClass: @unchecked Sendable { + /* fileprivate */ internal final class InternalClass: Sendable { @usableFromInline internal let _storage: Storage @@ -143,10 +141,8 @@ extension NIOThrowingAsyncSequenceProducer { /// This class is needed to hook the deinit to observe once all references to an instance of the ``AsyncIterator`` are dropped. /// /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself. - /// - /// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`. @usableFromInline - /* private */ internal final class InternalClass: @unchecked Sendable { + /* private */ internal final class InternalClass: Sendable { @usableFromInline /* private */ internal let _storage: Storage @@ -185,13 +181,41 @@ extension NIOThrowingAsyncSequenceProducer { /// A struct to interface between the synchronous code of the producer and the asynchronous consumer. /// This type allows the producer to synchronously `yield` new elements to the ``NIOThrowingAsyncSequenceProducer`` /// and to `finish` the sequence. - public struct Source { + /// + /// - Note: This struct has reference semantics. Once all copies of a source have been dropped ``NIOThrowingAsyncSequenceProducer/Source/finish()``. + /// This will resume any suspended continuation. + public struct Source: Sendable { + /// This class is needed to hook the deinit to observe once all references to the ``NIOThrowingAsyncSequenceProducer/Source`` are dropped. + /// + /// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`. @usableFromInline - /* fileprivate */ internal let _storage: Storage + /* fileprivate */ internal final class InternalClass: Sendable { + @usableFromInline + internal let _storage: Storage + + @inlinable + init(storage: Storage) { + self._storage = storage + } + + @inlinable + deinit { + // We need to call finish here to resume any suspended continuation. + self._storage.finish(nil) + } + } + + @usableFromInline + /* private */ internal let _internalClass: InternalClass + + @usableFromInline + /* private */ internal var _storage: Storage { + self._internalClass._storage + } @usableFromInline /* fileprivate */ internal init(storage: Storage) { - self._storage = storage + self._internalClass = .init(storage: storage) } /// The result of a call to ``NIOThrowingAsyncSequenceProducer/Source/yield(_:)``. diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift index ed0faa1b..37b9e98c 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift @@ -299,6 +299,74 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(self.delegate.didTerminateCallCount, 1) } + // MARK: - Source Deinited + + func testSourceDeinited_whenInitial() async { + self.source = nil + + XCTAssertEqual(self.delegate.didTerminateCallCount, 0) + } + + func testSourceDeinited_whenStreaming_andSuspended() async throws { + // We are registering our demand and sleeping a bit to make + // sure the other child task runs when the demand is registered + let sequence = try XCTUnwrap(self.sequence) + let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + let element = await sequence.first { _ in true } + return element + } + + try await Task.sleep(nanoseconds: 1_000_000) + + self.source = nil + + return try await group.next() ?? nil + } + + XCTAssertEqual(element, nil) + XCTAssertEqual(self.delegate.didTerminateCallCount, 1) + } + + func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws { + _ = self.source.yield(contentsOf: []) + + self.source = nil + + let sequence = try XCTUnwrap(self.sequence) + let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return await sequence.first { _ in true } + } + + return try await group.next() ?? nil + } + + XCTAssertNil(element) + XCTAssertEqual(self.delegate.didTerminateCallCount, 1) + } + + func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { + _ = self.source.yield(contentsOf: [1]) + + self.source = nil + + XCTAssertEqual(self.delegate.didTerminateCallCount, 0) + + let sequence = try XCTUnwrap(self.sequence) + let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return await sequence.first { _ in true } + } + + return try await group.next() ?? nil + } + + XCTAssertEqual(element, 1) + + XCTAssertEqual(self.delegate.didTerminateCallCount, 1) + } + // MARK: - Task cancel func testTaskCancel_whenStreaming_andSuspended() async throws { diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift index daf52f30..c6c6d5e4 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift @@ -403,6 +403,74 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertEqual(self.delegate.didTerminateCallCount, 1) } + // MARK: - Source Deinited + + func testSourceDeinited_whenInitial() async { + self.source = nil + + XCTAssertEqual(self.delegate.didTerminateCallCount, 0) + } + + func testSourceDeinited_whenStreaming_andSuspended() async throws { + // We are registering our demand and sleeping a bit to make + // sure the other child task runs when the demand is registered + let sequence = try XCTUnwrap(self.sequence) + let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + let element = try await sequence.first { _ in true } + return element + } + + try await Task.sleep(nanoseconds: 1_000_000) + + self.source = nil + + return try await group.next() ?? nil + } + + XCTAssertEqual(element, nil) + XCTAssertEqual(self.delegate.didTerminateCallCount, 1) + } + + func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws { + _ = self.source.yield(contentsOf: []) + + self.source = nil + + let sequence = try XCTUnwrap(self.sequence) + let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return try await sequence.first { _ in true } + } + + return try await group.next() ?? nil + } + + XCTAssertNil(element) + XCTAssertEqual(self.delegate.didTerminateCallCount, 1) + } + + func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { + _ = self.source.yield(contentsOf: [1]) + + self.source = nil + + XCTAssertEqual(self.delegate.didTerminateCallCount, 0) + + let sequence = try XCTUnwrap(self.sequence) + let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in + group.addTask { + return try await sequence.first { _ in true } + } + + return try await group.next() ?? nil + } + + XCTAssertEqual(element, 1) + + XCTAssertEqual(self.delegate.didTerminateCallCount, 1) + } + // MARK: - Task cancel func testTaskCancel_whenStreaming_andSuspended() async throws {