amplify-swift/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/AWSDataStorePlugin.swift

258 lines
10 KiB
Swift

//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
import Amplify
import Combine
import AWSPluginsCore
import Foundation
/// Lifecycle states published through the plugin's `dataStoreStateSubject`.
enum DataStoreState {

    /// Sync has been started; carries the storage engine that was started.
    case start(storageEngine: StorageEngineBehavior)

    /// NOTE(review): presumably emitted when DataStore is stopped; the emitter is not in this file section.
    case stop

    /// NOTE(review): presumably emitted when DataStore is cleared; the emitter is not in this file section.
    case clear
}
/// Outcome of an attempt to initialize the plugin's storage engine.
enum InitStorageEngineResult {

    /// The engine was created and set up, and its model migrations were applied, by this call.
    case successfullyInitialized

    /// An engine instance already existed, so no work was done.
    case alreadyInitialized

    /// Creating, setting up or migrating the engine threw; the error is wrapped in a `DataStoreError`.
    case failure(DataStoreError)
}
/// AWS-backed DataStore category plugin.
///
/// The portion of the type declared here holds the plugin's state, builds and sets up the storage engine on
/// demand, and relays `StorageEngineEvent`s from the engine's publisher to the DataStore publisher and to the
/// Amplify Hub.
final public class AWSDataStorePlugin: DataStoreCategoryPlugin {

    // MARK: - Stored state

    public var key: PluginKey = "awsDataStorePlugin"

    /// `true` if any models are syncable. Resolved during configuration phase
    var isSyncEnabled: Bool

    /// The listener on hub events unsubscribe token
    var hubListener: UnsubscribeToken?

    /// The Publisher that sends mutation events to subscribers
    var dataStorePublisher: ModelSubcriptionBehavior?

    /// Subject on which lifecycle state changes are sent; `.start` is sent once the sync start callback fires.
    var dataStoreStateSubject = PassthroughSubject<DataStoreState, DataStoreError>()

    /// Per-model flag, seeded with `false` in `configure(using:)` and set to `true` when a `modelSyncedEvent`
    /// for that model is received from the storage engine.
    var dispatchedModelSyncedEvents: [ModelName: AtomicValue<Bool>]

    /// The registration used to register models and to supply the model registry version.
    let modelRegistration: AmplifyModelRegistration

    /// The DataStore configuration
    let dataStoreConfiguration: DataStoreConfiguration

    /// A queue that regulates the execution of operations. This will be instantiated during initialization phase,
    /// and is clearable by `reset()`. This is implicitly unwrapped to be destroyed when resetting.
    var operationQueue: OperationQueue!

    /// Key of the API plugin, forwarded to the storage engine factory.
    let validAPIPluginKey: String

    /// Key of the Auth plugin, forwarded to the storage engine factory.
    let validAuthPluginKey: String

    /// The storage engine. `nil` until `resolveStorageEngine(dataStoreConfiguration:)` has created it.
    var storageEngine: StorageEngineBehavior!

    /// Queue on which storage engine initialization is executed synchronously.
    var storageEngineInitQueue = DispatchQueue(label: "AWSDataStorePlugin.storageEngineInitQueue")

    /// General-purpose plugin queue that targets the global dispatch queue.
    var queue = DispatchQueue(label: "AWSDataStorePlugin.queue", target: DispatchQueue.global())

    /// Factory used to create the storage engine.
    var storageEngineBehaviorFactory: StorageEngineBehaviorFactory

    /// Type-erased backing storage for `storageEngineSink`.
    /// NOTE(review): presumably typed `Any?` for historical OS-availability reasons — confirm before tightening.
    var iStorageEngineSink: Any?

    /// The subscription to the storage engine's publisher, or `nil` when there is none.
    var storageEngineSink: AnyCancellable? {
        get {
            iStorageEngineSink as? AnyCancellable
        }
        set {
            iStorageEngineSink = newValue
        }
    }

    // MARK: - Initialization

    /// Creates a plugin that uses the default providers for everything except the model registration and
    /// the DataStore configuration.
    /// - Parameters:
    ///   - modelRegistration: the registration that registers the app's models
    ///   - dataStoreConfiguration: the DataStore configuration; defaults to `.default`
    public init(modelRegistration: AmplifyModelRegistration,
                configuration dataStoreConfiguration: DataStoreConfiguration = .default) {
        self.modelRegistration = modelRegistration
        self.dataStoreConfiguration = dataStoreConfiguration
        self.isSyncEnabled = false
        self.operationQueue = OperationQueue()
        self.validAPIPluginKey = "awsAPIPlugin"
        self.validAuthPluginKey = "awsCognitoAuthPlugin"
        self.storageEngineBehaviorFactory =
            StorageEngine.init(isSyncEnabled:dataStoreConfiguration:validAPIPluginKey:validAuthPluginKey:modelRegistryVersion:userDefault:)
        self.dataStorePublisher = DataStorePublisher()
        self.dispatchedModelSyncedEvents = [:]
    }

    /// Internal initializer for testing
    /// - Parameters:
    ///   - modelRegistration: the registration that registers the app's models
    ///   - dataStoreConfiguration: the DataStore configuration; defaults to `.default`
    ///   - storageEngineBehaviorFactory: factory for the storage engine; when `nil`, `StorageEngine.init` is used
    ///   - dataStorePublisher: the publisher that receives mutation events
    ///   - operationQueue: the queue that regulates the execution of operations
    ///   - validAPIPluginKey: key of the API plugin forwarded to the storage engine factory
    ///   - validAuthPluginKey: key of the Auth plugin forwarded to the storage engine factory
    init(modelRegistration: AmplifyModelRegistration,
         configuration dataStoreConfiguration: DataStoreConfiguration = .default,
         storageEngineBehaviorFactory: StorageEngineBehaviorFactory? = nil,
         dataStorePublisher: ModelSubcriptionBehavior,
         operationQueue: OperationQueue = OperationQueue(),
         validAPIPluginKey: String,
         validAuthPluginKey: String) {
        self.modelRegistration = modelRegistration
        self.dataStoreConfiguration = dataStoreConfiguration
        self.operationQueue = operationQueue
        self.isSyncEnabled = false
        self.storageEngineBehaviorFactory = storageEngineBehaviorFactory ??
            StorageEngine.init(isSyncEnabled:dataStoreConfiguration:validAPIPluginKey:validAuthPluginKey:modelRegistryVersion:userDefault:)
        self.dataStorePublisher = dataStorePublisher
        self.dispatchedModelSyncedEvents = [:]
        self.validAPIPluginKey = validAPIPluginKey
        self.validAuthPluginKey = validAuthPluginKey
    }

    // MARK: - Configuration

    /// Registers the models through `AmplifyModelRegistration.registerModels`, seeds the per-model
    /// "model synced" flags, derives `isSyncEnabled` from the registered models and registers the
    /// DataStore list decoder.
    /// - Parameter amplifyConfiguration: not read by this method
    public func configure(using amplifyConfiguration: Any?) throws {
        modelRegistration.registerModels(registry: ModelRegistry.self)

        for modelSchema in ModelRegistry.modelSchemas {
            dispatchedModelSyncedEvents[modelSchema.name] = AtomicValue(initialValue: false)
        }

        resolveSyncEnabled()
        ModelListDecoderRegistry.registerDecoder(DataStoreListDecoder.self)
    }

    // MARK: - Storage engine lifecycle

    /// Initializes the underlying storage engine
    /// - Returns: `.alreadyInitialized` if an engine already exists, `.successfullyInitialized` if the engine
    ///   was created, set up and migrated by this call, or `.failure` wrapping the thrown error
    func initStorageEngine() -> InitStorageEngineResult {
        // The whole check-and-create sequence runs synchronously on the init queue.
        storageEngineInitQueue.sync {
            guard storageEngine == nil else {
                return .alreadyInitialized
            }

            do {
                if self.dataStorePublisher == nil {
                    self.dataStorePublisher = DataStorePublisher()
                }
                try resolveStorageEngine(dataStoreConfiguration: dataStoreConfiguration)
                try storageEngine.setUp(modelSchemas: ModelRegistry.modelSchemas)
                try storageEngine.applyModelMigrations(modelSchemas: ModelRegistry.modelSchemas)

                return .successfullyInitialized
            } catch {
                log.error(error: error)
                return .failure(.invalidOperation(causedBy: error))
            }
        }
    }

    /// Initializes the underlying storage engine and starts the syncing process
    /// - Parameter completion: completion handler called with a success if the sync process started
    ///   or with a DataStoreError in case of failure
    func initStorageEngineAndStartSync(completion: @escaping DataStoreCallback<Void> = { _ in }) {
        switch initStorageEngine() {
        case .alreadyInitialized:
            completion(.successfulVoid)

        case .successfullyInitialized:
            // Bind the engine once, here. Re-reading the implicitly unwrapped `storageEngine` property
            // from inside the escaping callback would trap if the property had been cleared by the time
            // the callback fires; the binding also guarantees the event carries the engine that was started.
            let startedEngine: StorageEngineBehavior = storageEngine
            startedEngine.startSync { result in
                self.dataStoreStateSubject.send(.start(storageEngine: startedEngine))
                completion(result)
            }

        case .failure(let error):
            completion(.failure(causedBy: error))
        }
    }

    /// Creates the storage engine through `storageEngineBehaviorFactory` and subscribes to its publisher.
    /// Does nothing when an engine already exists.
    /// - Parameter dataStoreConfiguration: the configuration handed to the factory
    /// - Throws: whatever the storage engine factory throws
    func resolveStorageEngine(dataStoreConfiguration: DataStoreConfiguration) throws {
        guard storageEngine == nil else {
            return
        }

        storageEngine = try storageEngineBehaviorFactory(isSyncEnabled,
                                                         dataStoreConfiguration,
                                                         validAPIPluginKey,
                                                         validAuthPluginKey,
                                                         modelRegistration.version,
                                                         UserDefaults.standard)
        setupStorageSink()
    }

    // MARK: Private

    /// Derives `isSyncEnabled` from the registered models.
    private func resolveSyncEnabled() {
        isSyncEnabled = ModelRegistry.hasSyncableModels
    }

    /// Subscribes to the storage engine's publisher and keeps the subscription in `storageEngineSink`.
    /// `self` is captured weakly by both handlers.
    private func setupStorageSink() {
        storageEngineSink = storageEngine
            .publisher
            .sink(
                receiveCompletion: { [weak self] in self?.onReceiveCompletion(completed: $0) },
                receiveValue: { [weak self] in self?.onReceiveValue(receiveValue: $0) }
            )
    }

    /// Handles completion of the storage engine's publisher: logs a failure if there is one, then stops
    /// DataStore and logs the outcome of the stop.
    private func onReceiveCompletion(completed: Subscribers.Completion<DataStoreError>) {
        if case .failure(let dataStoreError) = completed {
            log.error("StorageEngine completed with error: \(dataStoreError)")
        }

        stop { stopResult in
            switch stopResult {
            case .success:
                self.log.info("Stopping DataStore successful.")
            case .failure(let error):
                self.log.error("Failed to stop StorageEngine with error: \(error)")
            }
        }
    }

    /// Relays a storage engine event: mutation events go to the DataStore publisher, while sync lifecycle
    /// events are dispatched to the Amplify Hub on the `.dataStore` channel.
    /// - Parameter receiveValue: the event emitted by the storage engine
    func onReceiveValue(receiveValue: StorageEngineEvent) {
        guard let publisher = self.dataStorePublisher else {
            log.error("Data store publisher not initalized")
            return
        }

        switch receiveValue {
        case .started:
            break

        case .mutationEvent(let mutationEvent):
            publisher.send(input: mutationEvent)

        case .modelSyncedEvent(let modelSyncedEvent):
            log.verbose("Emitting DataStore event: modelSyncedEvent \(modelSyncedEvent)")
            // Record that this model has been synced before announcing it on the Hub.
            dispatchedModelSyncedEvents[modelSyncedEvent.modelName]?.set(true)
            let payload = HubPayload(eventName: HubPayload.EventName.DataStore.modelSynced,
                                     data: modelSyncedEvent)
            Amplify.Hub.dispatch(to: .dataStore, payload: payload)

        case .syncQueriesReadyEvent:
            log.verbose("[Lifecycle event 4]: syncQueriesReady")
            let payload = HubPayload(eventName: HubPayload.EventName.DataStore.syncQueriesReady)
            Amplify.Hub.dispatch(to: .dataStore, payload: payload)

        case .readyEvent:
            log.verbose("[Lifecycle event 6]: ready")
            let payload = HubPayload(eventName: HubPayload.EventName.DataStore.ready)
            Amplify.Hub.dispatch(to: .dataStore, payload: payload)
        }
    }

    // MARK: - Reset

    /// Drops the operation queue, clears the per-model "model synced" flags, removes the hub listener if
    /// there is one, and resets the storage engine when it conforms to `Resettable`. The `storageEngine`
    /// reference itself is left in place.
    public func reset() async {
        operationQueue = nil
        dispatchedModelSyncedEvents = [:]

        if let listener = hubListener {
            Amplify.Hub.removeListener(listener)
            hubListener = nil
        }

        if let resettable = storageEngine as? Resettable {
            log.verbose("Resetting storageEngine")
            await resettable.reset()
            self.log.verbose("Resetting storageEngine: finished")
        }
    }
}
// Declares conformance to `AmplifyVersionable` with an empty body.
// NOTE(review): presumably the protocol's requirements are met by default implementations defined elsewhere — confirm.
extension AWSDataStorePlugin: AmplifyVersionable { }