From dc0f331cfb8e44b0b5f452088b6cec8d0a117940 Mon Sep 17 00:00:00 2001 From: Auxane Date: Wed, 11 Oct 2023 18:06:06 +0200 Subject: [PATCH] fix(JingleSessionPC): call task callbacks upon clearing (#2370) This ensures that promises relying on the task callback will settle. --- modules/util/AsyncQueue.js | 20 +++++++++++++++++ modules/xmpp/JingleSessionPC.js | 22 ++++++++++++++++++- .../hand-crafted/modules/util/AsyncQueue.d.ts | 4 ++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/modules/util/AsyncQueue.js b/modules/util/AsyncQueue.js index b7883c3f50..60b19962cb 100644 --- a/modules/util/AsyncQueue.js +++ b/modules/util/AsyncQueue.js @@ -3,6 +3,19 @@ import { queue } from 'async-es'; const logger = getLogger(__filename); +/** + * Error to be passed to a callback of a queued task when the queue is cleared. + */ +export class ClearedQueueError extends Error { + /** + * Creates new instance. + */ + constructor(message) { + super(message); + this.name = 'ClearedQueueError'; + } +} + /** * A queue for async task execution. */ @@ -13,12 +26,16 @@ export default class AsyncQueue { constructor() { this._queue = queue(this._processQueueTasks.bind(this), 1); this._stopped = false; + this._taskCallbacks = new Map(); } /** * Removes any pending tasks from the queue. */ clear() { + for (const finishedCallback of this._taskCallbacks.values()) { + finishedCallback(new ClearedQueueError('The queue has been cleared')); + } this._queue.kill(); } @@ -31,6 +48,8 @@ export default class AsyncQueue { } catch (error) { logger.error(`Task failed: ${error?.stack}`); finishedCallback(error); + } finally { + this._taskCallbacks.delete(task); } } @@ -64,6 +83,7 @@ export default class AsyncQueue { return; } + this._taskCallbacks.set(task, callback); this._queue.push(task, callback); } diff --git a/modules/xmpp/JingleSessionPC.js b/modules/xmpp/JingleSessionPC.js index 84760970c2..9814ba8ee4 100644 --- a/modules/xmpp/JingleSessionPC.js +++ b/modules/xmpp/JingleSessionPC.js @@ -18,7 +18,7 @@ import SDP from '../sdp/SDP'; import SDPDiffer from '../sdp/SDPDiffer'; import SDPUtil from '../sdp/SDPUtil'; import Statistics from '../statistics/statistics'; -import AsyncQueue from '../util/AsyncQueue'; +import AsyncQueue, { ClearedQueueError } from '../util/AsyncQueue'; import GlobalOnErrorHandler from '../util/GlobalOnErrorHandler'; import browser from './../browser'; @@ -1321,6 +1321,11 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug(`${this} ICE restart task aborted: session terminated`); + success(); + } logger.error(`${this} ICE restart task failed: ${error}`); failure(error); } else { @@ -2194,6 +2199,11 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug(`${this} renegotiation after addTrack aborted: session terminated`); + resolve(); + } logger.error(`${this} renegotiation after addTrack error`, error); reject(error); } else { @@ -2306,6 +2316,11 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug('Replace track aborted: session terminated'); + resolve(); + } logger.error(`${this} Replace track error:`, error); reject(error); } else { @@ -2504,6 +2519,11 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug(`${this} ${operationName} aborted: session terminated`); + resolve(); + } logger.error(`${this} ${operationName} failed`); reject(error); } else { diff --git a/types/hand-crafted/modules/util/AsyncQueue.d.ts b/types/hand-crafted/modules/util/AsyncQueue.d.ts index ac45836be3..fd11370ea7 100644 --- a/types/hand-crafted/modules/util/AsyncQueue.d.ts +++ b/types/hand-crafted/modules/util/AsyncQueue.d.ts @@ -1,3 +1,7 @@ +export class ClearedQueueError extends Error { + constructor(); +} + export default class AsyncQueue { constructor(); push: ( task: ( callback: ( err?: Error ) => void ) => void, callback?: ( err: Error ) => void ) => void; // TODO: check this