Skip to content

Commit

Permalink
perf: speed up clean operation
Browse files Browse the repository at this point in the history
The clean operation on sets backed by lists (wait, active, paused)
quickly gets very slow when the list is large. This is because each job
deletion scans the whole list in a LREM call, resulting in O(N * M)
complexity where N is the number of jobs in the list and M is the number
of jobs to delete.

With this change, the deletion is done in two passes. The first pass
sets each item that should be deleted to a special value. The second
pass deletes all items with that special value in a single LREM call.
This results in O(N) complexity.
  • Loading branch information
emcsween committed Mar 18, 2022
1 parent f163022 commit 85d803b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
17 changes: 15 additions & 2 deletions lib/commands/cleanJobsInSet-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ local jobIds = rcall(command, setKey, rangeStart, rangeEnd)
local deleted = {}
local deletedCount = 0
local jobTS
local deletionMarker

-- Run this loop:
-- - Once, if limit is -1 or 0
-- - As many times as needed if limit is positive
while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
for _, jobId in ipairs(jobIds) do
local jobIdsLen = #jobIds
for i, jobId in ipairs(jobIds) do
if limit > 0 and deletedCount >= limit then
break
end
Expand All @@ -63,7 +65,14 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
jobTS = rcall("HGET", jobKey, "timestamp")
if (not jobTS or jobTS < maxTimestamp) then
if isList then
rcall("LREM", setKey, 0, jobId)
if deletionMarker == nil then
-- we could use anything as a deletion marker; it's convenient to
-- use a job id that we know we're going to delete
deletionMarker = jobId
end
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", setKey, rangeEnd - jobIdsLen + i, deletionMarker)
else
rcall("ZREM", setKey, jobId)
end
Expand Down Expand Up @@ -101,4 +110,8 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
end
end

if isList and deletionMarker ~= nil then
rcall("LREM", setKey, 0, deletionMarker)
end

return deleted
9 changes: 9 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2983,6 +2983,15 @@ describe('Queue', () => {
});
});

it('should succeed when the limit is higher than the actual number of jobs', async () => {
await queue.add({ some: 'data' });
await queue.add({ some: 'data' });
const deletedJobs = await queue.clean(0, 'wait', 100);
expect(deletedJobs).to.have.length(2);
const remainingJobsCount = await queue.count();
expect(remainingJobsCount).to.be.eql(0);
});

it("should clean the number of jobs requested even if first jobs timestamp doesn't match", async () => {
// This job shouldn't get deleted due to the 5000 grace
await queue.add({ some: 'data' });
Expand Down

0 comments on commit 85d803b

Please sign in to comment.