Skip to content

Commit

Permalink
fix(name-processors): wait for all processors when closing fixes #1618
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Oct 13, 2021
1 parent c2194fe commit 79ce013
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
14 changes: 8 additions & 6 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,12 @@ Queue.prototype.process = function(name, concurrency, handler) {
this.setHandler(name, handler);

return this._initProcess().then(() => {
return this.start(concurrency);
return this.start(concurrency, name);
});
};

Queue.prototype.start = function(concurrency) {
return this.run(concurrency).catch(err => {
Queue.prototype.start = function(concurrency, name) {
return this.run(concurrency, name).catch(err => {
this.emit('error', err, 'error running queue');
throw err;
});
Expand Down Expand Up @@ -860,7 +860,7 @@ Queue.prototype.isPaused = async function(isLocal) {
}
};

Queue.prototype.run = function(concurrency) {
Queue.prototype.run = function(concurrency, handlerName) {
const promises = [];

return this.isReady()
Expand All @@ -874,7 +874,7 @@ Queue.prototype.run = function(concurrency) {
while (concurrency--) {
promises.push(
new Promise(resolve => {
this.processJobs(concurrency, resolve);
this.processJobs(`${handlerName}:${concurrency}`, resolve);
})
);
}
Expand Down Expand Up @@ -1279,7 +1279,9 @@ Queue.prototype.whenCurrentJobsFinished = function() {
return this.bclient.connect();
});

return Promise.all(this.processing).then(() => forcedReconnection);
return Promise.all(Object.values(this.processing)).then(
() => forcedReconnection
);
};

//
Expand Down
28 changes: 28 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,34 @@ describe('Queue', () => {
});
});

it('should wait for all jobs when closing queue with named processors', async () => {
let processedA = false;

const startProcessing = new Promise(resolve => {
queue.process('jobA', async () => {
resolve();
return new Promise(resolve => {
setTimeout(() => {
processedA = true;
resolve();
}, 500);
});
});
});

queue.process('jobB', async () => {});

queue.add('jobA', {});

await startProcessing;

expect(processedA).to.be.eq(false);

await queue.close();

expect(processedA).to.be.eq(true);
});

it('processes several stalled jobs when starting several queues', function(done) {
this.timeout(50000);

Expand Down

0 comments on commit 79ce013

Please sign in to comment.