Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow custom job ids to be specified in the job options #335

Merged
merged 10 commits into from
Aug 22, 2016
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ __Arguments__
of the queue instead of the left (default false)
opts.timeout {Number} The number of milliseconds after which the job
should be fail with a timeout error [optional]
opts.jobId {Number|String} Override the job ID - by default, the job ID is a
unique integer, but you can use this setting to override it. If you use this option,
it is up to you to ensure the jobId is unique. If you attempt to add a job with an
id that already exists, it will not be added.
returns {Promise} A promise that resolves when the job has been succesfully
added to the queue (or rejects if some error occured). On success, the promise
resolves to the new Job.
Expand Down
4 changes: 2 additions & 2 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var Job = function(queue, data, opts){
function addJob(queue, job){
var jobData = job.toData();
var toKey = _.bind(queue.toKey, queue);
return scripts.addJob(queue.client, toKey, job.opts.lifo, jobData);
return scripts.addJob(queue.client, toKey, jobData, { lifo: job.opts.lifo, customJobId: job.opts.jobId });
}

Job.create = function(queue, data, opts){
Expand All @@ -65,7 +65,7 @@ Job.fromId = function(queue, jobId){
}
return queue.client.hgetallAsync(queue.toKey(jobId)).then(function(jobData){
if(jobData){
return Job.fromData(queue, +jobId, jobData);
return Job.fromData(queue, jobId, jobData);
}else{
return jobData;
}
Expand Down
22 changes: 15 additions & 7 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ var scripts = {
return result === 1;
});
},
addJob: function(client, toKey, lifo, job){
addJob: function(client, toKey, job, opts){
opts = opts || {};
opts.lifo = !!(opts.lifo);

var jobArgs = _.flatten(_.toPairs(job));
Copy link
Member

@manast manast Aug 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since lifo is optional and customId also, I think it is more convenient to put it in an opts argument, and also move it to the last argument in the list:function(client, toKey, job, opts)


var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed'], function(name){
Expand All @@ -74,28 +77,32 @@ var scripts = {
var baseKey = toKey('');

var argvs = _.map(jobArgs, function(arg, index){
return ', ARGV['+(index+2)+']';
return ', ARGV['+(index+3)+']';
})

var script = [
'local jobId = redis.call("INCR", KEYS[5])',
'redis.call("HMSET", ARGV[1] .. jobId' + argvs.join('') + ')',
'local jobCounter = redis.call("INCR", KEYS[5])',
'local jobId',
'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end',
'local jobIdKey = ARGV[1] .. jobId',
'if redis.call("EXISTS", jobIdKey) == 1 then return jobId end',
'redis.call("HMSET", jobIdKey' + argvs.join('') + ')',
];

var scriptName;

var delayTimestamp = job.timestamp + job.delay;
if(job.delay && delayTimestamp > Date.now()){
script.push.apply(script, [
' local timestamp = tonumber(ARGV[' + (argvs.length + 2) + ']) * 0x1000 + bit.band(jobId, 0xfff)',
' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)',
' redis.call("ZADD", KEYS[6], timestamp, jobId)',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch since we use the jobCounter for the delay logic.

' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))',
' return jobId',
]);

scriptName = 'addJob:delayed';
}else{
var push = (lifo ? 'R' : 'L') + 'PUSH';
var push = (opts.lifo ? 'R' : 'L') + 'PUSH';
//
// Whe check for the meta-paused key to decide if we are paused or not
// (since an empty list and !EXISTS are not really the same)
Expand All @@ -106,7 +113,7 @@ var scripts = {
' redis.call("' + push + '", KEYS[2], jobId)',
'end',
'redis.call("PUBLISH", KEYS[4], jobId)',
'return jobId'
'return jobId .. ""'
]);

scriptName = 'addJob'+push;
Expand All @@ -121,6 +128,7 @@ var scripts = {

args.push.apply(args, keys);
args.push(baseKey);
args.push(opts.customJobId || '');
args.push.apply(args, jobArgs);
args.push(delayTimestamp);

Expand Down
22 changes: 22 additions & 0 deletions test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,28 @@ describe('Job', function(){
expect(storedJob.opts.testOpt).to.be('enabled');
});
});

it('should use the custom jobId if one is provided', function() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test case suggestion:

Create a job with a customId and test if it can be processed, in the process callback, verify that the custom jobId matches the one provided

var customJobId = 'customjob';
return Job.create(queue, data, { jobId: customJobId }).then(function(createdJob){
expect(createdJob.jobId).to.be.equal(customJobId);
});
});

it('should process jobs with custom jobIds', function(done) {
var customJobId = 'customjob';
queue.process(function () {
return Promise.resolve();
});

queue.add({ foo: 'bar' }, { jobId: customJobId });

queue.on('completed', function(job) {
if (job.opts.jobId == customJobId) {
done();
}
});
});
});

describe('.remove', function () {
Expand Down