381 lines
12 KiB
Swift
381 lines
12 KiB
Swift
/*
|
|
* Copyright 2022 The Yorkie Authors. All rights reserved.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License")
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
import Combine
|
|
import Foundation
|
|
|
|
/**
|
|
* `DocumentStatus` represents the status of the document.
|
|
*/
|
|
public enum DocumentStatus: String {
|
|
/**
|
|
* Detached means that the document is not attached to the client.
|
|
* The actor of the ticket is created without being assigned.
|
|
*/
|
|
case detached
|
|
|
|
/**
|
|
* Attached means that this document is attached to the client.
|
|
* The actor of the ticket is created with being assigned by the client.
|
|
*/
|
|
case attached
|
|
|
|
/**
|
|
* Removed means that this document is removed. If the document is removed,
|
|
* it cannot be edited.
|
|
*/
|
|
case removed
|
|
}
|
|
|
|
/**
|
|
* Presence key, value dictionary
|
|
* Similar to an Indexable in JS SDK
|
|
*/
|
|
public typealias Presence = [String: Any]
|
|
|
|
public typealias DocumentKey = String
|
|
public typealias DocumentID = String
|
|
|
|
/**
|
|
* A CRDT-based data type. We can representing the model
|
|
* of the application. And we can edit it even while offline.
|
|
*
|
|
*/
|
|
public actor Document {
|
|
private let key: DocumentKey
|
|
private(set) var status: DocumentStatus
|
|
private var root: CRDTRoot
|
|
private var clone: CRDTRoot?
|
|
private var changeID: ChangeID
|
|
private var checkpoint: Checkpoint
|
|
private var localChanges: [Change]
|
|
|
|
public let eventStream: PassthroughSubject<DocEvent, Never>
|
|
|
|
public init(key: String) {
|
|
self.key = key
|
|
self.status = .detached
|
|
self.root = CRDTRoot()
|
|
self.changeID = ChangeID.initial
|
|
self.checkpoint = Checkpoint.initial
|
|
self.localChanges = []
|
|
self.eventStream = PassthroughSubject()
|
|
}
|
|
|
|
/**
|
|
* `update` executes the given updater to update this document.
|
|
*/
|
|
public func update(_ updater: (_ root: JSONObject) -> Void, message: String? = nil) throws {
|
|
guard self.status != .removed else {
|
|
throw YorkieError.documentRemoved(message: "\(self) is removed.")
|
|
}
|
|
|
|
let clone = self.cloned
|
|
let context = ChangeContext(id: self.changeID.next(), root: clone, message: message)
|
|
|
|
let proxy = JSONObject(target: clone.object, context: context)
|
|
updater(proxy)
|
|
|
|
if context.hasOperations() {
|
|
Logger.trace("trying to update a local change: \(self.toJSON())")
|
|
|
|
let change = context.getChange()
|
|
try? change.execute(root: self.root)
|
|
self.localChanges.append(change)
|
|
self.changeID = change.id
|
|
|
|
let changeInfo = ChangeInfo(change: change, paths: self.createPaths(change: change))
|
|
let changeEvent = LocalChangeEvent(value: [changeInfo])
|
|
self.eventStream.send(changeEvent)
|
|
|
|
Logger.trace("after update a local change: \(self.toJSON())")
|
|
}
|
|
}
|
|
|
|
/**
|
|
* `applyChangePack` applies the given change pack into this document.
|
|
* 1. Remove local changes applied to server.
|
|
* 2. Update the checkpoint.
|
|
* 3. Do Garbage collection.
|
|
*
|
|
* - Parameter pack: change pack
|
|
*/
|
|
func applyChangePack(pack: ChangePack, clientID: ActorID) throws {
|
|
// 0. Check Sequences.
|
|
|
|
let minClientSequence = self.localChanges.first?.id.getClientSeq() ?? self.checkpoint.getClientSeq()
|
|
let maxClientSequence = self.localChanges.last?.id.getClientSeq() ?? self.checkpoint.getClientSeq()
|
|
|
|
guard (minClientSequence ... maxClientSequence).contains(pack.getCheckpoint().getClientSeq()) else {
|
|
let event = CorruptedEvent()
|
|
self.eventStream.send(event)
|
|
|
|
throw YorkieError.sequenceCorrupted(message: "Local changes are missing. Can't recover Client sequence!")
|
|
}
|
|
|
|
// Increent server sequence by actual data from the server.
|
|
var serverSeq = self.checkpoint.getServerSeq()
|
|
|
|
if let snapshot = pack.getSnapshot() {
|
|
try self.applySnapshot(serverSeq: pack.getCheckpoint().getServerSeq(), snapshot: snapshot)
|
|
serverSeq = pack.getCheckpoint().getServerSeq()
|
|
} else if pack.hasChanges() {
|
|
serverSeq = try self.applyChanges(changes: pack.getChanges(), clientID: clientID)
|
|
|
|
// The server does not apply changes sent by pushPull request to changes of pushPull response.
|
|
// So add size of local changes.
|
|
if serverSeq < pack.getCheckpoint().getClientSeq(), pack.getCheckpoint().getClientSeq() > self.checkpoint.getClientSeq() {
|
|
serverSeq += Int64(pack.getCheckpoint().getClientSeq() - self.checkpoint.getClientSeq())
|
|
}
|
|
}
|
|
|
|
// 01. Remove local changes applied to server.
|
|
while let change = self.localChanges.first, change.id.getClientSeq() <= pack.getCheckpoint().getClientSeq() {
|
|
self.localChanges.removeFirst()
|
|
}
|
|
|
|
// 02. Update the checkpoint.
|
|
self.checkpoint.forward(other: Checkpoint(serverSeq: serverSeq, clientSeq: pack.getCheckpoint().getClientSeq()))
|
|
|
|
// 03. Do Garbage collection.
|
|
if let ticket = pack.getMinSyncedTicket() {
|
|
self.garbageCollect(lessThanOrEqualTo: ticket)
|
|
}
|
|
|
|
// 04. Update the status.
|
|
if pack.isRemoved {
|
|
self.setStatus(.removed)
|
|
}
|
|
|
|
Logger.trace("\(self.root.toJSON())")
|
|
}
|
|
|
|
/**
|
|
* `hasLocalChanges` returns whether this document has local changes or not.
|
|
*
|
|
*/
|
|
func hasLocalChanges() -> Bool {
|
|
return self.localChanges.isEmpty == false
|
|
}
|
|
|
|
/**
|
|
* `ensureClone` make a clone of root.
|
|
*/
|
|
var cloned: CRDTRoot {
|
|
if let clone = self.clone {
|
|
return clone
|
|
}
|
|
|
|
let clone = self.root.deepcopy()
|
|
self.clone = clone
|
|
return clone
|
|
}
|
|
|
|
/**
|
|
* `createChangePack` create change pack of the local changes to send to the
|
|
* remote server.
|
|
*
|
|
*/
|
|
func createChangePack(_ forceToRemoved: Bool = false) -> ChangePack {
|
|
let changes = self.localChanges
|
|
let checkpoint = self.checkpoint.increasedClientSeq(by: UInt32(changes.count))
|
|
return ChangePack(key: self.key, checkpoint: checkpoint, changes: changes, isRemoved: forceToRemoved ? true : self.status == .removed)
|
|
}
|
|
|
|
/**
|
|
* `setActor` sets actor into this document. This is also applied in the local
|
|
* changes the document has.
|
|
*
|
|
*/
|
|
func setActor(_ actorID: ActorID) {
|
|
let changes = self.localChanges.map {
|
|
var new = $0
|
|
new.setActor(actorID)
|
|
return new
|
|
}
|
|
|
|
self.localChanges = changes
|
|
|
|
self.changeID.setActor(actorID)
|
|
|
|
// TODOs also apply into root.
|
|
}
|
|
|
|
/**
|
|
* `getKey` returns the key of this document.
|
|
*
|
|
*/
|
|
nonisolated func getKey() -> String {
|
|
return self.key
|
|
}
|
|
|
|
/**
|
|
* `getClone` return clone object.
|
|
*
|
|
*/
|
|
func getClone() -> CRDTObject? {
|
|
return self.clone?.object
|
|
}
|
|
|
|
/**
|
|
* `getRoot` returns a new proxy of cloned root.
|
|
*/
|
|
public func getRoot() -> JSONObject {
|
|
let clone = self.cloned
|
|
let context = ChangeContext(id: self.changeID.next(), root: clone)
|
|
|
|
return JSONObject(target: clone.object, context: context)
|
|
}
|
|
|
|
/**
|
|
* `garbageCollect` purges elements that were removed before the given time.
|
|
*
|
|
*/
|
|
@discardableResult
|
|
func garbageCollect(lessThanOrEqualTo ticket: TimeTicket) -> Int {
|
|
if let clone = self.clone {
|
|
clone.garbageCollect(lessThanOrEqualTo: ticket)
|
|
}
|
|
return self.root.garbageCollect(lessThanOrEqualTo: ticket)
|
|
}
|
|
|
|
/**
|
|
* `getRootObject` returns root object.
|
|
*
|
|
*/
|
|
func getRootObject() -> CRDTObject {
|
|
return self.root.object
|
|
}
|
|
|
|
/**
|
|
* `getGarbageLength` returns the length of elements should be purged.
|
|
*
|
|
*/
|
|
func getGarbageLength() -> Int {
|
|
return self.root.garbageLength
|
|
}
|
|
|
|
/**
|
|
* `toJSON` returns the JSON encoding of this array.
|
|
*/
|
|
func toJSON() -> String {
|
|
return self.root.toJSON()
|
|
}
|
|
|
|
/**
|
|
* `toSortedJSON` returns the sorted JSON encoding of this array.
|
|
*/
|
|
func toSortedJSON() -> String {
|
|
return self.root.debugDescription
|
|
}
|
|
|
|
/**
|
|
* `applySnapshot` applies the given snapshot into this document.
|
|
*/
|
|
func applySnapshot(serverSeq: Int64, snapshot: Data) throws {
|
|
let obj = try Converter.bytesToObject(bytes: snapshot)
|
|
self.root = CRDTRoot(rootObject: obj)
|
|
self.changeID.syncLamport(with: serverSeq)
|
|
|
|
// drop clone because it is contaminated.
|
|
self.clone = nil
|
|
|
|
let snapshotEvent = SnapshotEvent(value: snapshot)
|
|
self.eventStream.send(snapshotEvent)
|
|
}
|
|
|
|
/**
|
|
* `applyChanges` applies the given changes into this document.
|
|
*/
|
|
func applyChanges(changes: [Change], clientID: ActorID) throws -> Int64 {
|
|
Logger.debug(
|
|
"""
|
|
trying to apply \(changes.count) remote changes.
|
|
elements:\(self.root.elementMapSize),
|
|
removeds:\(self.root.removedElementSetSize)
|
|
""")
|
|
|
|
Logger.trace(changes.map { "\($0.id.structureAsString)\t\($0.structureAsString)" }.joined(separator: "\n"))
|
|
|
|
let clone = self.cloned
|
|
|
|
var newServerSeq = self.checkpoint.getServerSeq()
|
|
|
|
for change in changes {
|
|
// Validate server sequence number.
|
|
guard let serverSeq = change.id.getServerSeq() else {
|
|
throw YorkieError.unexpected(message: "No server seq in the change!!!")
|
|
}
|
|
|
|
guard newServerSeq < serverSeq else {
|
|
// Skip already processed change.
|
|
continue
|
|
}
|
|
|
|
guard newServerSeq + 1 == serverSeq else {
|
|
throw YorkieError.sequenceCorrupted(message: "Bad Server Sequence! expected seq: \(newServerSeq + 1), actual seq : \(String(describing: change.id.getServerSeq()))")
|
|
}
|
|
|
|
newServerSeq = serverSeq
|
|
|
|
// Skip already processed opertaions by local changes.
|
|
if change.id.getActorID() != clientID || change.id.getClientSeq() > self.checkpoint.getClientSeq() {
|
|
try change.execute(root: clone)
|
|
try change.execute(root: self.root)
|
|
}
|
|
|
|
self.changeID.syncLamport(with: change.id.getLamport())
|
|
}
|
|
|
|
let changeInfos = changes.map {
|
|
ChangeInfo(change: $0, paths: self.createPaths(change: $0))
|
|
}
|
|
|
|
let changeEvent = RemoteChangeEvent(value: changeInfos)
|
|
self.eventStream.send(changeEvent)
|
|
|
|
Logger.debug(
|
|
"""
|
|
after appling \(changes.count) remote changes.
|
|
elements:\(self.root.elementMapSize),
|
|
removeds:\(self.root.removedElementSetSize)
|
|
"""
|
|
)
|
|
|
|
return newServerSeq
|
|
}
|
|
|
|
private func createPaths(change: Change) -> [String] {
|
|
let pathTrie = Trie<String>(value: "$")
|
|
for op in change.operations {
|
|
let createdAt = op.effectedCreatedAt
|
|
if var subPaths = try? self.root.createSubPaths(createdAt: createdAt), subPaths.isEmpty == false {
|
|
subPaths.removeFirst()
|
|
pathTrie.insert(values: subPaths)
|
|
}
|
|
}
|
|
return pathTrie.findPrefixes().map { $0.joined(separator: ".") }
|
|
}
|
|
|
|
public func setStatus(_ status: DocumentStatus) {
|
|
self.status = status
|
|
}
|
|
|
|
public nonisolated var debugDescription: String {
|
|
"[\(self.key)]"
|
|
}
|
|
}
|