356 lines
18 KiB
Swift
356 lines
18 KiB
Swift
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
|
|
import XCTest
|
|
import SQLite
|
|
|
|
import Combine
|
|
@testable import Amplify
|
|
@testable import AmplifyTestCommon
|
|
@testable import AWSDataStorePlugin
|
|
|
|
/// Tests that drive a `RemoteSyncEngine` built from mock collaborators and assert on the
/// order of the events emitted by its `publisher`.
///
/// Ordering is checked with a running counter (`currCount`) that only advances, and only
/// fulfills the matching expectation, when an event arrives at its expected position; see
/// `checkAndFulfill(_:_:expectation:)`. An out-of-order event therefore shows up as an
/// unfulfilled expectation when the test waits.
class RemoteSyncEngineTests: XCTestCase {
    var apiPlugin: MockAPICategoryPlugin!

    var amplifyConfig: AmplifyConfiguration!
    var storageAdapter: StorageEngineAdapter!
    var remoteSyncEngine: RemoteSyncEngine!
    var mockRequestRetryablePolicy: MockRequestRetryablePolicy!

    /// Timeout, in seconds, used when waiting on expectations in these tests.
    let defaultAsyncWaitTimeout = 2.0

    /// Creates fresh mocks and a `RemoteSyncEngine` wired to them before every test.
    override func setUp() {
        super.setUp()
        apiPlugin = MockAPICategoryPlugin()
        MockAWSInitialSyncOrchestrator.reset()
        storageAdapter = MockSQLiteStorageEngineAdapter()
        let mockOutgoingMutationQueue = MockOutgoingMutationQueue()
        mockRequestRetryablePolicy = MockRequestRetryablePolicy()
        do {
            remoteSyncEngine = try RemoteSyncEngine(
                storageAdapter: storageAdapter,
                dataStoreConfiguration: .default,
                outgoingMutationQueue: mockOutgoingMutationQueue,
                initialSyncOrchestratorFactory: MockAWSInitialSyncOrchestrator.factory,
                reconciliationQueueFactory: MockAWSIncomingEventReconciliationQueue.factory,
                requestRetryablePolicy: mockRequestRetryablePolicy
            )
        } catch {
            // Surface the underlying error so a setup failure can be diagnosed from the log.
            XCTFail("Failed to setup: \(error)")
        }
    }

    /// Starting the engine after the test's storage adapter reference has been released
    /// must complete the publisher without emitting any event.
    func testErrorOnNilStorageAdapter() throws {
        guard let remoteSyncEngine = remoteSyncEngine else {
            XCTFail("Failed to initialize remoteSyncEngine")
            return
        }

        let failureOnStorageAdapter = expectation(description: "Expect receiveCompletion on storageAdapterFailure")

        // Drop the test's reference to the adapter.
        // NOTE(review): presumably the engine does not retain the adapter strongly, so it
        // observes a missing adapter on start - confirm against RemoteSyncEngine.
        storageAdapter = nil
        let remoteSyncEngineSink = remoteSyncEngine
            .publisher
            .sink(receiveCompletion: { _ in
                failureOnStorageAdapter.fulfill()
            }, receiveValue: { _ in
                XCTFail("We should not expect the sync engine to continue")
            })

        remoteSyncEngine.start(api: MockAPICategoryPlugin(), auth: nil)

        wait(for: [failureOnStorageAdapter], timeout: defaultAsyncWaitTimeout)
        remoteSyncEngineSink.cancel()
    }

    /// A failing initial sync must produce, in order: storageAdapterAvailable,
    /// subscriptionsPaused, mutationsPaused, clearedStateOutgoingMutations,
    /// subscriptionsInitialized, cleanedUp, and finally a publisher completion.
    /// The retry policy mock must be consulted, and the `subscriptionsEstablished`
    /// Hub event must be received with a nil payload.
    func testFailureOnInitialSync() async throws {
        let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
        let subscriptionsPaused = expectation(description: "subscriptionsPaused")
        let mutationsPaused = expectation(description: "mutationsPaused")
        let stateMutationsCleared = expectation(description: "stateMutationsCleared")
        let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
        let subscriptionsEstablishedReceived = expectation(description: "subscriptionsEstablished received")
        let cleanedup = expectation(description: "cleanedup")
        let failureOnInitialSync = expectation(description: "failureOnInitialSync")
        let retryAdviceReceivedNetworkError = expectation(description: "retry advice received network error")
        var currCount = 1

        let advice = RequestRetryAdvice(shouldRetry: false)
        mockRequestRetryablePolicy.pushOnRetryRequestAdvice(response: advice)
        mockRequestRetryablePolicy.setOnRetryRequestAdvice { urlError, _, _ in
            XCTAssertNotNil(urlError)
            retryAdviceReceivedNetworkError.fulfill()
        }

        let filter = HubFilters.forEventName(HubPayload.EventName.DataStore.subscriptionsEstablished)
        let hubListener = Amplify.Hub.listen(to: .dataStore, isIncluded: filter) { payload in
            XCTAssertNil(payload.data)
            subscriptionsEstablishedReceived.fulfill()
        }
        // Remove the listener on every exit path, including the early return and a throw
        // from `waitForListener`, so it cannot leak into later tests.
        defer { Amplify.Hub.removeListener(hubListener) }

        guard try await HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else {
            XCTFail("Listener not registered for hub")
            return
        }

        let remoteSyncEngineSink = remoteSyncEngine
            .publisher
            .sink(receiveCompletion: { _ in
                currCount = self.checkAndFulfill(currCount, 7, expectation: failureOnInitialSync)
            }, receiveValue: { (event: RemoteSyncEngineEvent) in
                switch event {
                case .storageAdapterAvailable:
                    currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
                case .subscriptionsPaused:
                    currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
                case .mutationsPaused:
                    currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                    self.sendReconciliationQueueInitializedAfterDelay()
                case .clearedStateOutgoingMutations:
                    currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
                case .subscriptionsInitialized:
                    currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
                case .performedInitialSync:
                    XCTFail("performedInitialQueries should not be successful")
                case .cleanedUp:
                    currCount = self.checkAndFulfill(currCount, 6, expectation: cleanedup)
                default:
                    XCTFail("Unexpected case gets hit")
                }
            })
        // Force the mocked initial sync to fail with a "not connected" URL error.
        MockAWSInitialSyncOrchestrator.setResponseOnSync(result: .failure(
            DataStoreError.internalOperation("forceError", "none", URLError(.notConnectedToInternet))))

        remoteSyncEngine.start(api: apiPlugin, auth: nil)

        await waitForExpectations(timeout: defaultAsyncWaitTimeout)
        remoteSyncEngineSink.cancel()
    }

    /// A successful start must emit the nine lifecycle events, from
    /// storageAdapterAvailable through syncStarted, in order and without completing.
    func testRemoteSyncEngineHappyPath() throws {
        let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
        let subscriptionsPaused = expectation(description: "subscriptionsPaused")
        let mutationsPaused = expectation(description: "mutationsPaused")
        let stateMutationsCleared = expectation(description: "stateMutationsCleared")
        let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
        let performedInitialSync = expectation(description: "performedInitialSync")
        let subscriptionActivation = expectation(description: "subscriptionsActivated")
        let mutationQueueStarted = expectation(description: "mutationQueueStarted")
        let syncStarted = expectation(description: "sync started")

        var currCount = 1

        let remoteSyncEngineSink = remoteSyncEngine
            .publisher
            .sink(receiveCompletion: { _ in
                XCTFail("Completion should never happen")
            }, receiveValue: { (event: RemoteSyncEngineEvent) in
                switch event {
                case .storageAdapterAvailable:
                    currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
                case .subscriptionsPaused:
                    currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
                case .mutationsPaused:
                    currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                    self.sendReconciliationQueueInitializedAfterDelay()
                case .clearedStateOutgoingMutations:
                    currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
                case .subscriptionsInitialized:
                    currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
                case .performedInitialSync:
                    currCount = self.checkAndFulfill(currCount, 6, expectation: performedInitialSync)
                case .subscriptionsActivated:
                    currCount = self.checkAndFulfill(currCount, 7, expectation: subscriptionActivation)
                case .mutationQueueStarted:
                    currCount = self.checkAndFulfill(currCount, 8, expectation: mutationQueueStarted)
                case .syncStarted:
                    currCount = self.checkAndFulfill(currCount, 9, expectation: syncStarted)
                default:
                    XCTFail("unexpected call")
                }
            })

        remoteSyncEngine.start(api: apiPlugin, auth: nil)

        wait(for: [storageAdapterAvailable,
                   subscriptionsPaused,
                   mutationsPaused,
                   stateMutationsCleared,
                   subscriptionsInitialized,
                   performedInitialSync,
                   subscriptionActivation,
                   mutationQueueStarted,
                   syncStarted], timeout: defaultAsyncWaitTimeout)
        remoteSyncEngineSink.cancel()
    }

    /// After a successful start, a failure completion sent through the mock reconciliation
    /// queue must be followed by `cleanedUp` and then a publisher completion.
    // swiftlint:disable:next cyclomatic_complexity
    func testCatastrophicErrorEndsRemoteSyncEngine() throws {
        let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
        let subscriptionsPaused = expectation(description: "subscriptionsPaused")
        let mutationsPaused = expectation(description: "mutationsPaused")
        let stateMutationsCleared = expectation(description: "stateMutationsCleared")
        let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
        let performedInitialSync = expectation(description: "performedInitialSync")
        let subscriptionActivation = expectation(description: "subscriptionsActivated")
        let mutationQueueStarted = expectation(description: "mutationQueueStarted")
        let syncStarted = expectation(description: "syncStarted")
        let cleanedUp = expectation(description: "cleanedUp")
        let forceFailToNotRestartSyncEngine = expectation(description: "forceFailToNotRestartSyncEngine")

        var currCount = 1

        let advice = RequestRetryAdvice(shouldRetry: false)
        mockRequestRetryablePolicy.pushOnRetryRequestAdvice(response: advice)

        let remoteSyncEngineSink = remoteSyncEngine
            .publisher
            .sink(receiveCompletion: { _ in
                currCount = self.checkAndFulfill(currCount, 11, expectation: forceFailToNotRestartSyncEngine)
            }, receiveValue: { (event: RemoteSyncEngineEvent) in
                switch event {
                case .storageAdapterAvailable:
                    currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
                case .subscriptionsPaused:
                    currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
                case .mutationsPaused:
                    currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                    self.sendReconciliationQueueInitializedAfterDelay()
                case .clearedStateOutgoingMutations:
                    currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
                case .subscriptionsInitialized:
                    currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
                case .performedInitialSync:
                    currCount = self.checkAndFulfill(currCount, 6, expectation: performedInitialSync)
                case .subscriptionsActivated:
                    currCount = self.checkAndFulfill(currCount, 7, expectation: subscriptionActivation)
                case .mutationQueueStarted:
                    currCount = self.checkAndFulfill(currCount, 8, expectation: mutationQueueStarted)
                case .syncStarted:
                    currCount = self.checkAndFulfill(currCount, 9, expectation: syncStarted)
                    // Once sync is running, inject the failure through the mock queue.
                    DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                        MockAWSIncomingEventReconciliationQueue
                            .mockSendCompletion(completion: .failure(DataStoreError.unknown("", "", nil)))
                    }
                case .cleanedUp:
                    currCount = self.checkAndFulfill(currCount, 10, expectation: cleanedUp)
                default:
                    XCTFail("unexpected call")
                }
            })

        remoteSyncEngine.start(api: apiPlugin, auth: nil)

        wait(for: [storageAdapterAvailable,
                   subscriptionsPaused,
                   mutationsPaused,
                   stateMutationsCleared,
                   subscriptionsInitialized,
                   performedInitialSync,
                   subscriptionActivation,
                   mutationQueueStarted,
                   syncStarted,
                   cleanedUp,
                   forceFailToNotRestartSyncEngine], timeout: defaultAsyncWaitTimeout)
        remoteSyncEngineSink.cancel()
    }

    /// After a successful start, calling `stop(completion:)` must emit
    /// `cleanedUpForTermination`, complete the publisher, and then invoke the stop
    /// completion block with a success result.
    // swiftlint:disable:next cyclomatic_complexity
    func testStopEndsRemoteSyncEngine() throws {
        let storageAdapterAvailable = expectation(description: "storageAdapterAvailable")
        let subscriptionsPaused = expectation(description: "subscriptionsPaused")
        let mutationsPaused = expectation(description: "mutationsPaused")
        let stateMutationsCleared = expectation(description: "stateMutationsCleared")
        let subscriptionsInitialized = expectation(description: "subscriptionsInitialized")
        let performedInitialSync = expectation(description: "performedInitialSync")
        let subscriptionActivation = expectation(description: "subscriptionsActivated")
        let mutationQueueStarted = expectation(description: "mutationQueueStarted")
        let syncStarted = expectation(description: "syncStarted")
        let cleanedUpForTermination = expectation(description: "cleanedUpForTermination")
        let forceFailToNotRestartSyncEngine = expectation(description: "forceFailToNotRestartSyncEngine")
        let completionBlockCalled = expectation(description: "Completion block is called")

        var currCount = 1

        let advice = RequestRetryAdvice(shouldRetry: false)
        mockRequestRetryablePolicy.pushOnRetryRequestAdvice(response: advice)

        let remoteSyncEngineSink = remoteSyncEngine
            .publisher
            .sink(receiveCompletion: { _ in
                currCount = self.checkAndFulfill(currCount, 11, expectation: forceFailToNotRestartSyncEngine)
            }, receiveValue: { (event: RemoteSyncEngineEvent) in
                switch event {
                case .storageAdapterAvailable:
                    currCount = self.checkAndFulfill(currCount, 1, expectation: storageAdapterAvailable)
                case .subscriptionsPaused:
                    currCount = self.checkAndFulfill(currCount, 2, expectation: subscriptionsPaused)
                case .mutationsPaused:
                    currCount = self.checkAndFulfill(currCount, 3, expectation: mutationsPaused)
                    self.sendReconciliationQueueInitializedAfterDelay()
                case .clearedStateOutgoingMutations:
                    currCount = self.checkAndFulfill(currCount, 4, expectation: stateMutationsCleared)
                case .subscriptionsInitialized:
                    currCount = self.checkAndFulfill(currCount, 5, expectation: subscriptionsInitialized)
                case .performedInitialSync:
                    currCount = self.checkAndFulfill(currCount, 6, expectation: performedInitialSync)
                case .subscriptionsActivated:
                    currCount = self.checkAndFulfill(currCount, 7, expectation: subscriptionActivation)
                case .mutationQueueStarted:
                    currCount = self.checkAndFulfill(currCount, 8, expectation: mutationQueueStarted)
                case .syncStarted:
                    currCount = self.checkAndFulfill(currCount, 9, expectation: syncStarted)
                    // Once sync is running, ask the engine to stop.
                    DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
                        self.remoteSyncEngine.stop(completion: { result in
                            // Report a non-success result right away instead of leaving it
                            // to surface only as an unexplained expectation timeout.
                            guard case .success = result else {
                                XCTFail("stop(completion:) did not succeed: \(result)")
                                return
                            }
                            currCount = self.checkAndFulfill(currCount, 12, expectation: completionBlockCalled)
                        })
                    }
                case .cleanedUpForTermination:
                    currCount = self.checkAndFulfill(currCount, 10, expectation: cleanedUpForTermination)
                default:
                    XCTFail("unexpected call")
                }
            })

        remoteSyncEngine.start(api: apiPlugin, auth: nil)

        wait(for: [storageAdapterAvailable,
                   subscriptionsPaused,
                   mutationsPaused,
                   stateMutationsCleared,
                   subscriptionsInitialized,
                   performedInitialSync,
                   subscriptionActivation,
                   mutationQueueStarted,
                   syncStarted,
                   cleanedUpForTermination,
                   completionBlockCalled,
                   forceFailToNotRestartSyncEngine], timeout: defaultAsyncWaitTimeout)
        remoteSyncEngineSink.cancel()
    }

    /// Sends `.initialized` through the mock reconciliation queue 500 ms from now, on a
    /// global dispatch queue.
    private func sendReconciliationQueueInitializedAfterDelay() {
        DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + .milliseconds(500)) {
            MockAWSIncomingEventReconciliationQueue.mockSend(event: .initialized)
        }
    }

    /// Advances the ordered-event counter when an event arrives in its expected position.
    ///
    /// - Parameters:
    ///   - currCount: Position the test is currently waiting for.
    ///   - expectedCount: Position at which the calling event is expected to occur.
    ///   - expectation: Expectation fulfilled only when the two positions match.
    /// - Returns: `currCount + 1` when the positions match; otherwise `currCount` unchanged,
    ///   leaving `expectation` unfulfilled.
    private func checkAndFulfill(_ currCount: Int, _ expectedCount: Int, expectation: XCTestExpectation) -> Int {
        guard currCount == expectedCount else {
            return currCount
        }
        expectation.fulfill()
        return currCount + 1
    }
}
|