366 lines
11 KiB
Swift
366 lines
11 KiB
Swift
//
|
|
// Copyright Amazon.com Inc. or its affiliates.
|
|
// All Rights Reserved.
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
|
|
import XCTest
|
|
@testable import Amplify
|
|
@testable import AmplifyTestCommon
|
|
|
|
final class AmplifyAsyncSequenceTests: XCTestCase {
|
|
enum Failure: Error {
|
|
case unluckyNumber
|
|
}
|
|
|
|
let queue = OperationQueue()
|
|
|
|
actor Output<Element> {
|
|
var elements: [Element] = []
|
|
func append(_ element: Element) {
|
|
elements.append(element)
|
|
}
|
|
}
|
|
|
|
let sleepSeconds = 0.01
|
|
|
|
func testNumberSequence() async throws {
|
|
let input = [1, 2, 3, 4, 5]
|
|
let channel = AmplifyAsyncSequence<Int>()
|
|
|
|
// load all numbers into the channel with delays
|
|
Task {
|
|
try await send(elements: input, channel: channel, sleepSeconds: sleepSeconds)
|
|
}
|
|
|
|
let output = await channel.reduce(into: []) { array, value in
|
|
array.append(value)
|
|
}
|
|
|
|
XCTAssertEqual(input, output)
|
|
}
|
|
|
|
func testStringSequence() async throws {
|
|
let input = ["one", "two", "three", "four", "five"]
|
|
let channel = AmplifyAsyncSequence<String>()
|
|
|
|
// load all strings into the channel with delays
|
|
Task {
|
|
try await send(elements: input, channel: channel, sleepSeconds: sleepSeconds)
|
|
}
|
|
|
|
let output = await channel.reduce(into: []) { array, value in
|
|
array.append(value)
|
|
}
|
|
|
|
XCTAssertEqual(input, output)
|
|
}
|
|
|
|
func testSucceedingSequence() async throws {
|
|
let input = [3, 7, 14, 21]
|
|
let channel = AmplifyAsyncThrowingSequence<Int>()
|
|
|
|
// load all numbers into the channel with delays
|
|
Task {
|
|
try await send(elements: input, channel: channel, sleepSeconds: sleepSeconds) { element in
|
|
if element == 13 {
|
|
throw Failure.unluckyNumber
|
|
} else {
|
|
return element
|
|
}
|
|
}
|
|
}
|
|
|
|
var output: [Int] = []
|
|
var thrown: Error? = nil
|
|
|
|
do {
|
|
for try await element in channel {
|
|
output.append(element)
|
|
}
|
|
} catch {
|
|
thrown = error
|
|
}
|
|
|
|
XCTAssertNil(thrown)
|
|
XCTAssertEqual(input, output)
|
|
}
|
|
|
|
func testFailingSequence() async throws {
|
|
let input = [3, 7, 13, 21]
|
|
let channel = AmplifyAsyncThrowingSequence<Int>()
|
|
|
|
// load all numbers into the channel with delays
|
|
Task {
|
|
try await send(elements: input, channel: channel, sleepSeconds: sleepSeconds) { element in
|
|
if element == 13 {
|
|
throw Failure.unluckyNumber
|
|
} else {
|
|
return element
|
|
}
|
|
}
|
|
}
|
|
|
|
var output: [Int] = []
|
|
var thrown: Error? = nil
|
|
|
|
do {
|
|
for try await element in channel {
|
|
output.append(element)
|
|
}
|
|
} catch {
|
|
thrown = error
|
|
}
|
|
|
|
XCTAssertNotNil(thrown)
|
|
let expected = Array(input[0..<2])
|
|
XCTAssertEqual(expected, output)
|
|
}
|
|
|
|
func testChannelCancelled() async throws {
|
|
// parent task is canceled while reducing values from sequence
|
|
// before a value is sent which should result in a sum of zero
|
|
let input = 2006
|
|
let reduced = asyncExpectation(description: "reduced")
|
|
let done = asyncExpectation(description: "done")
|
|
let channel = AmplifyAsyncSequence<Int>()
|
|
|
|
let task = Task<Int, Never> {
|
|
let sum = await channel.reduce(0, +)
|
|
await reduced.fulfill()
|
|
return sum
|
|
}
|
|
|
|
// cancel before value is sent
|
|
task.cancel()
|
|
|
|
await waitForExpectations([reduced])
|
|
channel.send(input)
|
|
|
|
Task {
|
|
let output = await task.value
|
|
XCTAssertNotEqual(input, output)
|
|
XCTAssertEqual(0, output)
|
|
await done.fulfill()
|
|
}
|
|
|
|
await waitForExpectations([done])
|
|
}
|
|
|
|
func testThrowingChannelCancelled() async throws {
|
|
// parent task is canceled while reducing values from sequence
|
|
// before a value is sent which should result in a sum of zero
|
|
let input = 2006
|
|
let reduced = asyncExpectation(description: "reduced")
|
|
let done = asyncExpectation(description: "done")
|
|
let channel = AmplifyAsyncThrowingSequence<Int>()
|
|
|
|
let task = Task<Int, Error> {
|
|
let sum = try await channel.reduce(0, +)
|
|
await reduced.fulfill()
|
|
return sum
|
|
}
|
|
|
|
// cancel before any value is sent
|
|
task.cancel()
|
|
await waitForExpectations([reduced])
|
|
channel.send(input)
|
|
|
|
Task {
|
|
let output = try await task.value
|
|
XCTAssertNotEqual(input, output)
|
|
XCTAssertEqual(0, output)
|
|
await done.fulfill()
|
|
}
|
|
|
|
await waitForExpectations([done])
|
|
}
|
|
|
|
func testValueProducingParentOperation() async throws {
|
|
let sent = asyncExpectation(description: "sent")
|
|
let received = asyncExpectation(description: "received")
|
|
let steps = 10
|
|
let delay = 0.01
|
|
let request = LongOperationRequest(steps: steps, delay: delay)
|
|
var count = 0
|
|
let operation = LongOperation(request: request)
|
|
let channel = AmplifyAsyncSequence<LongOperation.InProcess>(parent: operation)
|
|
let token = operation.subscribe { (value: LongOperation.InProcess) in
|
|
count += 1
|
|
channel.send(value)
|
|
if value.totalUnitCount == value.completedUnitCount {
|
|
channel.finish()
|
|
Task {
|
|
await sent.fulfill()
|
|
}
|
|
}
|
|
}
|
|
queue.addOperation(operation)
|
|
Task {
|
|
let values = Output<LongOperation.InProcess>()
|
|
for await value in channel {
|
|
await values.append(value)
|
|
}
|
|
let count = await values.elements.count
|
|
XCTAssertGreaterThanOrEqual(count, steps)
|
|
await received.fulfill()
|
|
}
|
|
|
|
await waitForExpectations([sent, received])
|
|
|
|
XCTAssertFalse(operation.isCancelled)
|
|
XCTAssertGreaterThanOrEqual(count, steps)
|
|
|
|
Amplify.Hub.removeListener(token)
|
|
}
|
|
|
|
func testCancellingWithParentOperation() async throws {
|
|
let sent = asyncExpectation(description: "sent")
|
|
let received = asyncExpectation(description: "received")
|
|
let steps = 10
|
|
let delay = 0.01
|
|
let request = LongOperationRequest(steps: steps, delay: delay)
|
|
var count = 0
|
|
let operation = LongOperation(request: request)
|
|
let channel = AmplifyAsyncSequence<LongOperation.InProcess>(parent: operation)
|
|
let token = operation.subscribe { (value: LongOperation.InProcess) in
|
|
count += 1
|
|
channel.send(value)
|
|
if value.completedUnitCount >= steps/2 {
|
|
channel.cancel()
|
|
Task {
|
|
await sent.fulfill()
|
|
}
|
|
}
|
|
}
|
|
queue.addOperation(operation)
|
|
Task {
|
|
let values = Output<LongOperation.InProcess>()
|
|
for await value in channel {
|
|
await values.append(value)
|
|
}
|
|
let count = await values.elements.count
|
|
XCTAssertLessThan(count, steps)
|
|
await received.fulfill()
|
|
}
|
|
|
|
await waitForExpectations([sent, received])
|
|
|
|
XCTAssertTrue(operation.isCancelled)
|
|
XCTAssertLessThan(count, steps)
|
|
|
|
Amplify.Hub.removeListener(token)
|
|
}
|
|
|
|
func testThrowingValueProducingParentOperation() async throws {
|
|
let sent = asyncExpectation(description: "sent")
|
|
let received = asyncExpectation(description: "received")
|
|
let steps = 10
|
|
let delay = 0.01
|
|
let request = LongOperationRequest(steps: steps, delay: delay)
|
|
var count = 0
|
|
let operation = LongOperation(request: request)
|
|
let channel = AmplifyAsyncThrowingSequence<LongOperation.InProcess>(parent: operation)
|
|
let token = operation.subscribe { (value: LongOperation.InProcess) in
|
|
count += 1
|
|
channel.send(value)
|
|
if value.totalUnitCount == value.completedUnitCount {
|
|
channel.finish()
|
|
Task {
|
|
await sent.fulfill()
|
|
}
|
|
}
|
|
}
|
|
queue.addOperation(operation)
|
|
Task {
|
|
let values = Output<LongOperation.InProcess>()
|
|
for try await value in channel {
|
|
await values.append(value)
|
|
}
|
|
let count = await values.elements.count
|
|
XCTAssertGreaterThanOrEqual(count, steps)
|
|
await received.fulfill()
|
|
}
|
|
|
|
await waitForExpectations([sent, received])
|
|
|
|
XCTAssertFalse(operation.isCancelled)
|
|
XCTAssertGreaterThanOrEqual(count, steps)
|
|
|
|
Amplify.Hub.removeListener(token)
|
|
}
|
|
|
|
func testThrowingCancellingWithParentOperation() async throws {
|
|
let sent = asyncExpectation(description: "sent")
|
|
let received = asyncExpectation(description: "received")
|
|
let steps = 10
|
|
let delay = 0.01
|
|
let request = LongOperationRequest(steps: steps, delay: delay)
|
|
var count = 0
|
|
let operation = LongOperation(request: request)
|
|
let channel = AmplifyAsyncThrowingSequence<LongOperation.InProcess>(parent: operation)
|
|
let token = operation.subscribe { (value: LongOperation.InProcess) in
|
|
count += 1
|
|
channel.send(value)
|
|
if value.completedUnitCount >= steps/2 {
|
|
channel.cancel()
|
|
Task {
|
|
await sent.fulfill()
|
|
}
|
|
}
|
|
}
|
|
queue.addOperation(operation)
|
|
Task {
|
|
let values = Output<LongOperation.InProcess>()
|
|
for try await value in channel {
|
|
await values.append(value)
|
|
}
|
|
let count = await values.elements.count
|
|
XCTAssertLessThan(count, steps)
|
|
await received.fulfill()
|
|
}
|
|
|
|
await waitForExpectations([sent, received])
|
|
|
|
XCTAssertTrue(operation.isCancelled)
|
|
XCTAssertLessThan(count, steps)
|
|
|
|
Amplify.Hub.removeListener(token)
|
|
}
|
|
|
|
private func send<Element>(elements: [Element], channel: AmplifyAsyncSequence<Element>, sleepSeconds: Double = 0.1) async throws {
|
|
var index = 0
|
|
while index < elements.count {
|
|
try await Task.sleep(seconds: sleepSeconds)
|
|
let element = elements[index]
|
|
channel.send(element)
|
|
|
|
index += 1
|
|
}
|
|
channel.finish()
|
|
}
|
|
|
|
private func send<Element>(elements: [Element], channel: AmplifyAsyncThrowingSequence<Element>, sleepSeconds: Double = 0.1, processor: ((Element) throws -> Element)? = nil) async throws {
|
|
var index = 0
|
|
while index < elements.count {
|
|
try await Task.sleep(seconds: sleepSeconds)
|
|
let element = elements[index]
|
|
if let processor = processor {
|
|
do {
|
|
let processed = try processor(element)
|
|
channel.send(processed)
|
|
} catch {
|
|
channel.fail(error)
|
|
}
|
|
} else {
|
|
channel.send(element)
|
|
}
|
|
|
|
index += 1
|
|
}
|
|
channel.finish()
|
|
}
|
|
|
|
}
|