Respect 429 rate-limiting during attachment backup

This commit is contained in:
trevor-signal 2024-08-01 16:55:13 -04:00 committed by GitHub
parent 0433264eed
commit 098f54602e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 78 additions and 7 deletions

View File

@ -60,6 +60,7 @@ import {
isVideoTypeSupported,
} from '../util/GoogleChrome';
import { getLocalAttachmentUrl } from '../util/getLocalAttachmentUrl';
import { findRetryAfterTimeFromError } from './helpers/findRetryAfterTimeFromError';
const MAX_CONCURRENT_JOBS = 3;
const RETRY_CONFIG = {
@ -215,6 +216,17 @@ export async function runAttachmentBackupJob(
return { status: 'finished' };
}
if (
error instanceof Error &&
'code' in error &&
(error.code === 413 || error.code === 429)
) {
return {
status: 'rate-limited',
pauseDurationMs: findRetryAfterTimeFromError(error),
};
}
return { status: 'retry' };
}
}

View File

@ -60,23 +60,24 @@ export type JobManagerJobResultType<CoreJobType> =
| {
status: 'retry';
}
| { status: 'finished'; newJob?: CoreJobType };
| { status: 'finished'; newJob?: CoreJobType }
| { status: 'rate-limited'; pauseDurationMs: number };
export abstract class JobManager<CoreJobType> {
protected enabled: boolean = false;
protected activeJobs: Map<
private enabled: boolean = false;
private activeJobs: Map<
string,
{
completionPromise: ExplodePromiseResultType<void>;
job: CoreJobType & JobManagerJobType;
}
> = new Map();
protected jobStartPromises: Map<string, ExplodePromiseResultType<void>> =
private jobStartPromises: Map<string, ExplodePromiseResultType<void>> =
new Map();
protected jobCompletePromises: Map<string, ExplodePromiseResultType<void>> =
private jobCompletePromises: Map<string, ExplodePromiseResultType<void>> =
new Map();
private tickTimeout: NodeJS.Timeout | null = null;
protected tickTimeout: NodeJS.Timeout | null = null;
protected logPrefix = 'JobManager';
public tickInterval = DEFAULT_TICK_INTERVAL;
constructor(readonly params: JobManagerParamsType<CoreJobType>) {}
@ -98,13 +99,22 @@ export abstract class JobManager<CoreJobType> {
);
}
tick(): void {
private tick(): void {
clearTimeoutIfNecessary(this.tickTimeout);
this.tickTimeout = null;
drop(this.maybeStartJobs());
this.tickTimeout = setTimeout(() => this.tick(), this.tickInterval);
}
private pauseForDuration(durationMs: number): void {
this.enabled = false;
clearTimeoutIfNecessary(this.tickTimeout);
this.tickTimeout = setTimeout(() => {
this.enabled = true;
this.tick();
}, durationMs);
}
// used in testing
waitForJobToBeStarted(
job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
@ -270,6 +280,13 @@ export abstract class JobManager<CoreJobType> {
}
await this.retryJobLater(job);
return;
case 'rate-limited':
log.info(
`${logId}: rate-limited; retrying in ${jobRunResult.pauseDurationMs}`
);
this.pauseForDuration(jobRunResult.pauseDurationMs);
await this.retryJobLater(job);
return;
default:
throw missingCaseError(status);
}

View File

@ -25,6 +25,7 @@ import { APPLICATION_OCTET_STREAM, VIDEO_MP4 } from '../../types/MIME';
import { createName, getRelativePath } from '../../util/attachmentPath';
import { encryptAttachmentV2, generateKeys } from '../../AttachmentCrypto';
import { SECOND } from '../../util/durations';
import { HTTPError } from '../../textsecure/Errors';
const TRANSIT_CDN = 2;
const TRANSIT_CDN_FOR_NEW_UPLOAD = 42;
@ -42,6 +43,7 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager(
let backupsService = {};
let encryptAndUploadAttachment: sinon.SinonStub;
let sandbox: sinon.SinonSandbox;
let clock: sinon.SinonFakeTimers;
let isInCall: sinon.SinonStub;
function composeJob(
@ -116,6 +118,7 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager(
await window.storage.put('masterKey', Bytes.toBase64(getRandomBytes(32)));
sandbox = sinon.createSandbox();
clock = sandbox.useFakeTimers();
isInCall = sandbox.stub().returns(false);
backupMediaBatch = sandbox
@ -331,6 +334,45 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager(
assert.strictEqual(allRemainingJobs.length, 0);
});
it('pauses if it receives a retryAfter', async () => {
const jobs = await addJobs(5, { transitCdnInfo: undefined });
encryptAndUploadAttachment.throws(
new HTTPError('Rate limited', {
code: 429,
headers: { 'retry-after': '100' },
})
);
await backupManager?.start();
await waitForJobToBeStarted(jobs[2]);
assert.strictEqual(runJob.callCount, 3);
assertRunJobCalledWith([jobs[4], jobs[3], jobs[2]]);
// no jobs have occurred
await clock.tickAsync(50000);
assert.strictEqual(runJob.callCount, 3);
encryptAndUploadAttachment.returns({
cdnKey: 'newKeyOnTransitTier',
cdnNumber: TRANSIT_CDN_FOR_NEW_UPLOAD,
});
await clock.tickAsync(100000);
await waitForJobToBeStarted(jobs[0]);
assert.strictEqual(runJob.callCount, 8);
assertRunJobCalledWith([
jobs[4],
jobs[3],
jobs[2],
jobs[4],
jobs[3],
jobs[2],
jobs[1],
jobs[0],
]);
});
describe('thumbnail backups', () => {
it('addJobAndMaybeThumbnailJob conditionally adds thumbnail job', async () => {
const jobForVisualAttachment = composeJob(0);