amplify-swift/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/RemoteSyncEngineTests.swift

356 lines
18 KiB
Swift

//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
import XCTest
import SQLite
import Combine
@testable import Amplify
@testable import AmplifyTestCommon
@testable import AWSDataStorePlugin
class RemoteSyncEngineTests: XCTestCase {
var apiPlugin: MockAPICategoryPlugin!
var amplifyConfig: AmplifyConfiguration!
var storageAdapter: StorageEngineAdapter!
var remoteSyncEngine: RemoteSyncEngine!
var mockRequestRetryablePolicy: MockRequestRetryablePolicy!
let defaultAsyncWaitTimeout = 2.0
/// Builds a fresh `RemoteSyncEngine` wired to mock collaborators before every test.
///
/// Calls `MockAWSInitialSyncOrchestrator.reset()` and then constructs the engine with a mock storage
/// adapter, a mock outgoing mutation queue, the mock orchestrator / reconciliation-queue factories and
/// a mock retry policy. If the initializer throws, the test is failed and the thrown error is included
/// in the failure message so the cause is visible in the test log.
override func setUp() {
    super.setUp()
    apiPlugin = MockAPICategoryPlugin()
    MockAWSInitialSyncOrchestrator.reset()
    storageAdapter = MockSQLiteStorageEngineAdapter()
    let mockOutgoingMutationQueue = MockOutgoingMutationQueue()
    mockRequestRetryablePolicy = MockRequestRetryablePolicy()
    do {
        remoteSyncEngine = try RemoteSyncEngine(
            storageAdapter: storageAdapter,
            dataStoreConfiguration: .default,
            outgoingMutationQueue: mockOutgoingMutationQueue,
            initialSyncOrchestratorFactory: MockAWSInitialSyncOrchestrator.factory,
            reconciliationQueueFactory: MockAWSIncomingEventReconciliationQueue.factory,
            requestRetryablePolicy: mockRequestRetryablePolicy
        )
    } catch {
        // Report the underlying error instead of discarding it.
        XCTFail("Failed to setup: \(error)")
    }
}
/// Verifies that the engine's publisher completes, without emitting any event, when the test has
/// released its storage adapter before `start(api:auth:)` is called.
func testErrorOnNilStorageAdapter() throws {
    guard let remoteSyncEngine = remoteSyncEngine else {
        XCTFail("Failed to initialize remoteSyncEngine")
        return
    }
    let failureOnStorageAdapter = expectation(description: "Expect receiveCompletion on storageAdapterFailure")
    // Drop the test's reference to the adapter.
    // NOTE(review): presumably the engine only holds the adapter weakly, so it observes nil — confirm.
    storageAdapter = nil
    let remoteSyncEngineSink = remoteSyncEngine
        .publisher
        .sink(receiveCompletion: { _ in
            failureOnStorageAdapter.fulfill()
        }, receiveValue: { _ in
            // Any event means the engine kept going despite the missing adapter.
            XCTFail("The sync engine should not continue without a storage adapter")
        })
    remoteSyncEngine.start(api: MockAPICategoryPlugin(), auth: nil)
    wait(for: [failureOnStorageAdapter], timeout: defaultAsyncWaitTimeout)
    remoteSyncEngineSink.cancel()
}
/// Verifies the event sequence when the initial sync fails with a network error.
///
/// The mock orchestrator is configured to fail with a `DataStoreError.internalOperation` wrapping
/// `URLError(.notConnectedToInternet)`, and the mock retry policy is given advice with
/// `shouldRetry: false`. The test expects, in this order: `.storageAdapterAvailable`,
/// `.subscriptionsPaused`, `.mutationsPaused`, `.clearedStateOutgoingMutations`,
/// `.subscriptionsInitialized`, `.cleanedUp`, and finally completion of the engine's publisher.
/// It also expects a `subscriptionsEstablished` Hub event with no payload data, and that the retry
/// policy is consulted with a non-nil URL error.
func testFailureOnInitialSync() async throws {
    let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
    let subscriptionsPaused = expectation(description: "subscriptionsPaused")
    let mutationsPaused = expectation(description: "mutationsPaused")
    let stateMutationsCleared = expectation(description: "stateMutationsCleared")
    let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
    let subscriptionsEstablishedReceived = expectation(description: "subscriptionsEstablished received")
    let cleanedup = expectation(description: "cleanedup")
    let failureOnInitialSync = expectation(description: "failureOnInitialSync")
    let retryAdviceReceivedNetworkError = expectation(description: "retry advice received network error")
    // Position of the next expected step; `checkAndFulfill` only advances it for in-order events.
    var currCount = 1
    let advice = RequestRetryAdvice(shouldRetry: false)
    mockRequestRetryablePolicy.pushOnRetryRequestAdvice(response: advice)
    mockRequestRetryablePolicy.setOnRetryRequestAdvice { urlError, _, _ in
        XCTAssertNotNil(urlError)
        retryAdviceReceivedNetworkError.fulfill()
    }
    let filter = HubFilters.forEventName(HubPayload.EventName.DataStore.subscriptionsEstablished)
    let hubListener = Amplify.Hub.listen(to: .dataStore, isIncluded: filter) { payload in
        XCTAssertNil(payload.data)
        subscriptionsEstablishedReceived.fulfill()
    }
    // Always unregister the listener, including on the early return below, so it cannot outlive this
    // test and fulfill a stale expectation later.
    defer { Amplify.Hub.removeListener(hubListener) }
    guard try await HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else {
        XCTFail("Listener not registered for hub")
        return
    }
    let remoteSyncEngineSink = remoteSyncEngine
        .publisher
        .sink(receiveCompletion: { _ in
            currCount = self.checkAndFulfill(currCount, 7, expectation: failureOnInitialSync)
        }, receiveValue: { (event: RemoteSyncEngineEvent) in
            switch event {
            case .storageAdapterAvailable:
                currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
            case .subscriptionsPaused:
                currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
            case .mutationsPaused:
                currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                // Have the mock reconciliation queue report `.initialized` half a second later.
                DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                    MockAWSIncomingEventReconciliationQueue.mockSend(event: .initialized)
                }
            case .clearedStateOutgoingMutations:
                currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
            case .subscriptionsInitialized:
                currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
            case .performedInitialSync:
                XCTFail("performedInitialQueries should not be successful")
            case .cleanedUp:
                currCount = self.checkAndFulfill(currCount, 6, expectation: cleanedup)
            default:
                XCTFail("Unexpected case gets hit")
            }
        })
    MockAWSInitialSyncOrchestrator.setResponseOnSync(result: .failure(
        DataStoreError.internalOperation("forceError", "none", URLError(.notConnectedToInternet))))
    remoteSyncEngine.start(api: apiPlugin, auth: nil)
    await waitForExpectations(timeout: defaultAsyncWaitTimeout)
    remoteSyncEngineSink.cancel()
}
/// Verifies the full start-up event sequence when nothing fails.
///
/// After `start(api:auth:)` the test expects, in this order: `.storageAdapterAvailable`,
/// `.subscriptionsPaused`, `.mutationsPaused`, `.clearedStateOutgoingMutations`,
/// `.subscriptionsInitialized`, `.performedInitialSync`, `.subscriptionsActivated`,
/// `.mutationQueueStarted` and `.syncStarted`. The publisher must not complete while the test waits.
func testRemoteSyncEngineHappyPath() throws {
    let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
    let subscriptionsPaused = expectation(description: "subscriptionsPaused")
    let mutationsPaused = expectation(description: "mutationsPaused")
    let stateMutationsCleared = expectation(description: "stateMutationsCleared")
    let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
    let performedInitialSync = expectation(description: "performedInitialSync")
    // Fulfilled on successful activation, so the description must not talk about a failure.
    let subscriptionActivation = expectation(description: "subscriptionsActivated")
    let mutationQueueStarted = expectation(description: "mutationQueueStarted")
    let syncStarted = expectation(description: "sync started")
    // Position of the next expected step; `checkAndFulfill` only advances it for in-order events.
    var currCount = 1
    let remoteSyncEngineSink = remoteSyncEngine
        .publisher
        .sink(receiveCompletion: { _ in
            XCTFail("Completion should never happen")
        }, receiveValue: { (event: RemoteSyncEngineEvent) in
            switch event {
            case .storageAdapterAvailable:
                currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
            case .subscriptionsPaused:
                currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
            case .mutationsPaused:
                currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                // Have the mock reconciliation queue report `.initialized` half a second later.
                DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                    MockAWSIncomingEventReconciliationQueue.mockSend(event: .initialized)
                }
            case .clearedStateOutgoingMutations:
                currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
            case .subscriptionsInitialized:
                currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
            case .performedInitialSync:
                currCount = self.checkAndFulfill(currCount, 6, expectation: performedInitialSync)
            case .subscriptionsActivated:
                currCount = self.checkAndFulfill(currCount, 7, expectation: subscriptionActivation)
            case .mutationQueueStarted:
                currCount = self.checkAndFulfill(currCount, 8, expectation: mutationQueueStarted)
            case .syncStarted:
                currCount = self.checkAndFulfill(currCount, 9, expectation: syncStarted)
            default:
                XCTFail("unexpected call")
            }
        })
    remoteSyncEngine.start(api: apiPlugin, auth: nil)
    wait(for: [storageAdapterAvailable,
               subscriptionsPaused,
               mutationsPaused,
               stateMutationsCleared,
               subscriptionsInitialized,
               performedInitialSync,
               subscriptionActivation,
               mutationQueueStarted,
               syncStarted], timeout: defaultAsyncWaitTimeout)
    remoteSyncEngineSink.cancel()
}
/// Verifies that a failure completion from the reconciliation queue ends the engine.
///
/// The engine is started and expected to walk through the nine start-up events up to `.syncStarted`.
/// Half a second after `.syncStarted`, the mock reconciliation queue is made to complete with
/// `DataStoreError.unknown`. The test then expects `.cleanedUp` followed by completion of the engine's
/// publisher. The mock retry policy is primed with `shouldRetry: false`.
// swiftlint:disable:next cyclomatic_complexity
func testCatastrophicErrorEndsRemoteSyncEngine() throws {
    let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
    let subscriptionsPaused = expectation(description: "subscriptionsPaused")
    let mutationsPaused = expectation(description: "mutationsPaused")
    let stateMutationsCleared = expectation(description: "stateMutationsCleared")
    let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
    let performedInitialSync = expectation(description: "performedInitialSync")
    // Fulfilled on successful activation, so the description must not talk about a failure.
    let subscriptionActivation = expectation(description: "subscriptionsActivated")
    let mutationQueueStarted = expectation(description: "mutationQueueStarted")
    let syncStarted = expectation(description: "syncStarted")
    let cleanedUp = expectation(description: "cleanedUp")
    let forceFailToNotRestartSyncEngine = expectation(description: "forceFailToNotRestartSyncEngine")
    // Position of the next expected step; `checkAndFulfill` only advances it for in-order events.
    var currCount = 1
    let advice = RequestRetryAdvice(shouldRetry: false)
    mockRequestRetryablePolicy.pushOnRetryRequestAdvice(response: advice)
    let remoteSyncEngineSink = remoteSyncEngine
        .publisher
        .sink(receiveCompletion: { _ in
            currCount = self.checkAndFulfill(currCount, 11, expectation: forceFailToNotRestartSyncEngine)
        }, receiveValue: { (event: RemoteSyncEngineEvent) in
            switch event {
            case .storageAdapterAvailable:
                currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
            case .subscriptionsPaused:
                currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
            case .mutationsPaused:
                currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                // Have the mock reconciliation queue report `.initialized` half a second later.
                DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                    MockAWSIncomingEventReconciliationQueue.mockSend(event: .initialized)
                }
            case .clearedStateOutgoingMutations:
                currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
            case .subscriptionsInitialized:
                currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
            case .performedInitialSync:
                currCount = self.checkAndFulfill(currCount, 6, expectation: performedInitialSync)
            case .subscriptionsActivated:
                currCount = self.checkAndFulfill(currCount, 7, expectation: subscriptionActivation)
            case .mutationQueueStarted:
                currCount = self.checkAndFulfill(currCount, 8, expectation: mutationQueueStarted)
            case .syncStarted:
                currCount = self.checkAndFulfill(currCount, 9, expectation: syncStarted)
                // Once syncing is running, inject the catastrophic failure through the mock queue.
                DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                    MockAWSIncomingEventReconciliationQueue
                        .mockSendCompletion(completion: .failure(DataStoreError.unknown("", "", nil)))
                }
            case .cleanedUp:
                currCount = self.checkAndFulfill(currCount, 10, expectation: cleanedUp)
            default:
                XCTFail("unexpected call")
            }
        })
    remoteSyncEngine.start(api: apiPlugin, auth: nil)
    wait(for: [storageAdapterAvailable,
               subscriptionsPaused,
               mutationsPaused,
               stateMutationsCleared,
               subscriptionsInitialized,
               performedInitialSync,
               subscriptionActivation,
               mutationQueueStarted,
               syncStarted,
               cleanedUp,
               forceFailToNotRestartSyncEngine], timeout: defaultAsyncWaitTimeout)
    remoteSyncEngineSink.cancel()
}
/// Verifies that calling `stop(completion:)` on a running engine ends it.
///
/// The engine is started and expected to walk through the nine start-up events up to `.syncStarted`.
/// Half a second after `.syncStarted`, the test calls `stop(completion:)`. It then expects, in this
/// order: `.cleanedUpForTermination`, completion of the engine's publisher, and finally the stop
/// completion block being invoked with `.success`. Any other stop result fails the test immediately.
// swiftlint:disable:next cyclomatic_complexity
func testStopEndsRemoteSyncEngine() throws {
    let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
    let subscriptionsPaused = expectation(description: "subscriptionsPaused")
    let mutationsPaused = expectation(description: "mutationsPaused")
    let stateMutationsCleared = expectation(description: "stateMutationsCleared")
    let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
    let performedInitialSync = expectation(description: "performedInitialSync")
    // Fulfilled on successful activation, so the description must not talk about a failure.
    let subscriptionActivation = expectation(description: "subscriptionsActivated")
    let mutationQueueStarted = expectation(description: "mutationQueueStarted")
    let syncStarted = expectation(description: "syncStarted")
    let cleanedUpForTermination = expectation(description: "cleanedUpForTermination")
    let forceFailToNotRestartSyncEngine = expectation(description: "forceFailToNotRestartSyncEngine")
    let completionBlockCalled = expectation(description: "Completion block is called")
    // Position of the next expected step; `checkAndFulfill` only advances it for in-order events.
    var currCount = 1
    let advice = RequestRetryAdvice(shouldRetry: false)
    mockRequestRetryablePolicy.pushOnRetryRequestAdvice(response: advice)
    let remoteSyncEngineSink = remoteSyncEngine
        .publisher
        .sink(receiveCompletion: { _ in
            currCount = self.checkAndFulfill(currCount, 11, expectation: forceFailToNotRestartSyncEngine)
        }, receiveValue: { (event: RemoteSyncEngineEvent) in
            switch event {
            case .storageAdapterAvailable:
                currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
            case .subscriptionsPaused:
                currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
            case .mutationsPaused:
                currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                // Have the mock reconciliation queue report `.initialized` half a second later.
                DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                    MockAWSIncomingEventReconciliationQueue.mockSend(event: .initialized)
                }
            case .clearedStateOutgoingMutations:
                currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
            case .subscriptionsInitialized:
                currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
            case .performedInitialSync:
                currCount = self.checkAndFulfill(currCount, 6, expectation: performedInitialSync)
            case .subscriptionsActivated:
                currCount = self.checkAndFulfill(currCount, 7, expectation: subscriptionActivation)
            case .mutationQueueStarted:
                currCount = self.checkAndFulfill(currCount, 8, expectation: mutationQueueStarted)
            case .syncStarted:
                currCount = self.checkAndFulfill(currCount, 9, expectation: syncStarted)
                // Once syncing is running, ask the engine to stop.
                DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                    self.remoteSyncEngine.stop(completion: { result in
                        if case .success = result {
                            currCount = self.checkAndFulfill(currCount, 12, expectation: completionBlockCalled)
                        } else {
                            // Fail right away rather than letting the wait below time out silently.
                            XCTFail("stop(completion:) did not succeed: \(result)")
                        }
                    })
                }
            case .cleanedUpForTermination:
                currCount = self.checkAndFulfill(currCount, 10, expectation: cleanedUpForTermination)
            default:
                XCTFail("unexpected call")
            }
        })
    remoteSyncEngine.start(api: apiPlugin, auth: nil)
    wait(for: [storageAdapterAvailable,
               subscriptionsPaused,
               mutationsPaused,
               stateMutationsCleared,
               subscriptionsInitialized,
               performedInitialSync,
               subscriptionActivation,
               mutationQueueStarted,
               syncStarted,
               cleanedUpForTermination,
               completionBlockCalled,
               forceFailToNotRestartSyncEngine], timeout: defaultAsyncWaitTimeout)
    remoteSyncEngineSink.cancel()
}
/// Fulfills `expectation` only when the running step counter matches the step at which it is expected.
///
/// - Parameters:
///   - currCount: The step the caller is currently at.
///   - expectedCount: The step at which `expectation` is supposed to be met.
///   - expectation: The expectation to fulfill when both steps are equal.
/// - Returns: `currCount + 1` when the expectation was fulfilled, otherwise `currCount` unchanged.
private func checkAndFulfill(_ currCount: Int, _ expectedCount: Int, expectation: XCTestExpectation) -> Int {
    guard currCount == expectedCount else {
        // Out-of-order step: leave the expectation pending and do not advance the counter.
        return currCount
    }
    expectation.fulfill()
    return currCount + 1
}
}