611 lines
28 KiB
Swift
611 lines
28 KiB
Swift
//
|
|
// Copyright Amazon.com Inc. or its affiliates.
|
|
// All Rights Reserved.
|
|
//
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
|
|
import Foundation
|
|
import XCTest
|
|
import Combine
|
|
|
|
@testable import Amplify
|
|
@testable import AmplifyTestCommon
|
|
@testable import AWSPluginsCore
|
|
@testable import AWSDataStorePlugin
|
|
|
|
class ObserveQueryTaskRunnerTests: XCTestCase {
|
|
var storageEngine: MockStorageEngineBehavior!
|
|
var dataStorePublisher: ModelSubcriptionBehavior!
|
|
var dataStoreStateSubject = PassthroughSubject<DataStoreState, DataStoreError>()
|
|
|
|
/// Prepares the fixtures shared by the tests in this class: registers the `Post`
/// model with the `ModelRegistry` and creates a new mock storage engine and a new
/// DataStore publisher.
override func setUp() {
    ModelRegistry.register(modelType: Post.self)
    self.storageEngine = MockStorageEngineBehavior()
    self.dataStorePublisher = DataStorePublisher()
}
|
|
|
|
/// A single observed item change is delivered in the snapshot that follows the initial query.
///
/// - Given: The runner's sequence is being consumed and the first (empty) snapshot has arrived.
/// - When:
///    - One `Post` create mutation event is sent through the DataStore publisher.
/// - Then:
///    - A second snapshot arrives containing exactly one item.
///
func testItemChangedWillGenerateSnapshot() async throws {
    let initialSnapshotReceived = asyncExpectation(description: "first query snapshots")
    let changeSnapshotReceived = asyncExpectation(description: "second query snapshots")
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    // Consume the sequence in the background and validate each snapshot by arrival order.
    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    XCTAssertEqual(snapshot.items.count, 0)
                    await initialSnapshotReceived.fulfill()
                case 2:
                    XCTAssertEqual(snapshot.items.count, 1)
                    await changeSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([initialSnapshotReceived], timeout: 1)

    let createEvent = try createPost(id: "1")
    dataStorePublisher.send(input: createEvent)
    await waitForExpectations([changeSnapshotReceived], timeout: 10)
}
|
|
|
|
/// A `.modelSynced` Hub event for the observed model produces one additional snapshot
/// whose `isSynced` flag is `true`; a later `.modelSynced` payload that does not carry
/// a matching event produces none.
///
/// - Given: The runner's sequence is being consumed and the first snapshot
///          (no items, `isSynced == false`) has arrived.
/// - When:
///    - `dispatchedModelSyncedEvent` is set to `true` and a `ModelSyncedEvent` for `Post`
///      is dispatched on the `.dataStore` Hub channel.
///    - A second `.modelSynced` payload that is not a `Post` event is dispatched.
/// - Then:
///    - A second snapshot (no items, `isSynced == true`) arrives.
///    - No third snapshot arrives.
///
func testGenerateSnapshotOnObserveQueryWhenModelSynced() async throws {
    let unsyncedSnapshotReceived = asyncExpectation(description: "first query snapshots")
    let syncedSnapshotReceived = asyncExpectation(description: "second query snapshots")
    let unexpectedSnapshotReceived = asyncExpectation(description: "third query snapshot", isInverted: true)
    let syncedFlag = AtomicValue(initialValue: false)
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: syncedFlag,
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    XCTAssertEqual(snapshot.items.count, 0)
                    XCTAssertFalse(snapshot.isSynced)
                    await unsyncedSnapshotReceived.fulfill()
                case 2:
                    XCTAssertEqual(snapshot.items.count, 0)
                    XCTAssertTrue(snapshot.isSynced)
                    await syncedSnapshotReceived.fulfill()
                case 3:
                    XCTFail("Should not receive third snapshot for a Model change")
                    await unexpectedSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([unsyncedSnapshotReceived], timeout: 5)

    // Flip the shared flag first, then announce the sync on the Hub.
    syncedFlag.set(true)
    let postSyncedEvent = ModelSyncedEvent(modelName: Post.modelName,
                                           isFullSync: true,
                                           isDeltaSync: false,
                                           added: 0,
                                           updated: 0,
                                           deleted: 0)
    let matchingPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
                                     data: postSyncedEvent)
    Amplify.Hub.dispatch(to: .dataStore, payload: matchingPayload)
    await waitForExpectations([syncedSnapshotReceived], timeout: 10)

    // NOTE(review): the payload data below is the builder's `modelName` property, not a
    // `ModelSyncedEvent`. Presumably this is meant to be a payload that does not match
    // `Post` - confirm this is the intended "non-matching" case.
    let nonMatchingPayload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
                                        data: ModelSyncedEvent.Builder().modelName)
    Amplify.Hub.dispatch(to: .dataStore, payload: nonMatchingPayload)
    await waitForExpectations([unexpectedSnapshotReceived], timeout: 10)
}
|
|
|
|
/// The first snapshot reflects whatever the storage engine's query returns.
///
/// - Given: The mock storage engine's `.query` responder is set to return two posts.
/// - When:
///    - The runner's sequence is consumed.
/// - Then:
///    - The first snapshot contains two items.
///
func testFirstSnapshotFromStorageQueryReturnsTwoPosts() async {
    let initialSnapshotReceived = asyncExpectation(description: "firstSnapshot received")
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())

    // One post is created up front; the second is created each time the responder runs.
    let storedPost = Post(title: "model1",
                          content: "content1",
                          createdAt: .now())
    storageEngine.responders[.query] = QueryResponder<Post>(callback: { _ in
        .success([storedPost, Post(title: "model1", content: "content1", createdAt: .now())])
    })

    let snapshotSequence = runner.sequence
    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                guard received.count == 1 else { continue }
                XCTAssertEqual(snapshot.items.count, 2)
                await initialSnapshotReceived.fulfill()
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }
    await waitForExpectations([initialSnapshotReceived], timeout: 10)
}
|
|
|
|
/// Several item changes sent back-to-back are delivered together in one snapshot.
///
/// - Given: The runner's sequence is being consumed and the first (empty) snapshot has arrived.
/// - When:
///    - Three `Post` create mutation events are sent through the DataStore publisher.
/// - Then:
///    - The second snapshot contains three items.
///
func testMultipleItemChangesWillGenerateSecondSnapshot() async throws {
    let initialSnapshotReceived = asyncExpectation(description: "first query snapshot")
    let batchedSnapshotReceived = asyncExpectation(description: "second query snapshot")

    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    XCTAssertEqual(snapshot.items.count, 0)
                    await initialSnapshotReceived.fulfill()
                case 2:
                    XCTAssertEqual(snapshot.items.count, 3)
                    await batchedSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([initialSnapshotReceived], timeout: 1)

    // Build every event before sending any, so a creation failure sends nothing.
    let createEvents = try ["1", "2", "3"].map { try createPost(id: $0) }
    for event in createEvents {
        dataStorePublisher.send(input: event)
    }
    await waitForExpectations([batchedSnapshotReceived], timeout: 10)
}
|
|
|
|
/// Publishing more item changes than fit in one batch (1100 events here; the original
/// test notes give a `.collect` count of 1000, which is not visible in this file) makes
/// the runner emit more than one follow-up snapshot.
///
/// - Given: The runner's sequence is being consumed and the first snapshot has arrived.
/// - When:
///    - 1100 `Post` create mutation events are sent through the DataStore publisher.
/// - Then:
///    - A second and a third snapshot arrive.
///    - After the sequence is cancelled, at least three snapshots were collected, their
///      item counts do not decrease from one snapshot to the next, and the third
///      snapshot holds at most 1100 items.
///
func testCollectOverMaxItemCountLimit() async throws {
    let firstSnapshot = asyncExpectation(description: "first query snapshot")
    let secondSnapshot = asyncExpectation(description: "second query snapshot")
    let thirdSnapshot = asyncExpectation(description: "third query snapshot")
    let validateSnapshotsComplete = asyncExpectation(description: "validate snapshots")
    let taskRunner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshots = taskRunner.sequence

    Task {
        var querySnapshots = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await querySnapshot in snapshots {
                querySnapshots.append(querySnapshot)
                switch querySnapshots.count {
                case 1:
                    await firstSnapshot.fulfill()
                case 2:
                    await secondSnapshot.fulfill()
                case 3:
                    await thirdSnapshot.fulfill()
                default:
                    break
                }
            }

            // Reached once the loop above ends (the test cancels the sequence below).
            // Only index into the array when enough snapshots were collected; otherwise
            // the subscripts would trap and abort the whole test run instead of
            // recording an ordinary failure.
            XCTAssertGreaterThanOrEqual(querySnapshots.count, 3)
            if querySnapshots.count >= 3 {
                XCTAssertLessThanOrEqual(querySnapshots[0].items.count, querySnapshots[1].items.count)
                XCTAssertLessThanOrEqual(querySnapshots[1].items.count, querySnapshots[2].items.count)
                XCTAssertLessThanOrEqual(querySnapshots[2].items.count, 1_100)
            }
            await validateSnapshotsComplete.fulfill()
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([firstSnapshot], timeout: 1)

    for postId in 1 ... 1_100 {
        let post = try createPost(id: "\(postId)")
        dataStorePublisher.send(input: post)
    }

    await waitForExpectations([secondSnapshot, thirdSnapshot], timeout: 10)
    snapshots.cancel()
    await waitForExpectations([validateSnapshotsComplete], timeout: 1.0)
}
|
|
|
|
/// After the sequence is cancelled, later item changes produce no further snapshots.
///
/// - Given: The runner's sequence is being consumed and the first snapshot has arrived.
/// - When:
///    - The sequence is cancelled and a `Post` create mutation event is then sent.
/// - Then:
///    - No second snapshot arrives within the wait window.
///
func testSuccessfulSubscriptionCancel() async throws {
    let initialSnapshotReceived = asyncExpectation(description: "first query snapshot")
    let unexpectedSnapshotReceived = asyncExpectation(description: "second query snapshot", isInverted: true)
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    await initialSnapshotReceived.fulfill()
                case 2:
                    XCTFail("Should not receive second snapshot after cancelling")
                    await unexpectedSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([initialSnapshotReceived], timeout: 1)
    snapshotSequence.cancel()

    // This event is sent after cancellation and must not surface as a snapshot.
    let lateEvent = try createPost(id: "1")
    dataStorePublisher.send(input: lateEvent)
    await waitForExpectations([unexpectedSnapshotReceived], timeout: 1)
}
|
|
|
|
/// Cancelling the sequence ends the consumer's iteration without an error and without
/// delivering further snapshots.
///
/// - Given: The runner's sequence is being consumed and the first snapshot has arrived.
/// - When:
///    - The sequence is cancelled and a `Post` create mutation event is then sent.
/// - Then:
///    - The consumer's `for try await` loop finishes normally.
///    - No second snapshot arrives within the wait window.
///
func testSuccessfulSequenceCancel() async throws {
    let initialSnapshotReceived = asyncExpectation(description: "first query snapshot")
    let unexpectedSnapshotReceived = asyncExpectation(description: "second query snapshot", isInverted: true)
    let iterationFinished = asyncExpectation(description: "should have completed")
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    await initialSnapshotReceived.fulfill()
                case 2:
                    XCTFail("Should not receive second snapshot after cancelling")
                    await unexpectedSnapshotReceived.fulfill()
                default:
                    break
                }
            }
            // Reached only when the loop ends without throwing.
            await iterationFinished.fulfill()
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }
    await waitForExpectations([initialSnapshotReceived], timeout: 1)
    snapshotSequence.cancel()

    let lateEvent = try createPost(id: "1")
    dataStorePublisher.send(input: lateEvent)

    await waitForExpectations([unexpectedSnapshotReceived, iterationFinished], timeout: 1)
}
|
|
|
|
/// After a `.stop` followed by a `.start` DataStore state event, the runner delivers
/// another snapshot to the same consumer.
///
/// - Given: The runner's sequence is being consumed and the first snapshot has arrived.
/// - When:
///    - `.stop` and then `.start(storageEngine:)` are sent on the DataStore state subject.
/// - Then:
///    - A second snapshot arrives.
///
func testObserveQueryResetStateThenStartObserveQuery() async {
    let initialSnapshotReceived = asyncExpectation(description: "first query snapshot")
    let restartSnapshotReceived = asyncExpectation(description: "second query snapshot")
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    await initialSnapshotReceived.fulfill()
                case 2:
                    await restartSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([initialSnapshotReceived], timeout: 1)

    // Stop, then start again with the same storage engine.
    dataStoreStateSubject.send(.stop)
    dataStoreStateSubject.send(.start(storageEngine: storageEngine))
    await waitForExpectations([restartSnapshotReceived], timeout: 1)
}
|
|
|
|
/// Sending `.start` twice after a `.stop` restarts the observe query only once.
///
/// - Given: The runner's sequence is being consumed and the first snapshot has arrived.
/// - When:
///    - `.stop` is sent, followed by two `.start(storageEngine:)` state events.
/// - Then:
///    - Exactly one additional (second) snapshot arrives; no third snapshot arrives.
///    - `observeQueryStarted` is `true` afterwards.
///
func testObserveQueryStaredShouldNotStartAgain() async {
    let initialSnapshotReceived = asyncExpectation(description: "first query snapshot")
    let restartSnapshotReceived = asyncExpectation(description: "second query snapshot")
    let duplicateSnapshotReceived = asyncExpectation(description: "third query snapshot", isInverted: true)
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let snapshotSequence = runner.sequence

    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    await initialSnapshotReceived.fulfill()
                case 2:
                    await restartSnapshotReceived.fulfill()
                case 3:
                    await duplicateSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await waitForExpectations([initialSnapshotReceived], timeout: 1)

    dataStoreStateSubject.send(.stop)
    // The second `.start` must be ignored because the query is already running.
    dataStoreStateSubject.send(.start(storageEngine: storageEngine))
    dataStoreStateSubject.send(.start(storageEngine: storageEngine))
    await waitForExpectations([restartSnapshotReceived, duplicateSnapshotReceived], timeout: 1)
    XCTAssertTrue(runner.observeQueryStarted)
}
|
|
|
|
/// Stress test: 3000 concurrent tasks each randomly send a `.stop` state event, send a
/// `.start(storageEngine:)` state event, or call `onItemsChangeDuringSync(mutationEvents:)`
/// on the runner. The test is meant to surface problems such as data races between
/// these entry points.
///
/// - Given: The runner's sequence is being consumed and the storage query returns two posts.
/// - When:
///    - 3000 randomly chosen operations run concurrently in a task group.
/// - Then:
///    - The sequence does not complete within the 10 second wait window
///      (`completeReceived` is an inverted expectation) and no error is thrown.
///
func testConcurrent() async {
    let completeReceived = asyncExpectation(description: "complete received", isInverted: true)
    let taskRunner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: nil,
        sortInput: nil,
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: false),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())
    let post = Post(title: "model1",
                    content: "content1",
                    createdAt: .now())
    storageEngine.responders[.query] = QueryResponder<Post>(callback: { _ in
        return .success([post, Post(title: "model1", content: "content1", createdAt: .now())])
    })
    let snapshots = taskRunner.sequence

    // Drain the sequence; finishing the loop would mean the sequence completed.
    Task {
        do {
            for try await _ in snapshots {
            }
            await completeReceived.fulfill()
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }

    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<3000 {
            group.addTask {
                // 1 -> stop, 2 -> start, 3...5 -> item changes.
                switch Int.random(in: 1 ... 5) {
                case 1:
                    self.dataStoreStateSubject.send(.stop)
                case 2:
                    self.dataStoreStateSubject.send(.start(storageEngine: self.storageEngine))
                default:
                    do {
                        // One change for the post already returned by the query, one for a new post.
                        let itemChange = try self.createPost(id: post.id)
                        let itemChange2 = try self.createPost()
                        taskRunner.onItemsChangeDuringSync(mutationEvents: [itemChange, itemChange2])
                    } catch {
                        // Include the underlying error so the failure is diagnosable.
                        XCTFail("Failed to create post: \(error)")
                    }
                }
            }
        }
        await group.waitForAll()
    }
    await waitForExpectations([completeReceived], timeout: 10)
}
|
|
|
|
/// With the predicate `title.beginsWith("title")` and ascending sort by `id`, posts whose
/// title matches are added to the snapshots. When a post already in the snapshot is
/// updated so its title no longer matches, it is removed from the next snapshot.
///
/// - Given: The runner's sequence is being consumed (model sync already flagged as
///          dispatched) and the first, empty snapshot has arrived.
/// - When:
///    - Create events for post "1" and post "2" (both with matching titles) are sent,
///      followed by an update event for post "2" with a non-matching title.
/// - Then:
///    - Snapshot 2 holds post "1"; snapshot 3 holds posts "1" and "2";
///      snapshot 4 holds only post "1".
///
func testUpdatedModelNoLongerMatchesPredicateRemovedFromSnapshot() async throws {
    let emptySnapshotReceived = asyncExpectation(description: "first query snapshots")
    let onePostSnapshotReceived = asyncExpectation(description: "second query snapshots")
    let twoPostsSnapshotReceived = asyncExpectation(description: "third query snapshots")
    let postRemovedSnapshotReceived = asyncExpectation(description: "fourth query snapshots")
    let runner = ObserveQueryTaskRunner(
        modelType: Post.self,
        modelSchema: Post.schema,
        predicate: Post.keys.title.beginsWith("title"),
        sortInput: QuerySortInput.ascending(Post.keys.id).asSortDescriptors(),
        storageEngine: storageEngine,
        dataStorePublisher: dataStorePublisher,
        dataStoreConfiguration: .default,
        dispatchedModelSyncedEvent: AtomicValue(initialValue: true),
        dataStoreStatePublisher: dataStoreStateSubject.eraseToAnyPublisher())

    let snapshotSequence = runner.sequence
    Task {
        var received = [DataStoreQuerySnapshot<Post>]()
        do {
            for try await snapshot in snapshotSequence {
                received.append(snapshot)
                switch received.count {
                case 1:
                    // Initial query result: nothing stored yet.
                    XCTAssertEqual(snapshot.items.count, 0)
                    await emptySnapshotReceived.fulfill()
                case 2:
                    // Post "1" matches the predicate and is the only item.
                    XCTAssertEqual(snapshot.items.count, 1)
                    XCTAssertEqual(snapshot.items[0].id, "1")
                    await onePostSnapshotReceived.fulfill()
                case 3:
                    // Both posts match the predicate, sorted by ascending id.
                    XCTAssertEqual(snapshot.items.count, 2)
                    XCTAssertEqual(snapshot.items[0].id, "1")
                    XCTAssertEqual(snapshot.items[1].id, "2")
                    await twoPostsSnapshotReceived.fulfill()
                case 4:
                    // Post "2" was updated to a non-matching title, so only post "1" remains.
                    XCTAssertEqual(snapshot.items.count, 1)
                    XCTAssertEqual(snapshot.items[0].id, "1")
                    await postRemovedSnapshotReceived.fulfill()
                default:
                    break
                }
            }
        } catch {
            XCTFail("Failed with error \(error)")
        }
    }
    await waitForExpectations([emptySnapshotReceived], timeout: 5)

    let firstPostCreated = try createPost(id: "1", title: "title 1")
    dataStorePublisher.send(input: firstPostCreated)
    let secondPostCreated = try createPost(id: "2", title: "title 2")
    dataStorePublisher.send(input: secondPostCreated)

    // `createPost` builds a create event; re-tag it as an update before sending.
    var secondPostUpdated = try createPost(id: "2", title: "Does not match predicate")
    secondPostUpdated.mutationType = MutationEvent.MutationType.update.rawValue
    dataStorePublisher.send(input: secondPostUpdated)
    await waitForExpectations([onePostSnapshotReceived, twoPostsSnapshotReceived, postRemovedSnapshotReceived],
                              timeout: 10)
}
|
|
|
|
|
|
// MARK: - Helpers
|
|
|
|
/// Builds a `create` mutation event wrapping a `Post` with content `"content1"` and
/// the current time as its creation date.
///
/// - Parameters:
///   - id: Identifier of the post; defaults to a new UUID string.
///   - title: Title of the post; `"model1"` is used when `nil`.
/// - Returns: A `MutationEvent` for the post, built with `Post.schema`.
/// - Throws: Any error thrown by the `MutationEvent` initializer.
func createPost(id: String = UUID().uuidString, title: String? = nil) throws -> MutationEvent {
    let resolvedTitle = title ?? "model1"
    let post = Post(id: id,
                    title: resolvedTitle,
                    content: "content1",
                    createdAt: .now())
    return try MutationEvent(model: post,
                             modelSchema: Post.schema,
                             mutationType: MutationEvent.MutationType.create)
}
|
|
}
|