diff --git a/ts/background.ts b/ts/background.ts index a3e7dccfec..45483c8183 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -735,6 +735,9 @@ export async function startApp(): Promise { 'background/shutdown: shutdown requested' ); + const attachmentDownloadStopPromise = AttachmentDownloadManager.stop(); + const attachmentBackupStopPromise = AttachmentBackupManager.stop(); + server?.cancelInflightRequests('shutdown'); // Stop background processing @@ -815,13 +818,12 @@ export async function startApp(): Promise { ]); log.info( - 'background/shutdown: waiting for all attachment downloads to finish' + 'background/shutdown: waiting for all attachment backups & downloads to finish' ); - // Since we canceled the inflight requests earlier in shutdown, these should // resolve quickly - await AttachmentDownloadManager.stop(); - await AttachmentBackupManager.stop(); + await attachmentDownloadStopPromise; + await attachmentBackupStopPromise; log.info('background/shutdown: closing the database'); diff --git a/ts/jobs/AttachmentDownloadManager.ts b/ts/jobs/AttachmentDownloadManager.ts index 6720e53f3e..23adb9168a 100644 --- a/ts/jobs/AttachmentDownloadManager.ts +++ b/ts/jobs/AttachmentDownloadManager.ts @@ -208,12 +208,10 @@ export class AttachmentDownloadManager extends JobManager { - log.info('AttachmentDownloadManager/starting'); await AttachmentDownloadManager.instance.start(); } static async stop(): Promise { - log.info('AttachmentDownloadManager/stopping'); return AttachmentDownloadManager._instance?.stop(); } diff --git a/ts/jobs/JobManager.ts b/ts/jobs/JobManager.ts index 32364d5b9f..3b16bdf706 100644 --- a/ts/jobs/JobManager.ts +++ b/ts/jobs/JobManager.ts @@ -83,19 +83,26 @@ export abstract class JobManager { constructor(readonly params: JobManagerParamsType) {} async start(): Promise { + log.info(`${this.logPrefix}: starting`); + this.enabled = true; await this.params.markAllJobsInactive(); this.tick(); } async stop(): Promise { + const activeJobs = [...this.activeJobs.values()]; + + log.info( + `${this.logPrefix}: stopping. There are ` + + `${activeJobs.length} active job(s)` + ); + this.enabled = false; clearTimeoutIfNecessary(this.tickTimeout); this.tickTimeout = null; await Promise.all( - [...this.activeJobs.values()].map( - ({ completionPromise }) => completionPromise.promise - ) + activeJobs.map(({ completionPromise }) => completionPromise.promise) ); } diff --git a/ts/textsecure/WebAPI.ts b/ts/textsecure/WebAPI.ts index dafd701152..bba19b624e 100644 --- a/ts/textsecure/WebAPI.ts +++ b/ts/textsecure/WebAPI.ts @@ -257,15 +257,10 @@ function getHostname(url: string): string { return urlObject.hostname; } -type FetchOptionsType = { - method: string; - body?: Uint8Array | Readable | string; - headers: FetchHeaderListType; - redirect?: 'error' | 'follow' | 'manual'; - agent?: Agent; +type FetchOptionsType = Omit & { + headers: Record; + // This is patch-packaged ca?: string; - timeout?: number; - abortSignal?: AbortSignal; }; async function getFetchOptions( @@ -297,7 +292,7 @@ async function getFetchOptions( const agentEntry = agents[cacheKey]; const agent = agentEntry?.agent ?? null; - const fetchOptions = { + const fetchOptions: FetchOptionsType = { method: options.type, body: typeof options.data === 'function' ? options.data() : options.data, headers: { @@ -309,7 +304,7 @@ async function getFetchOptions( agent, ca: options.certificateAuthority, timeout, - abortSignal: options.abortSignal, + signal: options.abortSignal, }; if (options.contentType) { @@ -364,7 +359,6 @@ async function _promiseAjax( response = socketManager ? await socketManager.fetch(url, fetchOptions) : await fetch(url, fetchOptions); - if ( options.serverUrl && getHostname(options.serverUrl) === getHostname(url) @@ -414,7 +408,6 @@ async function _promiseAjax( options.stack ); } - if ( options.responseType === 'json' || options.responseType === 'jsonwithdetails' @@ -433,6 +426,17 @@ async function _promiseAjax( } } + if (options.responseType === 'stream') { + log.info(logId, response.status, 'Streaming'); + response.body.on('error', e => { + log.info(logId, 'Errored while streaming:', e.message); + }); + response.body.on('end', () => { + log.info(logId, response.status, 'Streaming ended'); + }); + return result; + } + log.info(logId, response.status, 'Success'); if (options.responseType === 'byteswithdetails') { @@ -3482,19 +3486,38 @@ export function initialize({ }): Promise { const abortController = new AbortController(); const cdnUrl = cdnUrlObject[cdnNumber] ?? cdnUrlObject['0']; + + let downloadStream: Readable | undefined; + + const cancelRequest = () => { + abortController.abort(); + }; + + registerInflightRequest(cancelRequest); + // This is going to the CDN, not the service, so we use _outerAjax - const downloadStream = await _outerAjax(`${cdnUrl}${cdnPath}`, { - headers, - certificateAuthority, - disableRetries: options?.disableRetries, - proxyUrl, - responseType: 'stream', - timeout: options?.timeout || 0, - type: 'GET', - redactUrl: redactor, - version, - abortSignal: abortController.signal, - }); + try { + downloadStream = await _outerAjax(`${cdnUrl}${cdnPath}`, { + headers, + certificateAuthority, + disableRetries: options?.disableRetries, + proxyUrl, + responseType: 'stream', + timeout: options?.timeout || 0, + type: 'GET', + redactUrl: redactor, + version, + abortSignal: abortController.signal, + }); + } finally { + if (!downloadStream) { + unregisterInFlightRequest(cancelRequest); + } else { + downloadStream.on('close', () => { + unregisterInFlightRequest(cancelRequest); + }); + } + } const timeoutStream = getTimeoutStream({ name: `getAttachment(${redactor(cdnPath)})`, @@ -3509,16 +3532,6 @@ export function initialize({ }) .pipe(timeoutStream); - const cancelRequest = (error: Error) => { - combinedStream.emit('error', error); - abortController.abort(); - }; - registerInflightRequest(cancelRequest); - - combinedStream.on('done', () => { - unregisterInFlightRequest(cancelRequest); - }); - return combinedStream; }