Added checkpoints with the new code from ChainPad

This commit is contained in:
Caleb James DeLisle 2016-05-26 17:09:02 +02:00
parent 8885658431
commit da2bfe2de9
3 changed files with 264 additions and 164 deletions

View File

@ -87,7 +87,27 @@ dropUser = function (ctx, user) {
};
const getHistory = function (ctx, channelName, handler, cb) {
ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); }, cb);
var messageBuf = [];
ctx.store.getMessages(channelName, function (msgStr) {
messageBuf.push(JSON.parse(msgStr));
}, function () {
var startPoint;
var cpCount = 0;
var msgBuff2 = [];
for (startPoint = messageBuf.length - 1; startPoint >= 0; startPoint--) {
var msg = messageBuf[startPoint];
msgBuff2.push(msg);
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
cpCount++;
if (cpCount >= 2) {
for (var x = msgBuff2.pop(); x; x = msgBuff2.pop()) { handler(x); }
break;
}
}
//console.log(messageBuf[startPoint]);
}
cb();
});
};
const randName = function () { return Crypto.randomBytes(16).toString('hex'); };

View File

@ -28,7 +28,8 @@ var create = Patch.create = function (parentHash) {
return {
type: 'Patch',
operations: [],
parentHash: parentHash
parentHash: parentHash,
isCheckpoint: false
};
};
@ -45,6 +46,13 @@ var check = Patch.check = function (patch, docLength_opt) {
docLength_opt += Operation.lengthChange(patch.operations[i]);
}
}
if (patch.isCheckpoint) {
Common.assert(patch.operations.length === 1);
Common.assert(patch.operations[0].offset === 0);
if (typeof(docLength_opt) === 'number') {
Common.assert(!docLength_opt || patch.operations[0].toRemove === docLength_opt);
}
}
};
var toObj = Patch.toObj = function (patch) {
@ -104,6 +112,20 @@ var addOperation = Patch.addOperation = function (patch, op) {
if (Common.PARANOIA) { check(patch); }
};
var createCheckpoint = Patch.createCheckpoint =
function (parentContent, checkpointContent, parentContentHash_opt)
{
var op = Operation.create(0, parentContent.length, checkpointContent);
if (Common.PARANOIA && parentContentHash_opt) {
Common.assert(parentContentHash_opt === hash(parentContent));
}
parentContentHash_opt = parentContentHash_opt || hash(parentContent);
var out = create(parentContentHash_opt);
addOperation(out, op);
out.isCheckpoint = true;
return out;
};
var clone = Patch.clone = function (patch) {
if (Common.PARANOIA) { check(patch); }
var out = create();
@ -380,7 +402,7 @@ var PARANOIA = module.exports.PARANOIA = true;
var VALIDATE_ENTIRE_CHAIN_EACH_MSG = module.exports.VALIDATE_ENTIRE_CHAIN_EACH_MSG = false;
/* throw errors over non-compliant messages which would otherwise be treated as invalid */
var TESTING = module.exports.TESTING = true;
var TESTING = module.exports.TESTING = false;
var assert = module.exports.assert = function (expr) {
if (!expr) { throw new Error("Failed assertion"); }
@ -435,10 +457,11 @@ var REGISTER = Message.REGISTER = 0;
var REGISTER_ACK = Message.REGISTER_ACK = 1;
var PATCH = Message.PATCH = 2;
var DISCONNECT = Message.DISCONNECT = 3;
var CHECKPOINT = Message.CHECKPOINT = 4;
var check = Message.check = function(msg) {
Common.assert(msg.type === 'Message');
if (msg.messageType === PATCH) {
if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) {
Patch.check(msg.content);
Common.assert(typeof(msg.lastMsgHash) === 'string');
} else {
@ -459,9 +482,8 @@ var create = Message.create = function (type, content, lastMsgHash) {
var toString = Message.toString = function (msg) {
if (Common.PARANOIA) { check(msg); }
if (msg.messageType === PATCH) {
return JSON.stringify([PATCH, Patch.toObj(msg.content), msg.lastMsgHash]);
if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) {
return JSON.stringify([msg.messageType, Patch.toObj(msg.content), msg.lastMsgHash]);
} else {
throw new Error();
}
@ -478,43 +500,11 @@ var discardBencode = function (msg, arr) {
};
var fromString = Message.fromString = function (str) {
var msg = str;
if (str.charAt(0) === '[') {
var m = JSON.parse(str);
return create(m[0], Patch.fromObj(m[1]), m[2]);
} else {
/* Just in case we receive messages in the old format,
we should try to parse them. We only need the content, though,
so just extract that and throw the rest away */
var last;
var parts = [];
// chop off all the bencoded components
while (msg) {
msg = discardBencode(msg, parts);
}
// grab the last component from the parts
// we don't need anything else
var contentStr = parts.slice(-1)[0];
var content = JSON.parse(contentStr);
var message;
if (content[0] === PATCH) {
message = create(userName, PATCH, Patch.fromObj(content[1]), content[2]);
} else if ([4,5].indexOf(content[0]) !== -1 /* === PING || content[0] === PONG*/) {
// it's a ping or pong, which we don't want to support anymore
message = create(userName, content[0], content[1]);
} else {
message = create(userName, content[0]);
}
// This check validates every operation in the patch.
check(message);
return message
}
var m = JSON.parse(str);
if (m[0] !== CHECKPOINT && m[0] !== PATCH) { throw new Error("invalid message type " + m[0]); }
var msg = create(m[0], Patch.fromObj(m[1]), m[2]);
if (m[0] === CHECKPOINT) { msg.content.isCheckpoint = true; }
return msg;
};
var hashOf = Message.hashOf = function (msg) {
@ -550,8 +540,16 @@ var Sha = module.exports.Sha = require('./SHA256');
var ChainPad = {};
// hex_sha256('')
var EMPTY_STR_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
var ZERO = '0000000000000000000000000000000000000000000000000000000000000000';
var EMPTY_STR_HASH = module.exports.EMPTY_STR_HASH =
'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
var ZERO = '0000000000000000000000000000000000000000000000000000000000000000';
// Default number of patches between checkpoints (patches older than this will be pruned)
// default for realtime.config.checkpointInterval
var DEFAULT_CHECKPOINT_INTERVAL = 200;
// Default number of milliseconds to wait before syncing to the server
var DEFAULT_AVERAGE_SYNC_MILLISECONDS = 300;
var enterChainPad = function (realtime, func) {
return function () {
@ -567,8 +565,9 @@ var debug = function (realtime, msg) {
};
var schedule = function (realtime, func, timeout) {
if (realtime.aborted) { return; }
if (!timeout) {
timeout = Math.floor(Math.random() * 2 * realtime.avgSyncTime);
timeout = Math.floor(Math.random() * 2 * realtime.config.avgSyncMilliseconds);
}
var to = setTimeout(enterChainPad(realtime, function () {
realtime.schedules.splice(realtime.schedules.indexOf(to), 1);
@ -598,12 +597,52 @@ var onMessage = function (realtime, message, callback) {
}
};
var sendMessage = function (realtime, msg, callback) {
var strMsg = Message.toString(msg);
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
realtime.pending = null;
} else {
var pending = realtime.pending;
realtime.pending = null;
Common.assert(pending.hash === msg.hashOf);
handleMessage(realtime, strMsg, true);
pending.callback();
}
});
msg.hashOf = msg.hashOf || Message.hashOf(msg);
var timeout = schedule(realtime, function () {
debug(realtime, "Failed to send message [" + msg.hashOf + "] to server");
sync(realtime);
}, 10000 + (Math.random() * 5000));
if (realtime.pending) { throw new Error("there is already a pending message"); }
realtime.pending = {
hash: msg.hashOf,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === msg.hashOf) {
debug(realtime, "initial Ack received [" + msg.hashOf + "]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
callback();
}
};
if (Common.PARANOIA) { check(realtime); }
};
var sync = function (realtime) {
if (Common.PARANOIA) { check(realtime); }
if (realtime.syncSchedule) {
if (realtime.syncSchedule && !realtime.pending) {
unschedule(realtime, realtime.syncSchedule);
realtime.syncSchedule = null;
} else {
//debug(realtime, "already syncing...");
// we're currently waiting on something from the server.
return;
}
@ -617,6 +656,19 @@ var sync = function (realtime) {
return;
}
if (((parentCount(realtime, realtime.best) + 1) % realtime.config.checkpointInterval) === 0) {
var best = realtime.best;
debug(realtime, "Sending checkpoint");
var cpp = Patch.createCheckpoint(realtime.authDoc,
realtime.authDoc,
realtime.best.content.inverseOf.parentHash);
var cp = Message.create(Message.CHECKPOINT, cpp, realtime.best.hashOf);
sendMessage(realtime, cp, function () {
debug(realtime, "Checkpoint sent and accepted");
});
return;
}
var msg;
if (realtime.best === realtime.initialMessage) {
msg = realtime.initialMessage;
@ -624,39 +676,16 @@ var sync = function (realtime) {
msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf);
}
var strMsg = Message.toString(msg);
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
} else {
handleMessage(realtime, strMsg, true);
}
sendMessage(realtime, msg, function () {
//debug(realtime, "patch sent");
});
var hash = Message.hashOf(msg);
var timeout = schedule(realtime, function () {
debug(realtime, "Failed to send message ["+hash+"] to server");
sync(realtime);
}, 10000 + (Math.random() * 5000));
realtime.pending = {
hash: hash,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === hash) {
debug(realtime, "initial Ack received ["+hash+"]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
}
};
if (Common.PARANOIA) { check(realtime); }
};
var create = ChainPad.create = function (config) {
config = config || {};
var initialState = config.initialState || '';
config.checkpointInterval = config.checkpointInterval || DEFAULT_CHECKPOINT_INTERVAL;
config.avgSyncMilliseconds = config.avgSyncMilliseconds || DEFAULT_AVERAGE_SYNC_MILLISECONDS;
var realtime = {
type: 'ChainPad',
@ -665,7 +694,7 @@ var create = ChainPad.create = function (config) {
config: config,
logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel: 1,
logLevel: (typeof(config.logLevel) === 'number') ? config.logLevel : 1,
/** A patch representing all uncommitted work. */
uncommitted: null,
@ -673,18 +702,17 @@ var create = ChainPad.create = function (config) {
uncommittedDocLength: initialState.length,
patchHandlers: [],
opHandlers: [],
changeHandlers: [],
messageHandlers: [],
schedules: [],
aborted: false,
syncSchedule: null,
registered: false,
avgSyncTime: 100,
// this is only used if PARANOIA is enabled.
userInterfaceContent: undefined,
@ -699,12 +727,6 @@ var create = ChainPad.create = function (config) {
rootMessage: null,
userName: config.userName || 'anonymous',
/**
* Set to the message which sets the initialState if applicable.
* Reset to null after the initial message has been successfully broadcasted.
*/
initialMessage: null,
};
if (Common.PARANOIA) {
@ -712,6 +734,10 @@ var create = ChainPad.create = function (config) {
}
var zeroPatch = Patch.create(EMPTY_STR_HASH);
if (initialState !== '') {
var initialOp = Operation.create(0, 0, initialState);
Patch.addOperation(zeroPatch, initialOp);
}
zeroPatch.inverseOf = Patch.invert(zeroPatch, '');
zeroPatch.inverseOf.inverseOf = zeroPatch;
var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO);
@ -721,40 +747,12 @@ var create = ChainPad.create = function (config) {
(realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg);
realtime.rootMessage = zeroMsg;
realtime.best = zeroMsg;
if (initialState === '') {
realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash);
return realtime;
}
var initialOp = Operation.create(0, 0, initialState);
var initialStatePatch = Patch.create(zeroPatch.inverseOf.parentHash);
Patch.addOperation(initialStatePatch, initialOp);
initialStatePatch.inverseOf = Patch.invert(initialStatePatch, '');
initialStatePatch.inverseOf.inverseOf = initialStatePatch;
// flag this patch so it can be handled specially.
// Specifically, we never treat an initialStatePatch as our own,
// we let it be reverted to prevent duplication of data.
initialStatePatch.isInitialStatePatch = true;
initialStatePatch.inverseOf.isInitialStatePatch = true;
realtime.authDoc = initialState;
realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash);
if (Common.PARANOIA) {
realtime.userInterfaceContent = initialState;
}
initialMessage = Message.create(Message.PATCH, initialStatePatch, zeroMsg.hashOf);
initialMessage.hashOf = Message.hashOf(initialMessage);
initialMessage.parentCount = 1;
initialMessage.isFromMe = true;
realtime.messages[initialMessage.hashOf] = initialMessage;
(realtime.messagesByParent[initialMessage.lastMessageHash] || []).push(initialMessage);
realtime.best = initialMessage;
realtime.uncommitted = Patch.create(initialStatePatch.inverseOf.parentHash);
realtime.initialMessage = initialMessage;
return realtime;
};
@ -803,6 +801,17 @@ var doOperation = ChainPad.doOperation = function (realtime, op) {
realtime.uncommittedDocLength += Operation.lengthChange(op);
};
var doPatch = ChainPad.doPatch = function (realtime, patch) {
if (Common.PARANOIA) {
check(realtime);
Common.assert(Patch.invert(realtime.uncommitted).parentHash === patch.parentHash);
realtime.userInterfaceContent = Patch.apply(patch, realtime.userInterfaceContent);
}
Patch.check(patch, realtime.uncommittedDocLength);
realtime.uncommitted = Patch.merge(realtime.uncommitted, patch);
realtime.uncommittedDocLength += Patch.lengthChange(patch);
};
var isAncestorOf = function (realtime, ancestor, decendent) {
if (!decendent || !ancestor) { return false; }
if (ancestor === decendent) { return true; }
@ -858,31 +867,34 @@ var getBestChild = function (realtime, msg) {
return best;
};
var pushUIPatch = function (realtime, patch) {
if (patch.operations.length) {
// push the uncommittedPatch out to the user interface.
for (var i = 0; i < realtime.patchHandlers.length; i++) {
realtime.patchHandlers[i](patch);
}
for (var i = 0; i < realtime.changeHandlers.length; i++) {
for (var j = patch.operations.length; j >= 0; j--) {
var op = patch.operations[j];
realtime.changeHandlers[i](op.offset, op.toRemove, op.toInsert);
}
}
}
};
var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) {
if (Common.PARANOIA) { check(realtime); }
var msg = Message.fromString(msgStr);
// These are all deprecated message types
if (['REGISTER', 'PONG', 'DISCONNECT'].map(function (x) {
return Message[x];
}).indexOf(msg.messageType) !== -1) {
console.log("Deprecated message type: [%s]", msg.messageType);
// otherwise it's a disconnect.
if (msg.messageType !== Message.PATCH && msg.messageType !== Message.CHECKPOINT) {
debug(realtime, "unrecognized message type " + msg.messageType);
return;
}
// otherwise it's a disconnect.
if (msg.messageType !== Message.PATCH) {
console.error("disconnect");
return; }
msg.hashOf = Message.hashOf(msg);
if (realtime.pending && realtime.pending.hash === msg.hashOf) {
realtime.pending.callback();
realtime.pending = null;
}
if (realtime.messages[msg.hashOf]) {
debug(realtime, "Patch [" + msg.hashOf + "] is already known");
if (Common.PARANOIA) { check(realtime); }
@ -894,10 +906,33 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
realtime.messagesByParent[msg.lastMsgHash] || []).push(msg);
if (!isAncestorOf(realtime, realtime.rootMessage, msg)) {
// we'll probably find the missing parent later.
debug(realtime, "Patch [" + msg.hashOf + "] not connected to root");
if (Common.PARANOIA) { check(realtime); }
return;
if (realtime.rootMessage === realtime.best && msg.content.isCheckpoint) {
// We're starting with a trucated chain from a checkpoint, we will adopt this
// as the root message and go with it...
var userDoc = Patch.apply(realtime.uncommitted, realtime.authDoc);
Common.assert(!Common.PARANOIA || realtime.userInterfaceContent === userDoc);
var fixUserDocPatch = Patch.invert(realtime.uncommitted, realtime.authDoc);
Patch.addOperation(fixUserDocPatch,
Operation.create(0, realtime.authDoc.length, msg.content.operations[0].toInsert));
fixUserDocPatch =
Patch.simplify(fixUserDocPatch, userDoc, realtime.config.operationSimplify);
msg.parentCount = 0;
realtime.rootMessage = realtime.best = msg;
realtime.authDoc = msg.content.operations[0].toInsert;
realtime.uncommitted = Patch.create(Sha.hex_sha256(realtime.authDoc));
realtime.uncommittedDocLength = realtime.authDoc.length;
pushUIPatch(realtime, fixUserDocPatch);
if (Common.PARANOIA) { realtime.userInterfaceContent = realtime.authDoc; }
return;
} else {
// we'll probably find the missing parent later.
debug(realtime, "Patch [" + msg.hashOf + "] not connected to root");
if (Common.PARANOIA) { check(realtime); }
return;
}
}
// of this message fills in a hole in the chain which makes another patch better, swap to the
@ -963,14 +998,49 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
return;
}
var simplePatch =
Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify);
if (!Patch.equals(simplePatch, patch)) {
debug(realtime, "patch [" + msg.hashOf + "] can be simplified");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
return;
if (patch.isCheckpoint) {
// Ok, we have a checkpoint patch.
// If the chain length is not equal to checkpointInterval then this patch is invalid.
var i = 0;
var checkpointP;
for (var m = getParent(realtime, msg); m; m = getParent(realtime, m)) {
if (m.content.isCheckpoint) {
if (checkpointP) {
checkpointP = m;
break;
}
checkpointP = m;
}
}
if (checkpointP && checkpointP !== realtime.rootMessage) {
var point = parentCount(realtime, checkpointP);
if ((point % realtime.config.checkpointInterval) !== 0) {
debug(realtime, "checkpoint [" + msg.hashOf + "] at invalid point [" + point + "]");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
return;
}
// Time to prune some old messages from the chain
debug(realtime, "checkpoint [" + msg.hashOf + "]");
for (var m = getParent(realtime, checkpointP); m; m = getParent(realtime, m)) {
debug(realtime, "pruning [" + m.hashOf + "]");
delete realtime.messages[m.hashOf];
delete realtime.messagesByParent[m.hashOf];
}
realtime.rootMessage = checkpointP;
}
} else {
var simplePatch =
Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify);
if (!Patch.equals(simplePatch, patch)) {
debug(realtime, "patch [" + msg.hashOf + "] can be simplified");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
return;
}
}
patch.inverseOf = Patch.invert(patch, authDocAtTimeOfPatch);
@ -1012,19 +1082,8 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
Common.assert(newUserInterfaceContent === realtime.userInterfaceContent);
}
if (uncommittedPatch.operations.length) {
// push the uncommittedPatch out to the user interface.
for (var i = 0; i < realtime.patchHandlers.length; i++) {
realtime.patchHandlers[i](uncommittedPatch);
}
if (realtime.opHandlers.length) {
for (var i = uncommittedPatch.operations.length-1; i >= 0; i--) {
for (var j = 0; j < realtime.opHandlers.length; j++) {
realtime.opHandlers[j](uncommittedPatch.operations[i]);
}
}
}
}
pushUIPatch(realtime, uncommittedPatch);
if (Common.PARANOIA) { check(realtime); }
};
@ -1061,13 +1120,26 @@ var getDepthOfState = function (content, minDepth, realtime) {
module.exports.create = function (conf) {
var realtime = ChainPad.create(conf);
return {
var out = {
onPatch: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.patchHandlers.push(handler);
}),
patch: enterChainPad(realtime, function (patch, x, y) {
if (typeof(patch) === 'number') {
// Actually they meant to call realtime.change()
out.change(patch, x, y);
return;
}
doPatch(realtime, patch);
}),
patch: enterChainPad(realtime, function (offset, count, chars) {
onChange: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.changeHandlers.push(handler);
}),
change: enterChainPad(realtime, function (offset, count, chars) {
if (count === 0 && chars === '') { return; }
doOperation(realtime, Operation.create(offset, count, chars));
}),
@ -1075,26 +1147,32 @@ module.exports.create = function (conf) {
Common.assert(typeof(handler) === 'function');
realtime.messageHandlers.push(handler);
}),
message: enterChainPad(realtime, function (message) {
handleMessage(realtime, message, false);
}),
start: enterChainPad(realtime, function () {
if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); }
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); });
}),
abort: enterChainPad(realtime, function () {
realtime.aborted = true;
realtime.schedules.forEach(function (s) { clearTimeout(s) });
}),
sync: enterChainPad(realtime, function () {
sync(realtime);
}),
sync: enterChainPad(realtime, function () { sync(realtime); }),
getAuthDoc: function () { return realtime.authDoc; },
getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); },
getDepthOfState: function (content, minDepth) {
return getDepthOfState(content, minDepth, realtime);
}
};
return out;
};
},

View File

@ -142,19 +142,21 @@ define([
// shim between chainpad and netflux
chainpadAdapter = {
msgIn : function(peerId, msg) {
var message = parseMessage(msg);
msg = msg.replace(/^cp\|/, '');
try {
var decryptedMsg = Crypto.decrypt(message, cryptKey);
var decryptedMsg = Crypto.decrypt(msg, cryptKey);
messagesHistory.push(decryptedMsg);
return decryptedMsg;
} catch (err) {
console.error(err);
return message;
return msg;
}
},
msgOut : function(msg, wc) {
try {
return Crypto.encrypt(msg, cryptKey);
var cmsg = Crypto.encrypt(msg, cryptKey);
if (msg.indexOf('[4') === 0) { cmsg = 'cp|' + cmsg; }
return cmsg;
} catch (err) {
console.log(msg);
throw err;