separate more rpc functionality into pinning and core submodules

This commit is contained in:
ansuz 2020-01-24 12:43:11 -05:00
parent 4fd68b672e
commit c93b39c094
3 changed files with 471 additions and 449 deletions

42
lib/commands/core.js Normal file
View File

@ -0,0 +1,42 @@
/*jshint esversion: 6 */
const Core = module.exports;
const Util = require("../common-util");
const escapeKeyCharacters = Util.escapeKeyCharacters;
Core.DEFAULT_LIMIT = 50 * 1024 * 1024;
Core.SESSION_EXPIRATION_TIME = 60 * 1000;
Core.isValidId = function (chan) {
return chan && chan.length && /^[a-zA-Z0-9=+-]*$/.test(chan) &&
[32, 48].indexOf(chan.length) > -1;
};
var makeToken = Core.makeToken = function () {
return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER))
.toString(16);
};
Core.getSession = function (Sessions, key) {
var safeKey = escapeKeyCharacters(key);
if (Sessions[safeKey]) {
Sessions[safeKey].atime = +new Date();
return Sessions[safeKey];
}
var user = Sessions[safeKey] = {};
user.atime = +new Date();
user.tokens = [
makeToken()
];
return user;
};
// getChannelList
// getSession
// getHash
// getMultipleFileSize
// sumChannelSizes
// getFreeSpace
// getLimit

399
lib/commands/pin-rpc.js Normal file
View File

@ -0,0 +1,399 @@
/*jshint esversion: 6 */
const Core = require("./core");
const BatchRead = require("../batch-read");
const Pins = require("../pins");
const Pinning = module.exports;
const Nacl = require("tweetnacl/nacl-fast");
const Util = require("../common-util");
const nThen = require("nthen");
//const escapeKeyCharacters = Util.escapeKeyCharacters;
const unescapeKeyCharacters = Util.unescapeKeyCharacters;
var sumChannelSizes = function (sizes) {
return Object.keys(sizes).map(function (id) { return sizes[id]; })
.filter(function (x) {
// only allow positive numbers
return !(typeof(x) !== 'number' || x <= 0);
})
.reduce(function (a, b) { return a + b; }, 0);
};
// XXX it's possible for this to respond before the server has had a chance
// to fetch the limits. Maybe we should respond with an error...
// or wait until we actually know the limits before responding
var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
var toSend = limit && typeof(limit.limit) === "number"?
[limit.limit, limit.plan, limit.note] : [defaultLimit, '', ''];
cb(void 0, toSend);
};
var addPinned = function (
Env,
publicKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {};
x[publicKey] = 1;
});
cb();
});
};
var removePinned = function (
Env,
publicKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c];
if (!x) { return; }
delete x[publicKey];
});
cb();
});
};
var getMultipleFileSize = function (Env, channels, cb) {
if (!Array.isArray(channels)) { return cb('INVALID_PIN_LIST'); }
if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED');
}
var i = channels.length;
var counts = {};
var done = function () {
i--;
if (i === 0) { return cb(void 0, counts); }
};
channels.forEach(function (channel) {
Pinning.getFileSize(Env, channel, function (e, size) {
if (e) {
// most likely error here is that a file no longer exists
// but a user still has it in their drive, and wants to know
// its size. We should find a way to inform them of this in
// the future. For now we can just tell them it has no size.
//WARN('getFileSize', e);
counts[channel] = 0;
return done();
}
counts[channel] = size;
done();
});
});
};
const batchUserPins = BatchRead("LOAD_USER_PINS");
var loadUserPins = function (Env, publicKey, cb) {
var session = Core.getSession(Env.Sessions, publicKey);
if (session.channels) {
return cb(session.channels);
}
batchUserPins(publicKey, cb, function (done) {
var ref = {};
var lineHandler = Pins.createLineHandler(ref, function (label, data) {
Env.Log.error(label, {
log: publicKey,
data: data,
});
});
// if channels aren't in memory. load them from disk
Env.pinStore.getMessages(publicKey, lineHandler, function () {
// no more messages
// only put this into the cache if it completes
session.channels = ref.pins;
done(ref.pins); // FIXME no error handling?
});
});
};
var truthyKeys = function (O) {
return Object.keys(O).filter(function (k) {
return O[k];
});
};
var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
loadUserPins(Env, publicKey, function (pins) {
cb(truthyKeys(pins));
});
};
const batchTotalSize = BatchRead("GET_TOTAL_SIZE");
Pinning.getTotalSize = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
// Get a common key if multiple users share the same quota, otherwise take the public key
var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey;
batchTotalSize(batchKey, cb, function (done) {
var channels = [];
var bytes = 0;
nThen(function (waitFor) {
// Get the channels list for our user account
Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) {
if (!_channels) {
waitFor.abort();
return done('INVALID_PIN_LIST');
}
Array.prototype.push.apply(channels, _channels);
}));
// Get the channels list for users sharing our quota
if (limit && Array.isArray(limit.users) && limit.users.length > 1) {
limit.users.forEach(function (key) {
if (key === unescapedKey) { return; } // Don't count ourselves twice
getChannelList(Env, key, waitFor(function (_channels) {
if (!_channels) { return; } // Broken user, don't count their quota
Array.prototype.push.apply(channels, _channels);
}));
});
}
}).nThen(function (waitFor) {
// Get size of the channels
var list = []; // Contains the channels already counted in the quota to avoid duplicates
channels.forEach(function (channel) { // TODO semaphore?
if (list.indexOf(channel) !== -1) { return; }
list.push(channel);
Pinning.getFileSize(Env, channel, waitFor(function (e, size) {
if (!e) { bytes += size; }
}));
});
}).nThen(function () {
done(void 0, bytes);
});
});
};
/* Users should be able to clear their own pin log with an authenticated RPC
*/
Pinning.removePins = function (Env, safeKey, cb) {
if (typeof(Env.pinStore.removeChannel) !== 'function') {
return void cb("E_NOT_IMPLEMENTED");
}
Env.pinStore.removeChannel(safeKey, function (err) {
Env.Log.info('DELETION_PIN_BY_OWNER_RPC', {
safeKey: safeKey,
status: err? String(err): 'SUCCESS',
});
cb(err);
});
};
Pinning.trimPins = function (Env, safeKey, cb) {
// XXX trim to latest pin checkpoint
cb("NOT_IMPLEMENTED");
};
var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) {
getLimit(Env, publicKey, function (e, limit) {
if (e) { return void cb(e); }
Pinning.getTotalSize(Env, publicKey, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
var rem = limit[0] - size;
if (typeof(rem) !== 'number') {
return void cb('invalid_response');
}
cb(void 0, rem);
});
});
};
var hashChannelList = function (A) {
var uniques = [];
A.forEach(function (a) {
if (uniques.indexOf(a) === -1) { uniques.push(a); }
});
uniques.sort();
var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl
.util.decodeUTF8(JSON.stringify(uniques))));
return hash;
};
var getHash = Pinning.getHash = function (Env, publicKey, cb) {
getChannelList(Env, publicKey, function (channels) {
cb(void 0, hashChannelList(channels));
});
};
Pinning.pinChannel = function (Env, publicKey, channels, cb) {
if (!channels && channels.filter) {
return void cb('INVALID_PIN_LIST');
}
// get channel list ensures your session has a cached channel list
getChannelList(Env, publicKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey);
// only pin channels which are not already pinned
var toStore = channels.filter(function (channel) {
return pinned.indexOf(channel) === -1;
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
}
getMultipleFileSize(Env, toStore, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getFreeSpace(Env, publicKey, function (e, free) {
if (typeof(free) === 'undefined') {
Env.WARN('getFreeSpace', e);
return void cb(e);
}
if (pinSize > free) { return void cb('E_OVER_LIMIT'); }
Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
session.channels[channel] = true;
});
addPinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
});
});
});
});
};
Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
if (!channels && channels.filter) {
// expected array
return void cb('INVALID_PIN_LIST');
}
getChannelList(Env, publicKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey);
// only unpin channels which are pinned
var toStore = channels.filter(function (channel) {
return pinned.indexOf(channel) !== -1;
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
}
Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
delete session.channels[channel];
});
removePinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
});
});
};
Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); }
var session = Core.getSession(Env.Sessions, publicKey);
if (!channelList.length) {
return void getHash(Env, publicKey, function (e, hash) {
if (e) { return cb(e); }
cb(void 0, hash);
});
}
var pins = {};
getMultipleFileSize(Env, channelList, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getLimit(Env, publicKey, function (e, limit) {
if (e) {
Env.WARN('[RESET_ERR]', e);
return void cb(e);
}
/* we want to let people pin, even if they are over their limit,
but they should only be able to do this once.
This prevents data loss in the case that someone registers, but
does not have enough free space to pin their migrated data.
They will not be able to pin additional pads until they upgrade
or delete enough files to go back under their limit. */
if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); }
Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]),
function (e) {
if (e) { return void cb(e); }
channelList.forEach(function (channel) {
pins[channel] = true;
});
var oldChannels;
if (session.channels && typeof(session.channels) === 'object') {
oldChannels = Object.keys(session.channels);
} else {
oldChannels = [];
}
removePinned(Env, publicKey, oldChannels, () => {
addPinned(Env, publicKey, channelList, ()=>{});
});
// update in-memory cache IFF the reset was allowed.
session.channels = pins;
getHash(Env, publicKey, function (e, hash) {
cb(e, hash);
});
});
});
});
};
Pinning.getFileSize = function (Env, channel, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length === 32) {
if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED');
}
return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) {
if (e) {
if (e.code === 'ENOENT') { return void cb(void 0, 0); }
return void cb(e.code);
}
cb(void 0, size);
});
}
// 'channel' refers to a file, so you need another API
Env.blobStore.size(channel, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
cb(void 0, size);
});
};

View File

@ -15,10 +15,10 @@ const Package = require('../package.json');
const Pinned = require('../scripts/pinned');
const Saferphore = require("saferphore");
const nThen = require("nthen");
const Pins = require("./pins");
const Meta = require("./metadata");
const WriteQueue = require("./write-queue");
const BatchRead = require("./batch-read");
const Core = require("./commands/core");
const Util = require("./common-util");
const escapeKeyCharacters = Util.escapeKeyCharacters;
@ -26,15 +26,13 @@ const unescapeKeyCharacters = Util.unescapeKeyCharacters;
const mkEvent = Util.mkEvent;
const Admin = require("./commands/admin-rpc");
const Pinning = require("./commands/pin-rpc");
var RPC = module.exports;
var Store = require("../storage/file");
var BlobStore = require("../storage/blob");
var DEFAULT_LIMIT = 50 * 1024 * 1024;
var SESSION_EXPIRATION_TIME = 60 * 1000;
var Log;
var WARN = function (e, output) {
@ -47,16 +45,6 @@ var WARN = function (e, output) {
}
};
var isValidId = function (chan) {
return chan && chan.length && /^[a-zA-Z0-9=+-]*$/.test(chan) &&
[32, 48].indexOf(chan.length) > -1;
};
var makeToken = function () {
return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER))
.toString(16);
};
var makeCookie = function (token) {
var time = (+new Date());
time -= time % 5000;
@ -81,20 +69,6 @@ var parseCookie = function (cookie) {
return c;
};
var getSession = function (Sessions, key) {
var safeKey = escapeKeyCharacters(key);
if (Sessions[safeKey]) {
Sessions[safeKey].atime = +new Date();
return Sessions[safeKey];
}
var user = Sessions[safeKey] = {};
user.atime = +new Date();
user.tokens = [
makeToken()
];
return user;
};
var isTooOld = function (time, now) {
return (now - time) > 300000;
};
@ -121,7 +95,7 @@ var expireSessions = function (Sessions) {
var addTokenForKey = function (Sessions, publicKey, token) {
if (!Sessions[publicKey]) { throw new Error('undefined user'); }
var user = getSession(Sessions, publicKey);
var user = Core.getSession(Sessions, publicKey);
user.tokens.push(token);
user.atime = +new Date();
if (user.tokens.length > 2) { user.tokens.shift(); }
@ -143,7 +117,7 @@ var isValidCookie = function (Sessions, publicKey, cookie) {
return false;
}
var user = getSession(Sessions, publicKey);
var user = Core.getSession(Sessions, publicKey);
if (!user) { return false; }
var idx = user.tokens.indexOf(parsed.seq);
@ -151,7 +125,7 @@ var isValidCookie = function (Sessions, publicKey, cookie) {
if (idx > 0) {
// make a new token
addTokenForKey(Sessions, publicKey, makeToken());
addTokenForKey(Sessions, publicKey, Core.makeToken());
}
return true;
@ -195,74 +169,9 @@ var checkSignature = function (signedMsg, signature, publicKey) {
return Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer);
};
const batchUserPins = BatchRead("LOAD_USER_PINS");
var loadUserPins = function (Env, publicKey, cb) {
var session = getSession(Env.Sessions, publicKey);
if (session.channels) {
return cb(session.channels);
}
batchUserPins(publicKey, cb, function (done) {
var ref = {};
var lineHandler = Pins.createLineHandler(ref, function (label, data) {
Log.error(label, {
log: publicKey,
data: data,
});
});
// if channels aren't in memory. load them from disk
Env.pinStore.getMessages(publicKey, lineHandler, function () {
// no more messages
// only put this into the cache if it completes
session.channels = ref.pins;
done(ref.pins); // FIXME no error handling?
});
});
};
var truthyKeys = function (O) {
return Object.keys(O).filter(function (k) {
return O[k];
});
};
var getChannelList = function (Env, publicKey, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
loadUserPins(Env, publicKey, function (pins) {
cb(truthyKeys(pins));
});
};
var getFileSize = function (Env, channel, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length === 32) {
if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED');
}
return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) {
if (e) {
if (e.code === 'ENOENT') { return void cb(void 0, 0); }
return void cb(e.code);
}
cb(void 0, size);
});
}
// 'channel' refers to a file, so you need another API
Env.blobStore.size(channel, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
cb(void 0, size);
});
};
const batchMetadata = BatchRead("GET_METADATA");
var getMetadata = function (Env, channel, cb) {
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); }
batchMetadata(channel, cb, function (done) {
@ -309,7 +218,7 @@ var queueMetadata = WriteQueue();
var setMetadata = function (Env, data, unsafeKey, cb) {
var channel = data.channel;
var command = data.command;
if (!channel || !isValidId(channel)) { return void cb ('INVALID_CHAN'); }
if (!channel || !Core.isValidId(channel)) { return void cb ('INVALID_CHAN'); }
if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); }
if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); }
@ -382,38 +291,6 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
});
};
var getMultipleFileSize = function (Env, channels, cb) {
if (!Array.isArray(channels)) { return cb('INVALID_PIN_LIST'); }
if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED');
}
var i = channels.length;
var counts = {};
var done = function () {
i--;
if (i === 0) { return cb(void 0, counts); }
};
channels.forEach(function (channel) {
getFileSize(Env, channel, function (e, size) {
if (e) {
// most likely error here is that a file no longer exists
// but a user still has it in their drive, and wants to know
// its size. We should find a way to inform them of this in
// the future. For now we can just tell them it has no size.
//WARN('getFileSize', e);
counts[channel] = 0;
return done();
}
counts[channel] = size;
done();
});
});
};
/* accepts a list, and returns a sublist of channel or file ids which seem
to have been deleted from the server (file size 0)
@ -429,7 +306,7 @@ var getDeletedPads = function (Env, channels, cb) {
var job = function (channel, wait) {
return function (give) {
getFileSize(Env, channel, wait(give(function (e, size) {
Pinning.getFileSize(Env, channel, wait(give(function (e, size) {
if (e) { return; }
if (size === 0) { absentees.push(channel); }
})));
@ -445,72 +322,6 @@ var getDeletedPads = function (Env, channels, cb) {
});
};
const batchTotalSize = BatchRead("GET_TOTAL_SIZE");
var getTotalSize = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
// Get a common key if multiple users share the same quota, otherwise take the public key
var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey;
batchTotalSize(batchKey, cb, function (done) {
var channels = [];
var bytes = 0;
nThen(function (waitFor) {
// Get the channels list for our user account
getChannelList(Env, publicKey, waitFor(function (_channels) {
if (!_channels) {
waitFor.abort();
return done('INVALID_PIN_LIST');
}
Array.prototype.push.apply(channels, _channels);
}));
// Get the channels list for users sharing our quota
if (limit && Array.isArray(limit.users) && limit.users.length > 1) {
limit.users.forEach(function (key) {
if (key === unescapedKey) { return; } // Don't count ourselves twice
getChannelList(Env, key, waitFor(function (_channels) {
if (!_channels) { return; } // Broken user, don't count their quota
Array.prototype.push.apply(channels, _channels);
}));
});
}
}).nThen(function (waitFor) {
// Get size of the channels
var list = []; // Contains the channels already counted in the quota to avoid duplicates
channels.forEach(function (channel) { // TODO semaphore?
if (list.indexOf(channel) !== -1) { return; }
list.push(channel);
getFileSize(Env, channel, waitFor(function (e, size) {
if (!e) { bytes += size; }
}));
});
}).nThen(function () {
done(void 0, bytes);
});
});
};
var hashChannelList = function (A) {
var uniques = [];
A.forEach(function (a) {
if (uniques.indexOf(a) === -1) { uniques.push(a); }
});
uniques.sort();
var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl
.util.decodeUTF8(JSON.stringify(uniques))));
return hash;
};
var getHash = function (Env, publicKey, cb) {
getChannelList(Env, publicKey, function (channels) {
cb(void 0, hashChannelList(channels));
});
};
var applyCustomLimits = function (Env, config) {
var isLimit = function (o) {
var valid = o && typeof(o) === 'object' &&
@ -551,7 +362,7 @@ var updateLimits = function (Env, config, publicKey, cb /*:(?string, ?any[])=>vo
if (typeof cb !== "function") { cb = function () {}; }
var defaultLimit = typeof(config.defaultStorageLimit) === 'number'?
config.defaultStorageLimit: DEFAULT_LIMIT;
config.defaultStorageLimit: Core.DEFAULT_LIMIT;
var userId;
if (publicKey) {
@ -612,45 +423,6 @@ var updateLimits = function (Env, config, publicKey, cb /*:(?string, ?any[])=>vo
req.end(body);
};
// XXX it's possible for this to respond before the server has had a chance
// to fetch the limits. Maybe we should respond with an error...
// or wait until we actually know the limits before responding
var getLimit = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
Env.defaultStorageLimit: DEFAULT_LIMIT;
var toSend = limit && typeof(limit.limit) === "number"?
[limit.limit, limit.plan, limit.note] : [defaultLimit, '', ''];
cb(void 0, toSend);
};
var getFreeSpace = function (Env, publicKey, cb) {
getLimit(Env, publicKey, function (e, limit) {
if (e) { return void cb(e); }
getTotalSize(Env, publicKey, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
var rem = limit[0] - size;
if (typeof(rem) !== 'number') {
return void cb('invalid_response');
}
cb(void 0, rem);
});
});
};
var sumChannelSizes = function (sizes) {
return Object.keys(sizes).map(function (id) { return sizes[id]; })
.filter(function (x) {
// only allow positive numbers
return !(typeof(x) !== 'number' || x <= 0);
})
.reduce(function (a, b) { return a + b; }, 0);
};
// inform that the
var loadChannelPins = function (Env) {
Pinned.load(function (err, data) {
@ -670,35 +442,7 @@ var loadChannelPins = function (Env) {
pinPath: Env.paths.pin,
});
};
var addPinned = function (
Env,
publicKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {};
x[publicKey] = 1;
});
cb();
});
};
var removePinned = function (
Env,
publicKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c];
if (!x) { return; }
delete x[publicKey];
});
cb();
});
};
var isChannelPinned = function (Env, channel, cb) {
Env.evPinnedPadsReady.reg(() => {
if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) {
@ -710,138 +454,6 @@ var isChannelPinned = function (Env, channel, cb) {
});
};
var pinChannel = function (Env, publicKey, channels, cb) {
if (!channels && channels.filter) {
return void cb('INVALID_PIN_LIST');
}
// get channel list ensures your session has a cached channel list
getChannelList(Env, publicKey, function (pinned) {
var session = getSession(Env.Sessions, publicKey);
// only pin channels which are not already pinned
var toStore = channels.filter(function (channel) {
return pinned.indexOf(channel) === -1;
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
}
getMultipleFileSize(Env, toStore, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getFreeSpace(Env, publicKey, function (e, free) {
if (typeof(free) === 'undefined') {
WARN('getFreeSpace', e);
return void cb(e);
}
if (pinSize > free) { return void cb('E_OVER_LIMIT'); }
Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
session.channels[channel] = true;
});
addPinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
});
});
});
});
};
var unpinChannel = function (Env, publicKey, channels, cb) {
if (!channels && channels.filter) {
// expected array
return void cb('INVALID_PIN_LIST');
}
getChannelList(Env, publicKey, function (pinned) {
var session = getSession(Env.Sessions, publicKey);
// only unpin channels which are pinned
var toStore = channels.filter(function (channel) {
return pinned.indexOf(channel) !== -1;
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
}
Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
delete session.channels[channel];
});
removePinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
});
});
};
var resetUserPins = function (Env, publicKey, channelList, cb) {
if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); }
var session = getSession(Env.Sessions, publicKey);
if (!channelList.length) {
return void getHash(Env, publicKey, function (e, hash) {
if (e) { return cb(e); }
cb(void 0, hash);
});
}
var pins = {};
getMultipleFileSize(Env, channelList, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getLimit(Env, publicKey, function (e, limit) {
if (e) {
WARN('[RESET_ERR]', e);
return void cb(e);
}
/* we want to let people pin, even if they are over their limit,
but they should only be able to do this once.
This prevents data loss in the case that someone registers, but
does not have enough free space to pin their migrated data.
They will not be able to pin additional pads until they upgrade
or delete enough files to go back under their limit. */
if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); }
Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]),
function (e) {
if (e) { return void cb(e); }
channelList.forEach(function (channel) {
pins[channel] = true;
});
var oldChannels;
if (session.channels && typeof(session.channels) === 'object') {
oldChannels = Object.keys(session.channels);
} else {
oldChannels = [];
}
removePinned(Env, publicKey, oldChannels, () => {
addPinned(Env, publicKey, channelList, ()=>{});
});
// update in-memory cache IFF the reset was allowed.
session.channels = pins;
getHash(Env, publicKey, function (e, hash) {
cb(e, hash);
});
});
});
});
};
var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) {
if (typeof(channelId) !== 'string' || channelId.length !== 32) {
return cb('INVALID_ARGUMENTS');
@ -861,7 +473,7 @@ var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) {
};
var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) {
if (typeof(channelId) !== 'string' || !isValidId(channelId)) {
if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
return cb('INVALID_ARGUMENTS');
}
@ -948,26 +560,6 @@ var removeOwnedChannelHistory = function (Env, channelId, unsafeKey, hash, cb) {
});
};
/* Users should be able to clear their own pin log with an authenticated RPC
*/
var removePins = function (Env, safeKey, cb) {
if (typeof(Env.pinStore.removeChannel) !== 'function') {
return void cb("E_NOT_IMPLEMENTED");
}
Env.pinStore.removeChannel(safeKey, function (err) {
Log.info('DELETION_PIN_BY_OWNER_RPC', {
safeKey: safeKey,
status: err? String(err): 'SUCCESS',
});
cb(err);
});
};
var trimPins = function (Env, safeKey, cb) {
// XXX trim to latest pin checkpoint
cb("NOT_IMPLEMENTED");
};
/*
We assume that the server is secured against MitM attacks
@ -1136,7 +728,7 @@ var ARRAY_LINE = /^\[/;
otherwise false
*/
var isNewChannel = function (Env, channel, cb) {
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return void cb('INVALID_CHAN'); }
var done = false;
@ -1172,7 +764,7 @@ var writePrivateMessage = function (Env, args, nfwssCtx, cb) {
if (!msg) { return void cb("INVALID_MESSAGE"); }
// don't support anything except regular channels
if (!isValidId(channelId) || channelId.length !== 32) {
if (!Core.isValidId(channelId) || channelId.length !== 32) {
return void cb("INVALID_CHAN");
}
@ -1270,7 +862,7 @@ var upload_status = function (Env, safeKey, filesize, _cb) { // FIXME FILES
}).nThen(function () {
// if yuo're here then there are no pending uploads
// check if you have space in your quota to upload something of this size
getFreeSpace(Env, safeKey, function (e, free) {
Pinning.getFreeSpace(Env, safeKey, function (e, free) {
if (e) { return void cb(e); }
if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); }
cb(void 0, false);
@ -1300,6 +892,8 @@ RPC.create = function (config, cb) {
limits: {},
admins: [],
sessionExpirationInterval: undefined,
Log: Log,
WARN: WARN,
};
try {
@ -1344,7 +938,7 @@ RPC.create = function (config, cb) {
break;
}
case 'GET_FILE_SIZE':
return void getFileSize(Env, msg[1], function (e, size) {
return void Pinning.getFileSize(Env, msg[1], function (e, size) {
WARN(e, msg[1]);
respond(e, [null, size, null]);
});
@ -1354,7 +948,7 @@ RPC.create = function (config, cb) {
respond(e, [null, data, null]);
});
case 'GET_MULTIPLE_FILE_SIZE':
return void getMultipleFileSize(Env, msg[1], function (e, dict) {
return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) {
if (e) {
WARN(e, dict);
return respond(e);
@ -1414,7 +1008,7 @@ RPC.create = function (config, cb) {
// make sure a user object is initialized in the cookie jar
if (publicKey) {
getSession(Sessions, publicKey);
Core.getSession(Sessions, publicKey);
} else {
Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey);
}
@ -1471,38 +1065,33 @@ RPC.create = function (config, cb) {
switch (msg[0]) {
case 'COOKIE': return void Respond(void 0);
case 'RESET':
return resetUserPins(Env, safeKey, msg[1], function (e, hash) {
return Pinning.resetUserPins(Env, safeKey, msg[1], function (e, hash) {
//WARN(e, hash);
return void Respond(e, hash);
});
case 'PIN':
return pinChannel(Env, safeKey, msg[1], function (e, hash) {
return Pinning.pinChannel(Env, safeKey, msg[1], function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'UNPIN':
return unpinChannel(Env, safeKey, msg[1], function (e, hash) {
return Pinning.unpinChannel(Env, safeKey, msg[1], function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'GET_HASH':
return void getHash(Env, safeKey, function (e, hash) {
return void Pinning.getHash(Env, safeKey, function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit
return getTotalSize(Env, safeKey, function (e, size) {
return Pinning.getTotalSize(Env, safeKey, function (e, size) {
if (e) {
WARN(e, safeKey);
return void Respond(e);
}
Respond(e, size);
});
case 'GET_FILE_SIZE':
return void getFileSize(Env, msg[1], function (e, size) {
WARN(e, msg[1]);
Respond(e, size);
});
case 'UPDATE_LIMITS':
return void updateLimits(Env, config, safeKey, function (e, limit) {
if (e) {
@ -1512,21 +1101,13 @@ RPC.create = function (config, cb) {
Respond(void 0, limit);
});
case 'GET_LIMIT':
return void getLimit(Env, safeKey, function (e, limit) {
return void Pinning.getLimit(Env, safeKey, function (e, limit) {
if (e) {
WARN(e, limit);
return void Respond(e);
}
Respond(void 0, limit);
});
case 'GET_MULTIPLE_FILE_SIZE':
return void getMultipleFileSize(Env, msg[1], function (e, dict) {
if (e) {
WARN(e, dict);
return void Respond(e);
}
Respond(void 0, dict);
});
case 'EXPIRE_SESSION':
return void setTimeout(function () {
expireSession(Sessions, safeKey);
@ -1549,12 +1130,12 @@ RPC.create = function (config, cb) {
Respond(void 0, 'OK');
});
case 'REMOVE_PINS':
return void removePins(Env, safeKey, function (e) {
return void Pinning.removePins(Env, safeKey, function (e) {
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'TRIM_PINS':
return void trimPins(Env, safeKey, function (e) {
return void Pinning.trimPins(Env, safeKey, function (e) {
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
@ -1568,7 +1149,7 @@ RPC.create = function (config, cb) {
return void upload_status(Env, safeKey, filesize, function (e, yes) {
if (!e && !yes) {
// no pending uploads, set the new size
var user = getSession(Sessions, safeKey);
var user = Core.getSession(Sessions, safeKey);
user.pendingUploadSize = filesize;
user.currentUploadSize = 0;
}
@ -1665,7 +1246,7 @@ RPC.create = function (config, cb) {
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function (safeKey) {
return getSession(Sessions, safeKey);
return Core.getSession(Sessions, safeKey);
},
}, w(function (err, blob) {
if (err) { throw new Error(err); }
@ -1677,6 +1258,6 @@ RPC.create = function (config, cb) {
// XXX allow for graceful shutdown
Env.sessionExpirationInterval = setInterval(function () {
expireSessions(Sessions);
}, SESSION_EXPIRATION_TIME);
}, Core.SESSION_EXPIRATION_TIME);
});
};