Merge branch 'staging' into stoppegp-code-authorcolors-PR

This commit is contained in:
yflory 2020-04-08 17:03:30 +02:00
commit cba8f5fce6
5 changed files with 134 additions and 7 deletions

View File

@ -7,6 +7,9 @@ const Path = require("path");
const Util = require("./common-util");
const Plan = require("./plan");
const Semaphore = require('saferphore');
const nThen = require('nthen');
/* Accepts a reference to an object, and...
either a string describing which log is being processed (backwards compatibility),
or a function which will log the error with all relevant data
@ -194,3 +197,63 @@ Pins.list = function (_done, config) {
}).start();
});
};
Pins.load = function (cb, config) {
const sema = Semaphore.create(config.workers || 5);
let dirList;
const fileList = [];
const pinned = {};
var pinPath = config.pinPath || './pins';
var done = Util.once(cb);
nThen((waitFor) => {
// recurse over the configured pinPath, or the default
Fs.readdir(pinPath, waitFor((err, list) => {
if (err) {
if (err.code === 'ENOENT') {
dirList = [];
return; // this ends up calling back with an empty object
}
waitFor.abort();
return void done(err);
}
dirList = list;
}));
}).nThen((waitFor) => {
dirList.forEach((f) => {
sema.take((returnAfter) => {
// iterate over all the subdirectories in the pin store
Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => {
if (err) {
waitFor.abort();
return void done(err);
}
list2.forEach((ff) => {
if (config && config.exclude && config.exclude.indexOf(ff) > -1) { return; }
fileList.push(Path.join(pinPath, f, ff));
});
})));
});
});
}).nThen((waitFor) => {
fileList.forEach((f) => {
sema.take((returnAfter) => {
Fs.readFile(f, waitFor(returnAfter((err, content) => {
if (err) {
waitFor.abort();
return void done(err);
}
const hashes = Pins.calculateFromLog(content.toString('utf8'), f);
hashes.forEach((x) => {
(pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1;
});
})));
});
});
}).nThen(() => {
done(void 0, pinned);
});
};

View File

@ -14,6 +14,28 @@ const readFileBin = require("../stream-file").readFileBin;
const BatchRead = require("../batch-read");
const Schedule = require("../schedule");
/* Each time you write to a channel it will either use an open file descriptor
for that channel or open a new descriptor if one is not available. These are
automatically closed after this window to prevent a file descriptor leak, so
writes that take longer than this time may be dropped! */
const CHANNEL_WRITE_WINDOW = 300000;
/* Each time you read a channel it will have this many milliseconds to complete
otherwise it will be closed to prevent a file descriptor leak. The server will
lock up if it uses all available file descriptors, so it's important to close
them. The tradeoff with this timeout is that some functions, the stream, and
and the timeout itself are stored in memory. A longer timeout uses more memory
and running out of memory will also kill the server. */
const STREAM_CLOSE_TIMEOUT = 300000;
/* The above timeout closes the stream, but apparently that doesn't always work.
We set yet another timeout to allow the runtime to gracefully close the stream
(flushing all pending writes/reads and doing who knows what else). After this timeout
it will be MERCILESSLY DESTROYED. This isn't graceful, but again, file descriptor
leaks are bad. */
const STREAM_DESTROY_TIMEOUT = 30000;
const isValidChannelId = function (id) {
return typeof(id) === 'string' &&
id.length >= 32 && id.length < 50 &&
@ -64,7 +86,7 @@ const destroyStream = function (stream) {
try { stream.close(); } catch (err) { console.error(err); }
setTimeout(function () {
try { stream.destroy(); } catch (err) { console.error(err); }
}, 15000);
}, STREAM_DESTROY_TIMEOUT);
};
const ensureStreamCloses = function (stream, id, ms) {
@ -74,7 +96,7 @@ const ensureStreamCloses = function (stream, id, ms) {
// this can only be a timeout error...
console.log("stream close error:", err, id);
}
}), ms || 45000), []);
}), ms || STREAM_CLOSE_TIMEOUT), []);
};
// readMessagesBin asynchronously iterates over the messages in a channel log
@ -729,7 +751,7 @@ var getChannel = function (env, id, _callback) {
delete env.channels[id];
destroyStream(channel.writeStream, path);
//console.log("closing writestream");
}, 120000);
}, CHANNEL_WRITE_WINDOW);
channel.delayClose();
env.channels[id] = channel;
done(void 0, channel);

View File

@ -0,0 +1,42 @@
/* jshint esversion: 6, node: true */
const nThen = require("nthen");
const Pins = require("../lib/pins");
const Assert = require("assert");
const config = require("../lib/load-config");
var compare = function () {
console.log(config);
var conf = {
pinPath: config.pinPath,
};
var list, load;
nThen(function (w) {
Pins.list(w(function (err, p) {
if (err) { throw err; }
list = p;
console.log(p);
console.log(list);
console.log();
}), conf);
}).nThen(function (w) {
Pins.load(w(function (err, p) {
if (err) { throw err; }
load = p;
console.log(load);
console.log();
}), conf);
}).nThen(function () {
console.log({
listLength: Object.keys(list).length,
loadLength: Object.keys(load).length,
});
Assert.deepEqual(list, load);
console.log("methods are equivalent");
});
};
compare();

View File

@ -42,7 +42,7 @@ nThen(function (w) {
store = _;
})); // load the list of pinned files so you know which files
// should not be archived or deleted
Pins.list(w(function (err, _) {
Pins.load(w(function (err, _) {
if (err) {
w.abort();
return void console.error(err);

View File

@ -71,7 +71,7 @@ define([
// Get contacts and extract their avatar channel and key
var getData = function (obj, href) {
var parsed = Hash.parsePadUrl(href);
if (!parsed || parsed.type !== "file") { return; } // XXX
if (!parsed || parsed.type !== "file") { return; }
var secret = Hash.getSecrets('file', parsed.hash);
if (!secret.keys || !secret.channel) { return; }
obj.avatarKey = Hash.encodeBase64(secret.keys && secret.keys.cryptKey);
@ -81,7 +81,7 @@ define([
contacts.friends = proxy.friends || {};
Object.keys(contacts.friends).map(function (key) {
var friend = contacts.friends[key];
// if (!friend) { return; } // XXX how should this be handled?
if (!friend) { return; }
var ret = {
edPublic: friend.edPublic,
name: friend.displayName,
@ -91,7 +91,7 @@ define([
});
Object.keys(contacts.teams).map(function (key) {
var team = contacts.teams[key];
// if (!team) { return; } // XXX how should this be handled. Is this possible?
if (!team) { return; }
var avatar = team.metadata && team.metadata.avatar;
var ret = {
edPublic: team.keys && team.keys.drive && team.keys.drive.edPublic,