amplify-swift/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/ModelSyncedEventEmitter.swift

179 lines
7.1 KiB
Swift

//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
import Amplify
import AWSPluginsCore
import Combine
import Foundation
/// Events that `ModelSyncedEventEmitter` sends to the subscribers of its `publisher`.
enum IncomingModelSyncedEmitterEvent {
    /// A reconciled mutation event that was applied; carries the `MutationEvent` itself.
    case mutationEventApplied(MutationEvent)

    /// A reconciled mutation event that was dropped for the model named `modelName`,
    /// with an optional `DataStoreError` (defaults to `nil`).
    case mutationEventDropped(modelName: String, error: DataStoreError? = nil)

    /// The model-synced notification; carries the built `ModelSyncedEvent`.
    case modelSyncedEvent(ModelSyncedEvent)
}
/// Listens to events published by both the `InitialSyncOrchestrator` and the
/// `IncomingEventReconciliationQueue` for a single model, and emits a `ModelSyncedEvent` when the
/// initial sync of that model is complete.
///
/// This class expects `InitialSyncOrchestrator` and `IncomingEventReconciliationQueue` to have matching
/// counts for the events they enqueue and process, respectively. Every reconciled event
/// (`.mutationEventApplied`, `.mutationEventDropped`) is always sent back through `publisher`.
///
/// For the reconciled event that completes the initial sync, the sequence on `publisher` is:
/// - First, the reconciled mutation event itself is forwarded.
/// - Then, if the sync operation has finished and every enqueued record has been reconciled,
///   the `ModelSyncedEvent` is emitted.
///
/// If the sync operation finishes with no records, or with every record already reconciled, the
/// `ModelSyncedEvent` is emitted as soon as the `.finished` event is received.
final class ModelSyncedEventEmitter {
    /// Queue on which all upstream events are delivered; the mutable state below is only
    /// touched from handlers running on this queue.
    private let queue = DispatchQueue(label: "com.amazonaws.ModelSyncedEventEmitterQueue",
                                      target: DispatchQueue.global())

    /// Subscription to the `InitialSyncOrchestrator` publisher. Cancelled once the
    /// `ModelSyncedEvent` has been sent.
    private var syncOrchestratorSink: AnyCancellable?

    /// Subscription to the `IncomingEventReconciliationQueue` publisher.
    private var reconciliationQueueSink: AnyCancellable?

    /// Schema of the model this emitter tracks; events for other models are filtered out by name.
    private let modelSchema: ModelSchema

    /// Number of `.enqueued` events received from the sync operation.
    private var recordsReceived: Int

    /// Number of reconciled (applied or dropped) events received from the reconciliation queue.
    private var reconciledReceived: Int

    /// Set when `.finished` arrives while some enqueued records are still waiting to be reconciled.
    private var initialSyncOperationFinished: Bool

    /// Accumulates the sync type and the added/updated/deleted counts for the `ModelSyncedEvent`.
    private var modelSyncedEventBuilder: ModelSyncedEvent.Builder

    /// Subject backing `publisher`.
    private var modelSyncedEventTopic: PassthroughSubject<IncomingModelSyncedEmitterEvent, Never>

    /// Stream of forwarded reconciliation events and the `ModelSyncedEvent`.
    var publisher: AnyPublisher<IncomingModelSyncedEmitterEvent, Never> {
        return modelSyncedEventTopic.eraseToAnyPublisher()
    }

    /// `true` when the sync operation has finished and every enqueued record has been reconciled.
    var shouldSendModelSyncedEvent: Bool {
        initialSyncOperationFinished && reconciledReceived == recordsReceived
    }

    /// Used within ModelSyncedEventEmitter instances, not thread-safe, is accessed serially under DispatchQueue.
    var dispatchedModelSyncedEvent: Bool

    /// Creates an emitter for `modelSchema` and subscribes to the given event sources.
    ///
    /// - Parameters:
    ///   - modelSchema: Schema of the model whose initial sync is tracked.
    ///   - initialSyncOrchestrator: Source of `InitialSyncOperationEvent`s; no subscription is made when `nil`.
    ///   - reconciliationQueue: Source of `IncomingEventReconciliationQueueEvent`s; no subscription is
    ///     made when `nil`.
    init(modelSchema: ModelSchema,
         initialSyncOrchestrator: InitialSyncOrchestrator?,
         reconciliationQueue: IncomingEventReconciliationQueue?) {
        self.modelSchema = modelSchema
        self.recordsReceived = 0
        self.reconciledReceived = 0
        self.initialSyncOperationFinished = false
        self.dispatchedModelSyncedEvent = false
        self.modelSyncedEventBuilder = ModelSyncedEvent.Builder()
        self.modelSyncedEventTopic = PassthroughSubject<IncomingModelSyncedEmitterEvent, Never>()

        // The filter closures capture `self` weakly. Passing the instance method directly
        // (`.filter(filterSyncOperationEvent(_:))`) would capture `self` strongly inside a pipeline
        // that `self` owns through the stored `AnyCancellable`, creating a retain cycle.
        self.syncOrchestratorSink = initialSyncOrchestrator?
            .publisher
            .receive(on: queue)
            .filter { [weak self] event in
                self?.filterSyncOperationEvent(event) == true
            }
            .sink(receiveCompletion: { _ in },
                  receiveValue: { [weak self] value in
                      self?.onReceiveSyncOperationEvent(value: value)
                  })

        self.reconciliationQueueSink = reconciliationQueue?
            .publisher
            .receive(on: queue)
            .filter { [weak self] event in
                self?.filterReconciliationQueueEvent(event) == true
            }
            .sink(receiveCompletion: { _ in },
                  receiveValue: { [weak self] value in
                      self?.onReceiveReconciliationEvent(value: value)
                  })
    }

    /// Filtering `InitialSyncOperationEvent`s that come from `InitialSyncOperation` of the same ModelType
    ///
    /// - Returns: `true` when the event's model name equals `modelSchema.name`.
    private func filterSyncOperationEvent(_ value: InitialSyncOperationEvent) -> Bool {
        switch value {
        case .started(let modelName, _):
            return modelSchema.name == modelName
        case .enqueued(_, let modelName):
            return modelSchema.name == modelName
        case .finished(let modelName, _):
            return modelSchema.name == modelName
        }
    }

    /// Filtering `IncomingEventReconciliationQueueEvent`s that come from `ReconciliationAndLocalSaveOperation`
    /// of the same ModelType
    ///
    /// - Returns: `true` for applied/dropped events whose model name equals `modelSchema.name`;
    ///   `false` for every queue state event (`.idle`, `.initialized`, `.started`, `.paused`).
    private func filterReconciliationQueueEvent(_ value: IncomingEventReconciliationQueueEvent) -> Bool {
        switch value {
        case .mutationEventApplied(let event):
            return modelSchema.name == event.modelName
        case .mutationEventDropped(let modelName, _):
            return modelSchema.name == modelName
        case .idle, .initialized, .started, .paused:
            return false
        }
    }

    /// Updates the sync bookkeeping for an event coming from the initial sync operation.
    private func onReceiveSyncOperationEvent(value: InitialSyncOperationEvent) {
        switch value {
        case .started(_, let syncType):
            modelSyncedEventBuilder.isFullSync = syncType == .fullSync
            modelSyncedEventBuilder.isDeltaSync = !modelSyncedEventBuilder.isFullSync
        case .enqueued:
            recordsReceived += 1
        case .finished:
            if recordsReceived == 0 || recordsReceived == reconciledReceived {
                // Nothing is left to reconcile, so the model is already synced.
                sendModelSyncedEvent()
            } else {
                // Reconciliation is still catching up; the reconciliation handler
                // sends the event once the counts match.
                initialSyncOperationFinished = true
            }
        }
    }

    /// Forwards reconciled events to `publisher` and, until the `ModelSyncedEvent` has been
    /// dispatched, counts them towards completion of the initial sync.
    private func onReceiveReconciliationEvent(value: IncomingEventReconciliationQueueEvent) {
        // After the ModelSyncedEvent has gone out, events are only passed through;
        // the counters and the builder are no longer updated.
        guard !dispatchedModelSyncedEvent else {
            switch value {
            case .mutationEventApplied(let event):
                modelSyncedEventTopic.send(.mutationEventApplied(event))
            case .mutationEventDropped(let modelName, let error):
                modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName, error: error))
            case .idle, .initialized, .started, .paused:
                return
            }
            return
        }

        switch value {
        case .mutationEventApplied(let event):
            reconciledReceived += 1
            switch GraphQLMutationType(rawValue: event.mutationType) {
            case .create:
                modelSyncedEventBuilder.added += 1
            case .update:
                modelSyncedEventBuilder.updated += 1
            case .delete:
                modelSyncedEventBuilder.deleted += 1
            default:
                log.error("Unexpected mutationType received: \(event.mutationType)")
            }

            // Forward the mutation event first, then emit the ModelSyncedEvent if it is now due.
            modelSyncedEventTopic.send(.mutationEventApplied(event))
            if shouldSendModelSyncedEvent {
                sendModelSyncedEvent()
            }
        case .mutationEventDropped(let modelName, let error):
            // Dropped events still count as reconciled.
            reconciledReceived += 1
            modelSyncedEventTopic.send(.mutationEventDropped(modelName: modelName, error: error))
            if shouldSendModelSyncedEvent {
                sendModelSyncedEvent()
            }
        case .idle, .initialized, .started, .paused:
            return
        }
    }

    /// Builds and publishes the `ModelSyncedEvent`, marks it as dispatched, and stops
    /// listening to the sync orchestrator.
    private func sendModelSyncedEvent() {
        modelSyncedEventBuilder.modelName = modelSchema.name
        let modelSyncedEvent = modelSyncedEventBuilder.build()
        log.verbose("[Lifecycle event 3]: modelSyncedEvent model: \(modelSchema.name)")
        modelSyncedEventTopic.send(.modelSyncedEvent(modelSyncedEvent))
        dispatchedModelSyncedEvent = true
        syncOrchestratorSink?.cancel()
    }
}
/// Declares `DefaultLogger` conformance for the emitter.
/// NOTE(review): presumably this is what supplies the `log` used by the emitter's methods — confirm
/// against the `DefaultLogger` declaration.
extension ModelSyncedEventEmitter: DefaultLogger { }