Skip to content

Commit

Permalink
fix(sandbox): broken processor files should fail jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Feb 6, 2022
1 parent 27ca17b commit dd0b853
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 19 deletions.
25 changes: 16 additions & 9 deletions lib/process/child-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ ChildPool.prototype.retain = function(processFile) {

child.on('exit', _this.remove.bind(_this, child));

return initChild(child, child.processFile).then(() => {
return child;
});
return initChild(child, child.processFile)
.then(() => {
return child;
})
.catch(err => {
this.remove(child);
throw new Error(err);
});
});
};

Expand Down Expand Up @@ -106,12 +111,14 @@ ChildPool.prototype.getAllFree = function() {
};

async function initChild(child, processFile) {
const onComplete = new Promise(resolve => {
const onComplete = new Promise((resolve, reject) => {
const onMessageHandler = msg => {
if (msg.cmd === 'init-complete') {
resolve();
child.off('message', onMessageHandler);
} else {
reject(msg.value);
}
child.off('message', onMessageHandler);
};
child.on('message', onMessageHandler);
});
Expand All @@ -122,11 +129,11 @@ async function initChild(child, processFile) {
await onComplete;
}
function ChildPoolSingleton(isSharedChildPool = false) {
if(isSharedChildPool === false) {
if (isSharedChildPool === false) {
return new ChildPool();
}
else if (
(!(this instanceof ChildPool) && ChildPoolSingleton.instance === undefined)
} else if (
!(this instanceof ChildPool) &&
ChildPoolSingleton.instance === undefined
) {
ChildPoolSingleton.instance = new ChildPool();
}
Expand Down
13 changes: 11 additions & 2 deletions lib/process/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ process.on('SIGINT', waitForCurrentJobAndExit);
process.on('message', msg => {
switch (msg.cmd) {
case 'init':
processor = require(msg.value);
try {
processor = require(msg.value);
} catch (err) {
status = 'Errored';
return process.send({
cmd: 'error',
value: `Error loading process file ${msg.value}. ${err.toString()}`
});
}

if (processor.default) {
// support es2015 module.
processor = processor.default;
Expand Down Expand Up @@ -119,7 +128,7 @@ process.on('uncaughtException', err => {
cmd: 'failed',
value: err
});

// An uncaughException leaves this process in a potentially undetermined state so
// we must exit
process.exit(-1);
Expand Down
3 changes: 2 additions & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ const Queue = function Queue(name, url, opts) {
retryProcessDelay: 5000,
drainDelay: 5,
backoffStrategies: {},
isSharedChildPool: false,
isSharedChildPool: false
});

this.settings.lockRenewTime =
Expand Down Expand Up @@ -285,6 +285,7 @@ function redisClientGetter(queue, options, initCallback) {
}
const clientOptions = _.assign({}, options.redis);
clientOptions.connectionName = this.clientName();

const client = (connections[type] = createClient(type, clientOptions));

const opts = client.options.redisOptions || client.options;
Expand Down
2 changes: 2 additions & 0 deletions test/fixtures/fixture_processor_broken.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
'use strict';
throw new Error('Broken file processor');
25 changes: 18 additions & 7 deletions test/test_sandboxed_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -462,27 +462,38 @@ describe('sandboxed process', () => {
]);

const processFile = __dirname + '/fixtures/fixture_processor.js';
queueA.process(processFile)
queueB.process(processFile)
queueA.process(processFile);
queueB.process(processFile);

await Promise.all([queueA.add(), queueB.add()]);


expect(queueA.childPool).to.be.eql(queueB.childPool);
});

it('should not share childPool across different queues if isSharedChildPool isn\'t specified', async () => {
it("should not share childPool across different queues if isSharedChildPool isn't specified", async () => {
const [queueA, queueB] = await Promise.all([
utils.newQueue('queueA', { settings: { isSharedChildPool: false } }),
utils.newQueue('queueB')
]);

const processFile = __dirname + '/fixtures/fixture_processor.js';
queueA.process(processFile)
queueB.process(processFile)
queueA.process(processFile);
queueB.process(processFile);

await Promise.all([queueA.add(), queueB.add()]);

expect(queueA.childPool).to.not.be.equal(queueB.childPool);
})
});

it('should fail if the process file is broken', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_broken.js';
queue.process(processFile);
await queue.add('test', {});

return new Promise(resolve => {
queue.on('failed', () => {
resolve();
});
});
});
});

0 comments on commit dd0b853

Please sign in to comment.