amplify-swift/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/CascadeDeleteOperation.swift

516 lines
25 KiB
Swift

//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
import Foundation
import Amplify
import AWSPluginsCore
import Combine
/// CascadeDeleteOperation has the following logic:
/// 1. Query models from local store based on the following use cases:
/// 1a. If the use case is Delete with id, then query by `id`
/// 1b. or Delete with id and condition, then query by `id` and `condition`. If the model given the condition does not exist,
/// check if the model exists. if the model exists, then fail with `DataStoreError.invalidCondition`.
/// 1c. or Delete with filter, then query by `filter`.
/// 2. If there are at least one item to delete, query for all its associated models recursively.
/// 3. Delete the original queried items from local store. This performs a cascade delete by default (See
/// **CreateTableStatement** for more details, `on delete cascade` when creating the SQL table enables this behavior).
/// 4. If sync is enabled, then submit the delete mutations to the sync engine, in the order of children to parent models.
public class CascadeDeleteOperation<M: Model>: AsynchronousOperation {
let storageAdapter: StorageEngineAdapter
var syncEngine: RemoteSyncEngineBehavior?
let modelType: M.Type
let modelSchema: ModelSchema
let deleteInput: DeleteInput
let completionForWithId: ((DataStoreResult<M?>) -> Void)?
let completionForWithFilter: ((DataStoreResult<[M]>) -> Void)?
private let serialQueueSyncDeletions: DispatchQueue
private var modelName: ModelName
init(storageAdapter: StorageEngineAdapter,
syncEngine: RemoteSyncEngineBehavior?,
modelType: M.Type,
modelSchema: ModelSchema,
withIdentifier identifier: ModelIdentifierProtocol,
condition: QueryPredicate? = nil,
completion: @escaping (DataStoreResult<M?>) -> Void) {
self.storageAdapter = storageAdapter
self.syncEngine = syncEngine
self.modelType = modelType
self.modelSchema = modelSchema
var deleteInput = DeleteInput.withIdentifier(id: identifier)
if let condition = condition {
deleteInput = .withIdentifierAndCondition(id: identifier,
condition: condition)
}
self.deleteInput = deleteInput
self.completionForWithId = completion
self.completionForWithFilter = nil
self.serialQueueSyncDeletions = DispatchQueue(label: "com.amazoncom.Storage.CascadeDeleteOperation.concurrency")
self.modelName = self.modelSchema.name
super.init()
}
init(storageAdapter: StorageEngineAdapter,
syncEngine: RemoteSyncEngineBehavior?,
modelType: M.Type,
modelSchema: ModelSchema,
filter: QueryPredicate,
completion: @escaping (DataStoreResult<[M]>) -> Void) {
self.storageAdapter = storageAdapter
self.syncEngine = syncEngine
self.modelType = modelType
self.modelSchema = modelSchema
self.deleteInput = .withFilter(filter)
self.completionForWithId = nil
self.completionForWithFilter = completion
self.serialQueueSyncDeletions = DispatchQueue(label: "com.amazoncom.Storage.CascadeDeleteOperation.concurrency")
self.modelName = self.modelSchema.name
super.init()
}
override public func main() {
queryAndDelete()
}
struct QueryAndDeleteResult<M: Model> {
let deletedModels: [M]
let associatedModels: [(ModelName, Model)]
}
func queryAndDelete() {
do {
try storageAdapter.transaction {
Task {
let transactionResult = await queryAndDeleteTransaction()
syncIfNeededAndFinish(transactionResult)
}
}
} catch {
syncIfNeededAndFinish(.failure(causedBy: error))
}
}
func queryAndDeleteTransaction() async -> DataStoreResult<QueryAndDeleteResult<M>> {
var queriedResult: DataStoreResult<[M]>?
var deletedResult: DataStoreResult<[M]>?
var associatedModels: [(ModelName, Model)] = []
queriedResult = await withCheckedContinuation { continuation in
self.storageAdapter.query(self.modelType,
modelSchema: self.modelSchema,
predicate: self.deleteInput.predicate,
sort: nil,
paginationInput: nil) { result in
continuation.resume(returning: result)
}
}
guard case .success(let queriedModels) = queriedResult else {
return collapseResults(queryResult: queriedResult,
deleteResult: deletedResult,
associatedModels: associatedModels)
}
guard !queriedModels.isEmpty else {
guard case .withIdentifierAndCondition(let identifier, _) = self.deleteInput else {
// Query did not return any results, treat this as a successful no-op delete.
deletedResult = .success([M]())
return collapseResults(queryResult: queriedResult,
deleteResult: deletedResult,
associatedModels: associatedModels)
}
// Query using the computed predicate did not return any results, check if model actually exists.
do {
if try self.storageAdapter.exists(self.modelSchema, withIdentifier: identifier, predicate: nil) {
queriedResult = .failure(
DataStoreError.invalidCondition(
"Delete failed due to condition did not match existing model instance.",
"Subsequent deletes will continue to fail until the model instance is updated."))
} else {
deletedResult = .success([M]())
}
} catch {
queriedResult = .failure(DataStoreError.invalidOperation(causedBy: error))
}
return collapseResults(queryResult: queriedResult,
deleteResult: deletedResult,
associatedModels: associatedModels)
}
let modelIds = queriedModels.map { $0.identifier(schema: self.modelSchema).stringValue }
if modelName != "MutationEvent" {
self.log.debug("[CascadeDelete.1] Deleting \(modelName) with identifiers: \(modelIds)")
}
associatedModels = await self.recurseQueryAssociatedModels(modelSchema: self.modelSchema, ids: modelIds)
if self.log.logLevel >= .debug {
if modelName != "MutationEvent" {
self.log.debug("[CascadeDelete.2] Queried for \(modelName) associated models, found \(associatedModels.compactMap { $1.modelName })")
self.log.debug("[CascadeDelete.2] [Query ids of associated Models] Queried for \(associatedModels.compactMap({$1.modelName})), retrieved ids for deletion: \(associatedModels.map { $0.1.identifier(schema: modelSchema).stringValue })")
}
}
deletedResult = await withCheckedContinuation { continuation in
self.storageAdapter.delete(self.modelType,
modelSchema: self.modelSchema,
filter: self.deleteInput.predicate) { result in
continuation.resume(returning: result)
}
}
return collapseResults(queryResult: queriedResult,
deleteResult: deletedResult,
associatedModels: associatedModels)
}
func recurseQueryAssociatedModels(modelSchema: ModelSchema, ids: [String]) async -> [(ModelName, Model)] {
var associatedModels: [(ModelName, Model)] = []
for (_, modelField) in modelSchema.fields {
guard modelField.hasAssociation,
modelField.isOneToOne || modelField.isOneToMany,
let associatedModelName = modelField.associatedModelName,
let associatedField = modelField.associatedField,
let associatedModelSchema = ModelRegistry.modelSchema(from: associatedModelName) else {
continue
}
guard let modelSchema = ModelRegistry.modelSchema(from: associatedModelName) else {
log.error("Failed to lookup associate model \(associatedModelName)")
return []
}
let queriedModels = await queryAssociatedModels(associatedModelSchema: modelSchema,
associatedField: associatedField,
ids: ids)
let associatedModelIds = queriedModels.map { $0.1.identifier(schema: modelSchema).stringValue }
associatedModels.append(contentsOf: queriedModels)
associatedModels.append(contentsOf: await recurseQueryAssociatedModels(modelSchema: associatedModelSchema, ids: associatedModelIds))
}
return associatedModels
}
func queryAssociatedModels(associatedModelSchema modelSchema: ModelSchema,
associatedField: ModelField,
ids: [String]) async -> [(ModelName, Model)] {
var queriedModels: [(ModelName, Model)] = []
let chunkedArrays = ids.chunked(into: SQLiteStorageEngineAdapter.maxNumberOfPredicates)
for chunkedArray in chunkedArrays {
// TODO: Add conveinence to queryPredicate where we have a list of items, to be all or'ed
var queryPredicates: [QueryPredicateOperation] = []
for id in chunkedArray {
queryPredicates.append(QueryPredicateOperation(field: associatedField.name, operator: .equals(id)))
}
let groupedQueryPredicates = QueryPredicateGroup(type: .or, predicates: queryPredicates)
do {
let models = try await withCheckedThrowingContinuation { continuation in
storageAdapter.query(modelSchema: modelSchema, predicate: groupedQueryPredicates) { result in
continuation.resume(with: result)
}
}
queriedModels.append(contentsOf: models.map { model in
(modelSchema.name, model)
})
} catch {
log.error("Failed to query \(modelSchema) on mutation event generation: \(error)")
}
}
return queriedModels
}
private func collapseResults<M: Model>(queryResult: DataStoreResult<[M]>?,
deleteResult: DataStoreResult<[M]>?,
associatedModels: [(ModelName, Model)]) -> DataStoreResult<QueryAndDeleteResult<M>> {
if let queryResult = queryResult {
switch queryResult {
case .success(let models):
if let deleteResult = deleteResult {
switch deleteResult {
case .success:
if modelName != "MutationEvent" {
self.log.debug("[CascadeDelete.3] [Delete from Local Store] Local cascade delete of \(modelName) successful!")
}
return .success(QueryAndDeleteResult(deletedModels: models,
associatedModels: associatedModels))
case .failure(let error):
return .failure(error)
}
} else {
return .failure(.unknown("deleteResult not set during transaction", "coding error", nil))
}
case .failure(let error):
return .failure(error)
}
} else {
return .failure(.unknown("queryResult not set during transaction", "coding error", nil))
}
}
func syncIfNeededAndFinish(_ transactionResult: DataStoreResult<QueryAndDeleteResult<M>>) {
switch transactionResult {
case .success(let queryAndDeleteResult):
switch deleteInput {
case .withIdentifier, .withIdentifierAndCondition:
guard queryAndDeleteResult.deletedModels.count <= 1 else {
completionForWithId?(.failure(.unknown("delete with id returned more than one result", "", nil)))
finish()
return
}
guard queryAndDeleteResult.deletedModels.first != nil else {
completionForWithId?(.success(nil))
finish()
return
}
case .withFilter:
guard !queryAndDeleteResult.deletedModels.isEmpty else {
completionForWithFilter?(.success(queryAndDeleteResult.deletedModels))
finish()
return
}
}
guard modelSchema.isSyncable, let syncEngine = self.syncEngine else {
if !modelSchema.isSystem {
log.error("Unable to sync model (\(modelSchema.name)) where isSyncable is false")
}
if self.syncEngine == nil {
log.error("Unable to sync because syncEngine is nil")
}
completionForWithId?(.success(queryAndDeleteResult.deletedModels.first))
completionForWithFilter?(.success(queryAndDeleteResult.deletedModels))
finish()
return
}
guard #available(iOS 13.0, *) else {
completionForWithId?(.success(queryAndDeleteResult.deletedModels.first))
completionForWithFilter?(.success(queryAndDeleteResult.deletedModels))
finish()
return
}
// TODO: This requires follow up.
// In the current code, when deleting a single model instance conditionally, the `condition` predicate is
// first applied locally to determine whether the item should be deleted or not. If met, the local item is
// deleted. When syncing this deleted model with the delete mutation event, the `condition` is not passed
// to the delete mutation. Should it be passed to the delete mutation as well?
//
// When deleting all models that match the `filter` predicate, the `filter` is passed to the
// delete mutation event. Since the item was originally retrieved using the filter as a way to narrow
// down which items should be deleted, then does it still need to be passed as the "condition" for the
// delete mutation if it will always be met? (Perhaps, this is needed as a way to guard against updates
// that move the model out of the filtered results). Should we stop passing the `filter` to the delete
// mutation?
switch deleteInput {
case .withIdentifier, .withIdentifierAndCondition:
let syncDeletionsCount = (queryAndDeleteResult.associatedModels.count + queryAndDeleteResult.deletedModels.count)
self.log.debug("[CascadeDelete.4] sending a total of \(syncDeletionsCount) delete mutations")
syncDeletions(withModels: queryAndDeleteResult.deletedModels,
associatedModels: queryAndDeleteResult.associatedModels,
syncEngine: syncEngine) {
switch $0 {
case .success:
self.completionForWithId?(.success(queryAndDeleteResult.deletedModels.first))
case .failure(let error):
self.completionForWithId?(.failure(error))
}
self.finish()
}
case .withFilter(let filter):
let syncDeletionsCount = (queryAndDeleteResult.associatedModels.count + queryAndDeleteResult.deletedModels.count)
self.log.debug("[CascadeDelete.4] sending a total of \(syncDeletionsCount) delete mutations")
syncDeletions(withModels: queryAndDeleteResult.deletedModels,
predicate: filter,
associatedModels: queryAndDeleteResult.associatedModels,
syncEngine: syncEngine) {
switch $0 {
case .success:
self.completionForWithFilter?(.success(queryAndDeleteResult.deletedModels))
case .failure(let error):
self.completionForWithFilter?(.failure(error))
}
self.finish()
}
}
case .failure(let error):
completionForWithId?(.failure(error))
completionForWithFilter?(.failure(error))
finish()
}
}
// `syncDeletions` will first sync all associated models in reversed order so the lowest level of children models
// are synced first, before its parent models. See `recurseQueryAssociatedModels()` for more details on the
// ordering of the results in `associatedModels`. Once all the associated models are synced, sync the `models`,
// finishing the sequence of deletions from children to parent.
//
// For example, A has-many B and C, B has-many D, D has-many E. The query will result in associatedModels with
// the order [B, D, E, C]. Sync deletions will be performed the back to the front from C, E, D, B, then finally the
// parent models A.
//
// `.reversed()` will not allocate new space for its elements (what we want) by wrapping the underlying
// collection and provide access in reverse order.
// For more details: https://developer.apple.com/documentation/swift/array/1690025-reversed
@available(iOS 13.0, *)
private func syncDeletions(withModels models: [M],
predicate: QueryPredicate? = nil,
associatedModels: [(ModelName, Model)],
syncEngine: RemoteSyncEngineBehavior,
completion: @escaping DataStoreCallback<Void>) {
self.log.debug("[CascadeDelete.4] Begin syncing \(models.count) \(modelName) model for deletion")
var savedDataStoreError: DataStoreError?
guard !associatedModels.isEmpty else {
syncDeletions(withModels: models,
predicate: predicate,
syncEngine: syncEngine,
dataStoreError: savedDataStoreError,
completion: completion)
return
}
self.log.debug("[CascadeDelete.4] Begin syncing \(associatedModels.count) associated models for deletion. ")
var mutationEventsSubmitCompleted = 0
for (modelName, associatedModel) in associatedModels.reversed() {
let mutationEvent: MutationEvent
do {
mutationEvent = try MutationEvent(untypedModel: associatedModel,
modelName: modelName,
mutationType: .delete)
} catch {
let dataStoreError = DataStoreError(error: error)
completion(.failure(dataStoreError))
return
}
let mutationEventCallback: DataStoreCallback<MutationEvent> = { result in
self.serialQueueSyncDeletions.async {
mutationEventsSubmitCompleted += 1
switch result {
case .failure(let dataStoreError):
self.log.error("\(#function) failed to submit to sync engine \(mutationEvent)")
if savedDataStoreError == nil {
savedDataStoreError = dataStoreError
}
case .success(let mutationEvent):
self.log.verbose("\(#function) successfully submitted \(mutationEvent.modelName) to sync engine \(mutationEvent)")
}
if mutationEventsSubmitCompleted == associatedModels.count {
self.syncDeletions(withModels: models,
predicate: predicate,
syncEngine: syncEngine,
dataStoreError: savedDataStoreError,
completion: completion)
}
}
}
submitToSyncEngine(mutationEvent: mutationEvent,
syncEngine: syncEngine,
completion: mutationEventCallback)
}
}
@available(iOS 13.0, *)
private func syncDeletions(withModels models: [M],
predicate: QueryPredicate? = nil,
syncEngine: RemoteSyncEngineBehavior,
dataStoreError: DataStoreError?,
completion: @escaping DataStoreCallback<Void>) {
var graphQLFilterJSON: String?
if let predicate = predicate {
do {
graphQLFilterJSON = try GraphQLFilterConverter.toJSON(predicate,
modelSchema: modelSchema)
} catch {
let dataStoreError = DataStoreError(error: error)
completion(.failure(dataStoreError))
return
}
}
var mutationEventsSubmitCompleted = 0
var savedDataStoreError = dataStoreError
for model in models {
let mutationEvent: MutationEvent
do {
mutationEvent = try MutationEvent(model: model,
modelSchema: modelSchema,
mutationType: .delete,
graphQLFilterJSON: graphQLFilterJSON)
} catch {
let dataStoreError = DataStoreError(error: error)
completion(.failure(dataStoreError))
return
}
let mutationEventCallback: DataStoreCallback<MutationEvent> = { result in
self.serialQueueSyncDeletions.async {
mutationEventsSubmitCompleted += 1
switch result {
case .failure(let dataStoreError):
self.log.error("\(#function) failed to submit to sync engine \(mutationEvent)")
if savedDataStoreError == nil {
savedDataStoreError = dataStoreError
}
case .success:
self.log.verbose("\(#function) successfully submitted to sync engine \(mutationEvent)")
}
if mutationEventsSubmitCompleted == models.count {
if let lastEmittedDataStoreError = savedDataStoreError {
completion(.failure(lastEmittedDataStoreError))
} else {
completion(.successfulVoid)
}
}
}
}
submitToSyncEngine(mutationEvent: mutationEvent,
syncEngine: syncEngine,
completion: mutationEventCallback)
}
}
private func submitToSyncEngine(mutationEvent: MutationEvent,
syncEngine: RemoteSyncEngineBehavior,
completion: @escaping DataStoreCallback<MutationEvent>) {
syncEngine.submit(mutationEvent, completion: completion)
}
}
// MARK: - CascadeDeleteOperation.DeleteInput
extension CascadeDeleteOperation {
enum DeleteInput {
case withIdentifier(id: ModelIdentifierProtocol)
case withIdentifierAndCondition(id: ModelIdentifierProtocol, condition: QueryPredicate)
case withFilter(_ filter: QueryPredicate)
/// Returns a computed predicate based on the type of delete scenario it is.
var predicate: QueryPredicate {
switch self {
case .withIdentifier(let identifier):
return identifier.predicate
case .withIdentifierAndCondition(let identifier, let predicate):
return QueryPredicateGroup(type: .and,
predicates: [identifier.predicate,
predicate])
case .withFilter(let predicate):
return predicate
}
}
}
}
extension CascadeDeleteOperation: DefaultLogger { }