From 63636b181d0a166b1702059e5abb53ce1589d90f Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Mon, 20 May 2024 22:58:34 -0600 Subject: [PATCH] fix(scripts): throw error when moving non-active job to delayed (#2740) --- lib/commands/includes/removeLock.lua | 19 +++++++++++++++++++ lib/commands/moveToDelayed-4.lua | 19 +++++++++---------- lib/job.js | 2 +- lib/scripts.js | 8 ++++++-- test/test_job.js | 11 +++++++++-- 5 files changed, 44 insertions(+), 15 deletions(-) create mode 100644 lib/commands/includes/removeLock.lua diff --git a/lib/commands/includes/removeLock.lua b/lib/commands/includes/removeLock.lua new file mode 100644 index 000000000..d7335f635 --- /dev/null +++ b/lib/commands/includes/removeLock.lua @@ -0,0 +1,19 @@ +local function removeLock(jobKey, stalledKey, token, jobId) + if token ~= "0" then + local lockKey = jobKey .. ':lock' + local lockToken = rcall("GET", lockKey) + if lockToken == token then + rcall("DEL", lockKey) + rcall("SREM", stalledKey, jobId) + else + if lockToken then + -- Lock exists but token does not match + return -6 + else + -- Lock is missing completely + return -2 + end + end + end + return 0 +end diff --git a/lib/commands/moveToDelayed-4.lua b/lib/commands/moveToDelayed-4.lua index bc8e09b51..f7c57d99e 100644 --- a/lib/commands/moveToDelayed-4.lua +++ b/lib/commands/moveToDelayed-4.lua @@ -21,22 +21,21 @@ ]] local rcall = redis.call +-- Includes +--- @include "includes/removeLock" + if rcall("EXISTS", KEYS[3]) == 1 then - -- Check for job lock - if ARGV[3] ~= "0" then - local lockKey = KEYS[3] .. ':lock' - if rcall("GET", lockKey) == ARGV[3] then - rcall("DEL", lockKey) - rcall("SREM", KEYS[4], ARGV[2]) - else - return -2 - end + local errorCode = removeLock(KEYS[3], KEYS[4], ARGV[3], ARGV[2]) + if errorCode < 0 then + return errorCode end + local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[2]) + if numRemovedElements < 1 then return -3 end + local score = tonumber(ARGV[1]) rcall("ZADD", KEYS[2], score, ARGV[2]) rcall("PUBLISH", KEYS[2], (score / 0x1000)) - rcall("LREM", KEYS[1], 0, ARGV[2]) return 0 else diff --git a/lib/job.js b/lib/job.js index 8bf21075a..18b7e03dc 100644 --- a/lib/job.js +++ b/lib/job.js @@ -339,7 +339,7 @@ Job.prototype.moveToFailed = async function(err, ignoreLock) { const results = await multi.exec(); const code = _.last(results)[1]; if (code < 0) { - throw scripts.finishedErrors(code, this.id, command); + throw scripts.finishedErrors(code, this.id, command, 'active'); } }; diff --git a/lib/scripts.js b/lib/scripts.js index 714a62cd5..3436626f8 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -238,7 +238,7 @@ const scripts = { ); return job.queue.client.moveToFinished(args).then(result => { if (result < 0) { - throw scripts.finishedErrors(result, job.id, 'finished'); + throw scripts.finishedErrors(result, job.id, 'finished', 'active'); } else if (result) { return raw2jobData(result); } @@ -246,12 +246,16 @@ const scripts = { }); }, - finishedErrors(code, jobId, command) { + finishedErrors(code, jobId, command, state) { switch (code) { case -1: return new Error('Missing key for job ' + jobId + ' ' + command); case -2: return new Error('Missing lock for job ' + jobId + ' ' + command); + case -3: + return new Error(`Job ${jobId} is not in the ${state} state. ${command}`); + case -6: + return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`); } }, diff --git a/test/test_job.js b/test/test_job.js index ec538fd9e..76cb729dc 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -726,6 +726,9 @@ describe('Job', () => { .then(isFailed => { expect(isFailed).to.be(false); }) + .then(() => { + return scripts.moveToActive(queue); + }) .then(() => { return job.moveToFailed(new Error('test error'), true); }) @@ -893,7 +896,9 @@ describe('Job', () => { }) .then(state => { expect(state).to.be('completed'); - return client.zrem(queue.toKey('completed'), job.id); + return client.zrem(queue.toKey('completed'), job.id).then(()=>{ + return client.lpush(queue.toKey('active'), job.id) + }); }) .then(() => { return job.moveToDelayed(Date.now() + 10000, true); @@ -907,7 +912,9 @@ describe('Job', () => { }) .then(state => { expect(state).to.be('delayed'); - return client.zrem(queue.toKey('delayed'), job.id); + return client.zrem(queue.toKey('delayed'), job.id).then(()=>{ + return client.lpush(queue.toKey('active'), job.id) + }); }) .then(() => { return job.moveToFailed(new Error('test'), true);