diff --git a/package.json b/package.json index e8339d950..dd03426ce 100644 --- a/package.json +++ b/package.json @@ -3,10 +3,12 @@ "description": "realtime collaborative visual editor with zero knowlege server", "version": "1.25.0", "dependencies": { - "chainpad-server": "^1.0.1", + "chainpad-server": "^2.0.0", "express": "~4.10.1", "nthen": "~0.1.0", + "pull-stream": "^3.6.1", "saferphore": "0.0.1", + "stream-to-pull-stream": "^1.7.2", "tweetnacl": "~0.12.2", "ws": "^1.0.1" }, diff --git a/storage/file.js b/storage/file.js index 5d5d8c3db..55027d214 100644 --- a/storage/file.js +++ b/storage/file.js @@ -1,6 +1,11 @@ +/*@flow*/ +/* jshint esversion: 6 */ +/* global Buffer */ var Fs = require("fs"); var Path = require("path"); var nThen = require("nthen"); +const ToPull = require('stream-to-pull-stream'); +const Pull = require('pull-stream'); var mkPath = function (env, channelId) { return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; @@ -8,7 +13,7 @@ var mkPath = function (env, channelId) { var getMetadataAtPath = function (Env, path, cb) { var remainder = ''; - var stream = Fs.createReadStream(path, 'utf8'); + var stream = Fs.createReadStream(path, { encoding: 'utf8' }); var complete = function (err, data) { var _cb = cb; cb = undefined; @@ -25,16 +30,16 @@ var getMetadataAtPath = function (Env, path, cb) { var parsed = null; try { parsed = JSON.parse(metadata); - complete(void 0, parsed); + complete(undefined, parsed); } catch (e) { - console.log(); + console.log("getMetadataAtPath"); console.error(e); complete('INVALID_METADATA'); } }); stream.on('end', function () { - complete(null); + complete(); }); stream.on('error', function (e) { complete(e); }); }; @@ -59,7 +64,7 @@ var closeChannel = function (env, channelName, cb) { var clearChannel = function (env, channelId, cb) { var path = mkPath(env, channelId); getMetadataAtPath(env, path, function (e, metadata) { - if (e) { return cb(e); } + if (e) { return cb(new Error(e)); } if (!metadata) { return void Fs.truncate(path, 0, function (err) { if (err) { @@ -87,7 +92,7 @@ var clearChannel = function (env, channelId, cb) { var readMessages = function (path, msgHandler, cb) { var remainder = ''; - var stream = Fs.createReadStream(path, 'utf8'); + var stream = Fs.createReadStream(path, { encoding: 'utf8' }); var complete = function (err) { var _cb = cb; cb = undefined; @@ -106,6 +111,60 @@ var readMessages = function (path, msgHandler, cb) { stream.on('error', function (e) { complete(e); }); }; +const NEWLINE_CHR = ('\n').charCodeAt(0); +const mkBufferSplit = () => { + let remainder = null; + return Pull((read) => { + return (abort, cb) => { + read(abort, function (end, data) { + if (end) { + cb(end, remainder ? [remainder, data] : [data]); + remainder = null; + return; + } + const queue = []; + for (;;) { + const offset = data.indexOf(NEWLINE_CHR); + if (offset < 0) { + remainder = remainder ? Buffer.concat([remainder, data]) : data; + break; + } + let subArray = data.slice(0, offset); + if (remainder) { + subArray = Buffer.concat([remainder, subArray]); + remainder = null; + } + queue.push(subArray); + data = data.slice(offset + 1); + } + cb(end, queue); + }); + }; + }, Pull.flatten()); +}; + +const mkOffsetCounter = () => { + let offset = 0; + return Pull.map((buff) => { + const out = { offset: offset, buff: buff }; + // +1 for the eaten newline + offset += buff.length + 1; + return out; + }); +}; + +const readMessagesBin = (env, id, start, msgHandler, cb) => { + const stream = Fs.createReadStream(mkPath(env, id), { start: start }); + let keepReading = true; + Pull( + ToPull.read(stream), + mkBufferSplit(), + mkOffsetCounter(), + Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }), + Pull.drain(()=>(keepReading), cb) + ); +}; + var checkPath = function (path, callback) { // TODO check if we actually need to use stat at all Fs.stat(path, function (err) { @@ -117,7 +176,8 @@ var checkPath = function (path, callback) { callback(err); return; } - Fs.mkdir(Path.dirname(path), function (err) { + // 511 -> octal 777 + Fs.mkdir(Path.dirname(path), 511, function (err) { if (err && err.code !== 'EEXIST') { callback(err); return; @@ -154,7 +214,28 @@ var flushUnusedChannels = function (env, cb, frame) { cb(); }; -var getChannel = function (env, id, callback) { +var channelBytes = function (env, chanName, cb) { + var path = mkPath(env, chanName); + Fs.stat(path, function (err, stats) { + if (err) { return void cb(err); } + cb(undefined, stats.size); + }); +}; + +/*:: +export type ChainPadServer_ChannelInternal_t = { + atime: number, + writeStream: typeof(process.stdout), + whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>, + onError: Array<(?Error)=>void>, + path: string +}; +*/ +var getChannel = function ( + env, + id, + callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/ +) { if (env.channels[id]) { var chan = env.channels[id]; chan.atime = +new Date(); @@ -178,9 +259,9 @@ var getChannel = function (env, id, callback) { }); } var path = mkPath(env, id); - var channel = env.channels[id] = { + var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = { atime: +new Date(), - writeStream: undefined, + writeStream: (undefined /*:any*/), whenLoaded: [ callback ], onError: [ ], path: path @@ -193,6 +274,9 @@ var getChannel = function (env, id, callback) { if (err) { delete env.channels[id]; } + if (!channel.writeStream) { + throw new Error("getChannel() complete called without channel writeStream"); + } whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); }; var fileExists; @@ -211,7 +295,7 @@ var getChannel = function (env, id, callback) { var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); env.openFiles++; stream.on('open', waitFor()); - stream.on('error', function (err) { + stream.on('error', function (err /*:?Error*/) { env.openFiles--; // this might be called after this nThen block closes. if (channel.whenLoaded) { @@ -228,20 +312,22 @@ var getChannel = function (env, id, callback) { }); }; -var message = function (env, chanName, msg, cb) { +const messageBin = (env, chanName, msgBin, cb) => { getChannel(env, chanName, function (err, chan) { - if (err) { + if (!chan) { cb(err); return; } + let called = false; var complete = function (err) { - var _cb = cb; - cb = undefined; - if (_cb) { _cb(err); } + if (called) { return; } + called = true; + cb(err); }; chan.onError.push(complete); - chan.writeStream.write(msg + '\n', function () { - chan.onError.splice(chan.onError.indexOf(complete) - 1, 1); + chan.writeStream.write(msgBin, function () { + /*::if (!chan) { throw new Error("Flow unreachable"); }*/ + chan.onError.splice(chan.onError.indexOf(complete), 1); if (!cb) { return; } //chan.messages.push(msg); chan.atime = +new Date(); @@ -250,9 +336,13 @@ var message = function (env, chanName, msg, cb) { }); }; +var message = function (env, chanName, msg, cb) { + messageBin(env, chanName, new Buffer(msg + '\n', 'utf8'), cb); +}; + var getMessages = function (env, chanName, handler, cb) { getChannel(env, chanName, function (err, chan) { - if (err) { + if (!chan) { cb(err); return; } @@ -271,21 +361,39 @@ var getMessages = function (env, chanName, handler, cb) { errorState = true; return void cb(err); } + if (!chan) { throw new Error("impossible, flow checking"); } chan.atime = +new Date(); cb(); }); }); }; -var channelBytes = function (env, chanName, cb) { - var path = mkPath(env, chanName); - Fs.stat(path, function (err, stats) { - if (err) { return void cb(err); } - cb(void 0, stats.size); - }); +/*:: +export type ChainPadServer_MessageObj_t = { buff: Buffer, offset: number }; +export type ChainPadServer_Storage_t = { + readMessagesBin: ( + channelName:string, + start:number, + asyncMsgHandler:(msg:ChainPadServer_MessageObj_t, moreCb:()=>void, abortCb:()=>void)=>void, + cb:(err:?Error)=>void + )=>void, + message: (channelName:string, content:string, cb:(err:?Error)=>void)=>void, + messageBin: (channelName:string, content:Buffer, cb:(err:?Error)=>void)=>void, + getMessages: (channelName:string, msgHandler:(msg:string)=>void, cb:(err:?Error)=>void)=>void, + removeChannel: (channelName:string, cb:(err:?Error)=>void)=>void, + closeChannel: (channelName:string, cb:(err:?Error)=>void)=>void, + flushUnusedChannels: (cb:()=>void)=>void, + getChannelSize: (channelName:string, cb:(err:?Error, size:?number)=>void)=>void, + getChannelMetadata: (channelName:string, cb:(err:?Error|string, data:?any)=>void)=>void, + clearChannel: (channelName:string, (err:?Error)=>void)=>void }; - -module.exports.create = function (conf, cb) { +const flow_Config = require('../config.example.js'); +type Config_t = typeof(flow_Config); +*/ +module.exports.create = function ( + conf /*:Config_t*/, + cb /*:(store:ChainPadServer_Storage_t)=>void*/ +) { var env = { root: conf.filePath || './datastore', channels: { }, @@ -294,15 +402,22 @@ module.exports.create = function (conf, cb) { openFiles: 0, openFileLimit: conf.openFileLimit || 2048, }; - Fs.mkdir(env.root, function (err) { + // 0x1ff -> 777 + Fs.mkdir(env.root, 0x1ff, function (err) { if (err && err.code !== 'EEXIST') { // TODO: somehow return a nice error throw err; } cb({ + readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { + readMessagesBin(env, channelName, start, asyncMsgHandler, cb); + }, message: function (channelName, content, cb) { message(env, channelName, content, cb); }, + messageBin: (channelName, content, cb) => { + messageBin(env, channelName, content, cb); + }, getMessages: function (channelName, msgHandler, cb) { getMessages(env, channelName, msgHandler, cb); }, @@ -331,4 +446,4 @@ module.exports.create = function (conf, cb) { setInterval(function () { flushUnusedChannels(env, function () { }); }, 5000); -}; +}; \ No newline at end of file