diff --git a/lib/commands/addJob-6.lua b/lib/commands/addJob-6.lua index 9288f0b67..dda504810 100644 --- a/lib/commands/addJob-6.lua +++ b/lib/commands/addJob-6.lua @@ -38,6 +38,10 @@ local jobId local jobIdKey local rcall = redis.call +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + local jobCounter = rcall("INCR", KEYS[4]) if ARGV[2] == "" then @@ -67,14 +71,7 @@ else -- Whe check for the meta-paused key to decide if we are paused or not -- (since an empty list and !EXISTS are not really the same) - local paused - if rcall("EXISTS", KEYS[3]) ~= 1 then - target = KEYS[1] - paused = false - else - target = KEYS[2] - paused = true - end + local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) -- Standard or priority add local priority = tonumber(ARGV[9]) @@ -82,18 +79,7 @@ else -- LIFO or FIFO rcall(ARGV[10], target, jobId) else - -- Priority add - rcall("ZADD", KEYS[6], priority, jobId) - local count = rcall("ZCOUNT", KEYS[6], 0, priority) - - local len = rcall("LLEN", target) - local id = rcall("LINDEX", target, len - (count-1)) - if id then - rcall("LINSERT", target, "BEFORE", id, jobId) - else - rcall("RPUSH", target, jobId) - end - + addJobWithPriority(KEYS[6], priority, jobId, target) end -- Emit waiting event (wait..ing@token) diff --git a/lib/commands/includes/batches.lua b/lib/commands/includes/batches.lua new file mode 100644 index 000000000..b8e6d4806 --- /dev/null +++ b/lib/commands/includes/batches.lua @@ -0,0 +1,18 @@ +--[[ + Function to loop in batches. + Just a bit of warning, some commands as ZREM + could receive a maximum of 7000 parameters per call. +]] + +local function batches(n, batchSize) + local i = 0 + + return function() + local from = i * batchSize + 1 + i = i + 1 + if (from <= n) then + local to = math.min(from + batchSize - 1, n) + return from, to + end + end +end diff --git a/lib/commands/moveStalledJobsToWait-7.lua b/lib/commands/moveStalledJobsToWait-7.lua index 672020705..651cb105d 100644 --- a/lib/commands/moveStalledJobsToWait-7.lua +++ b/lib/commands/moveStalledJobsToWait-7.lua @@ -22,6 +22,10 @@ local rcall = redis.call +-- Includes +--- @include "includes/batches" +--- @include "includes/getTargetQueueList" + local function removeJob(jobId, baseKey) local jobKey = baseKey .. jobId rcall("DEL", jobKey, jobKey .. ':logs') @@ -45,19 +49,6 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix) rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) end -local function batches(n, batchSize) - local i = 0 - - return function() - local from = i * batchSize + 1 - i = i + 1 - if (from <= n) then - local to = math.min(from + batchSize - 1, n) - return from, to - end - end -end - -- Check if we need to check for stalled jobs now. if rcall("EXISTS", KEYS[5]) == 1 then return {{}, {}} @@ -70,15 +61,6 @@ local stalling = rcall('SMEMBERS', KEYS[1]) local stalled = {} local failed = {} if(#stalling > 0) then - - local dst - -- wait or paused destination - if rcall("EXISTS", KEYS[6]) ~= 1 then - dst = KEYS[2] - else - dst = KEYS[7] - end - rcall('DEL', KEYS[1]) local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1]) @@ -129,8 +111,10 @@ if(#stalling > 0) then table.insert(failed, jobId) else + local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[7]) + -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. - rcall("RPUSH", dst, jobId) + rcall("RPUSH", target, jobId) rcall('PUBLISH', KEYS[1] .. '@', jobId) table.insert(stalled, jobId) end diff --git a/lib/commands/moveToFinished-9.lua b/lib/commands/moveToFinished-9.lua index d7e5ba436..49d37e82e 100644 --- a/lib/commands/moveToFinished-9.lua +++ b/lib/commands/moveToFinished-9.lua @@ -92,8 +92,10 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists end end - -- Remove from active list - rcall("LREM", KEYS[1], -1, ARGV[1]) + -- Remove from active list (if not active we shall return error) + local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[1]) + + if numRemovedElements < 1 then return -3 end -- Remove job? local keepJobs = cmsgpack.unpack(ARGV[6]) diff --git a/lib/commands/retryJobs-5.lua b/lib/commands/retryJobs-5.lua index 75f7d5ba9..e74faf426 100644 --- a/lib/commands/retryJobs-5.lua +++ b/lib/commands/retryJobs-5.lua @@ -19,18 +19,8 @@ local maxCount = tonumber(ARGV[1]) local rcall = redis.call; -local function batches(n, batchSize) - local i = 0 - - return function() - local from = i * batchSize + 1 - i = i + 1 - if (from <= n) then - local to = math.min(from + batchSize - 1, n) - return from, to - end - end -end +-- Includes +--- @include "includes/batches" local function getZSetItems(keyName, max) return rcall('ZRANGE', keyName, 0, max - 1) diff --git a/lib/commands/updateDelaySet-6.lua b/lib/commands/updateDelaySet-6.lua index 7fb0e152f..486ea86b6 100644 --- a/lib/commands/updateDelaySet-6.lua +++ b/lib/commands/updateDelaySet-6.lua @@ -20,6 +20,10 @@ ]] local rcall = redis.call; +-- Includes +--- @include "includes/addJobWithPriority" +--- @include "includes/getTargetQueueList" + -- Try to get as much as 1000 jobs at once local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1000) @@ -27,12 +31,7 @@ if(#jobs > 0) then rcall("ZREM", KEYS[1], unpack(jobs)) -- check if we need to use push in paused instead of waiting - local target; - if rcall("EXISTS", KEYS[6]) ~= 1 then - target = KEYS[3] - else - target = KEYS[5] - end + local target = getTargetQueueList(KEYS[6], KEYS[3], KEYS[5]) for _, jobId in ipairs(jobs) do -- Is this really needed? @@ -44,17 +43,7 @@ if(#jobs > 0) then -- LIFO or FIFO rcall("LPUSH", target, jobId) else - -- Priority add - rcall("ZADD", KEYS[4], priority, jobId) - local count = rcall("ZCOUNT", KEYS[4], 0, priority) - - local len = rcall("LLEN", target) - local id = rcall("LINDEX", target, len - (count-1)) - if id then - rcall("LINSERT", target, "BEFORE", id, jobId) - else - rcall("RPUSH", target, jobId) - end + addJobWithPriority(KEYS[4], priority, jobId, target) end -- Emit waiting event (wait..ing@token) diff --git a/test/test_job.js b/test/test_job.js index 76cb729dc..8b0210ce4 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -608,12 +608,15 @@ describe('Job', () => { describe('.moveToCompleted', () => { it('marks the job as completed and returns new job', () => { return Job.create(queue, { foo: 'bar' }).then(job1 => { - return Job.create(queue, { foo: 'bar' }).then(job2 => { + return Job.create(queue, { foo: 'bar' }, { lifo: true }).then(job2 => { return job2 .isCompleted() .then(isCompleted => { expect(isCompleted).to.be(false); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job2.moveToCompleted('succeeded', true); }) @@ -637,6 +640,9 @@ describe('Job', () => { .then(isFailed => { expect(isFailed).to.be(false); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job.moveToFailed(new Error('test error'), true); }) @@ -702,6 +708,9 @@ describe('Job', () => { .then(isFailed => { expect(isFailed).to.be(false); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job.moveToFailed(new Error('test error'), true); }) @@ -747,16 +756,22 @@ describe('Job', () => { it('applies stacktrace limit on failure', () => { const stackTraceLimit = 1; - return Job.create(queue, { foo: 'bar' }, { stackTraceLimit }).then( + return Job.create(queue, { foo: 'bar' }, { stackTraceLimit, attempts: 2 }).then( job => { return job .isFailed() .then(isFailed => { expect(isFailed).to.be(false); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job.moveToFailed(new Error('test error'), true); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job .moveToFailed(new Error('test error'), true)