Call finish once the Source is deinited (#2258)

* Call finish once the Source is deinited

# Motivation
We **MUST** call `finish()` when the `Source` deinits otherwise we can have a suspended continuation that never gets resumed.

# Modification
Introduce an internal class to both `Source`s and call `finish()` in their `deinit`s.

# Result
We are now resuming all continuations.

* Remove @unchecked
This commit is contained in:
Franz Busch 2022-09-02 10:01:11 +01:00 committed by GitHub
parent 028cf7e606
commit 6431296d6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 215 additions and 34 deletions

View File

@ -90,7 +90,7 @@ public struct NIOAsyncSequenceProducer<
Strategy: NIOAsyncSequenceProducerBackPressureStrategy,
Delegate: NIOAsyncSequenceProducerDelegate
>: Sendable {
/// Simple struct for the return type of ``NIOAsyncSequenceProducer/makeSequence(of:backPressureStrategy:delegate:)``.
/// Simple struct for the return type of ``NIOAsyncSequenceProducer/makeSequence(elementType:backPressureStrategy:delegate:)``.
///
/// This struct contains two properties:
/// 1. The ``source`` which should be retained by the producer and is used
@ -198,30 +198,51 @@ extension NIOAsyncSequenceProducer {
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOAsyncSequenceProducer {
/// A struct to interface between the synchronous code of the producer and the asynchronous consumer.
/// This type allows the producer to synchronously `yield` new elements to the ``NIOThrowingAsyncSequenceProducer``
/// This type allows the producer to synchronously `yield` new elements to the ``NIOAsyncSequenceProducer``
/// and to `finish` the sequence.
public struct Source {
/// This class is needed to hook the deinit to observe once all references to the ``NIOAsyncSequenceProducer/Source`` are dropped.
///
/// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`.
@usableFromInline
/* fileprivate */ internal let _throwingSource: NIOThrowingAsyncSequenceProducer<
Element,
Never,
Strategy,
Delegate
>.Source
@usableFromInline
/* fileprivate */ internal init(
throwingSource: NIOThrowingAsyncSequenceProducer<
/* fileprivate */ internal final class InternalClass: Sendable {
@usableFromInline
typealias ThrowingSource = NIOThrowingAsyncSequenceProducer<
Element,
Never,
Strategy,
Delegate
>.Source
) {
self._throwingSource = throwingSource
@usableFromInline
/* fileprivate */ internal let _throwingSource: ThrowingSource
@inlinable
init(throwingSource: ThrowingSource) {
self._throwingSource = throwingSource
}
@inlinable
deinit {
// We need to call finish here to resume any suspended continuation.
self._throwingSource.finish()
}
}
/// The result of a call to ``NIOThrowingAsyncSequenceProducer/Source/yield(_:)``.
@usableFromInline
/* private */ internal let _internalClass: InternalClass
@usableFromInline
/* private */ internal var _throwingSource: InternalClass.ThrowingSource {
self._internalClass._throwingSource
}
@usableFromInline
/* fileprivate */ internal init(throwingSource: InternalClass.ThrowingSource) {
self._internalClass = .init(throwingSource: throwingSource)
}
/// The result of a call to ``NIOAsyncSequenceProducer/Source/yield(_:)``.
public enum YieldResult: Hashable {
/// Indicates that the caller should produce more elements.
case produceMore
@ -231,19 +252,19 @@ extension NIOAsyncSequenceProducer {
case dropped
}
/// Yields a sequence of new elements to the ``NIOThrowingAsyncSequenceProducer``.
/// Yields a sequence of new elements to the ``NIOAsyncSequenceProducer``.
///
/// If there is an ``NIOThrowingAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away.
/// If there is an ``NIOAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away.
/// Otherwise, the element will get buffered.
///
/// If the ``NIOThrowingAsyncSequenceProducer`` is terminated this will drop the elements
/// If the ``NIOAsyncSequenceProducer`` is terminated this will drop the elements
/// and return ``YieldResult/dropped``.
///
/// This can be called more than once and returns to the caller immediately
/// without blocking for any awaiting consumption from the iteration.
///
/// - Parameter contentsOf: The sequence to yield.
/// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful
/// - Returns: A ``NIOAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful
/// and if more elements should be produced.
@inlinable
public func yield<S: Sequence>(contentsOf sequence: S) -> YieldResult where S.Element == Element {
@ -257,19 +278,19 @@ extension NIOAsyncSequenceProducer {
}
}
/// Yields a new elements to the ``NIOThrowingAsyncSequenceProducer``.
/// Yields a new elements to the ``NIOAsyncSequenceProducer``.
///
/// If there is an ``NIOThrowingAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away.
/// If there is an ``NIOAsyncSequenceProducer/AsyncIterator`` awaiting the next element, it will get resumed right away.
/// Otherwise, the element will get buffered.
///
/// If the ``NIOThrowingAsyncSequenceProducer`` is terminated this will drop the elements
/// If the ``NIOAsyncSequenceProducer`` is terminated this will drop the elements
/// and return ``YieldResult/dropped``.
///
/// This can be called more than once and returns to the caller immediately
/// without blocking for any awaiting consumption from the iteration.
///
/// - Parameter element: The element to yield.
/// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful
/// - Returns: A ``NIOAsyncSequenceProducer/Source/YieldResult`` that indicates if the yield was successful
/// and if more elements should be produced.
@inlinable
public func yield(_ element: Element) -> YieldResult {
@ -280,8 +301,8 @@ extension NIOAsyncSequenceProducer {
///
/// Calling this function signals the sequence that there won't be any subsequent elements yielded.
///
/// If there are still buffered elements and there is an ``NIOThrowingAsyncSequenceProducer/AsyncIterator`` consuming the sequence,
/// then termination of the sequence only happens once all elements have been consumed by the ``NIOThrowingAsyncSequenceProducer/AsyncIterator``.
/// If there are still buffered elements and there is an ``NIOAsyncSequenceProducer/AsyncIterator`` consuming the sequence,
/// then termination of the sequence only happens once all elements have been consumed by the ``NIOAsyncSequenceProducer/AsyncIterator``.
/// Otherwise, the buffered elements will be dropped.
///
/// - Note: Calling this function more than once has no effect.

View File

@ -62,10 +62,8 @@ public struct NIOThrowingAsyncSequenceProducer<
/// This class is needed to hook the deinit to observe once all references to the ``NIOThrowingAsyncSequenceProducer`` are dropped.
///
/// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself.
///
/// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`.
@usableFromInline
/* fileprivate */ internal final class InternalClass: @unchecked Sendable {
/* fileprivate */ internal final class InternalClass: Sendable {
@usableFromInline
internal let _storage: Storage
@ -143,10 +141,8 @@ extension NIOThrowingAsyncSequenceProducer {
/// This class is needed to hook the deinit to observe once all references to an instance of the ``AsyncIterator`` are dropped.
///
/// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself.
///
/// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`.
@usableFromInline
/* private */ internal final class InternalClass: @unchecked Sendable {
/* private */ internal final class InternalClass: Sendable {
@usableFromInline
/* private */ internal let _storage: Storage
@ -185,13 +181,41 @@ extension NIOThrowingAsyncSequenceProducer {
/// A struct to interface between the synchronous code of the producer and the asynchronous consumer.
/// This type allows the producer to synchronously `yield` new elements to the ``NIOThrowingAsyncSequenceProducer``
/// and to `finish` the sequence.
public struct Source {
///
/// - Note: This struct has reference semantics. Once all copies of a source have been dropped ``NIOThrowingAsyncSequenceProducer/Source/finish()``.
/// This will resume any suspended continuation.
public struct Source: Sendable {
/// This class is needed to hook the deinit to observe once all references to the ``NIOThrowingAsyncSequenceProducer/Source`` are dropped.
///
/// - Important: This is safe to be unchecked ``Sendable`` since the `storage` is ``Sendable`` and `immutable`.
@usableFromInline
/* fileprivate */ internal let _storage: Storage
/* fileprivate */ internal final class InternalClass: Sendable {
@usableFromInline
internal let _storage: Storage
@inlinable
init(storage: Storage) {
self._storage = storage
}
@inlinable
deinit {
// We need to call finish here to resume any suspended continuation.
self._storage.finish(nil)
}
}
@usableFromInline
/* private */ internal let _internalClass: InternalClass
@usableFromInline
/* private */ internal var _storage: Storage {
self._internalClass._storage
}
@usableFromInline
/* fileprivate */ internal init(storage: Storage) {
self._storage = storage
self._internalClass = .init(storage: storage)
}
/// The result of a call to ``NIOThrowingAsyncSequenceProducer/Source/yield(_:)``.

View File

@ -299,6 +299,74 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
// MARK: - Source Deinited
func testSourceDeinited_whenInitial() async {
self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
}
func testSourceDeinited_whenStreaming_andSuspended() async throws {
// We are registering our demand and sleeping a bit to make
// sure the other child task runs when the demand is registered
let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask {
let element = await sequence.first { _ in true }
return element
}
try await Task.sleep(nanoseconds: 1_000_000)
self.source = nil
return try await group.next() ?? nil
}
XCTAssertEqual(element, nil)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
_ = self.source.yield(contentsOf: [])
self.source = nil
let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask {
return await sequence.first { _ in true }
}
return try await group.next() ?? nil
}
XCTAssertNil(element)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
_ = self.source.yield(contentsOf: [1])
self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask {
return await sequence.first { _ in true }
}
return try await group.next() ?? nil
}
XCTAssertEqual(element, 1)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
// MARK: - Task cancel
func testTaskCancel_whenStreaming_andSuspended() async throws {

View File

@ -403,6 +403,74 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
// MARK: - Source Deinited
func testSourceDeinited_whenInitial() async {
self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
}
func testSourceDeinited_whenStreaming_andSuspended() async throws {
// We are registering our demand and sleeping a bit to make
// sure the other child task runs when the demand is registered
let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask {
let element = try await sequence.first { _ in true }
return element
}
try await Task.sleep(nanoseconds: 1_000_000)
self.source = nil
return try await group.next() ?? nil
}
XCTAssertEqual(element, nil)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
_ = self.source.yield(contentsOf: [])
self.source = nil
let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask {
return try await sequence.first { _ in true }
}
return try await group.next() ?? nil
}
XCTAssertNil(element)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
_ = self.source.yield(contentsOf: [1])
self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask {
return try await sequence.first { _ in true }
}
return try await group.next() ?? nil
}
XCTAssertEqual(element, 1)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
}
// MARK: - Task cancel
func testTaskCancel_whenStreaming_andSuspended() async throws {