Skip to content

Commit

Permalink
fix(obliterate): obliterate many jobs fixes #2016
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Apr 23, 2021
1 parent 2bc0b71 commit 7a923b4
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 45 deletions.
88 changes: 44 additions & 44 deletions lib/commands/obliterate-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,42 @@
-- If the queue has currently active jobs then the script by default will return error,
-- however this behaviour can be overrided using the `force` option.
local maxCount = tonumber(ARGV[1])
local count = 0
local baseKey = KEYS[2]

local rcall = redis.call
local function getListItems(keyName)
return rcall('LRANGE', keyName, 0, -1)
local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end

local function getZSetItems(keyName)
return rcall('ZRANGE', keyName, 0, -1)
local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

local function getSetItems(keyName)
return rcall('SMEMBERS', keyName, 0, -1)
end

local function removeKeys(parentKey, keys)
local function removeJobs(parentKey, keys)
for i, key in ipairs(keys) do
if(count > maxCount) then
return true
end
rcall("DEL", baseKey .. key)
count = count + 1
end
rcall("DEL", parentKey)
return false
maxCount = maxCount - #keys
end

local function removeListJobs(keyName, max)
local jobs = getListItems(keyName, max)
removeJobs(keyName, jobs)
rcall("LTRIM", keyName, #jobs, -1)
end

local function removeZSetJobs(keyName, max)
local jobs = getZSetItems(keyName, max)
removeJobs(keyName, jobs)
if(#jobs > 0) then
rcall("ZREM", keyName, unpack(jobs))
end
end

local function removeLockKeys(keys)
for i, key in ipairs(keys) do
if(count > maxCount) then
return true
end
rcall("DEL", baseKey .. key .. ':lock')
count = count + 1
end
return false
end

-- 1) Check if paused, if not return with error.
Expand All @@ -63,52 +62,53 @@ end

-- 2) Check if there are active jobs, if there are and not "force" return error.
local activeKey = baseKey .. 'active'
local activeKeys = getListItems(activeKey)
if (#activeKeys > 0) then
local activeJobs = getListItems(activeKey, maxCount)
if (#activeJobs > 0) then
if(ARGV[2] == "") then
return -2 -- Error, ExistsActiveJobs
end
end

if(removeLockKeys(activeKeys)) then
return 1
end

if(removeKeys(activeKey, activeKeys)) then
removeLockKeys(activeJobs)
removeJobs(activeKey, activeJobs)
rcall("LTRIM", activeKey, #activeJobs, -1)
if(maxCount <= 0) then
return 1
end

local waitKey = baseKey .. 'paused'
if(removeKeys(waitKey, getListItems(waitKey))) then
removeListJobs(waitKey, maxCount)
if(maxCount <= 0) then
return 1
end

local delayedKey = baseKey .. 'delayed'
if(removeKeys(delayedKey, getZSetItems(delayedKey))) then
removeZSetJobs(delayedKey, maxCount)
if(maxCount <= 0) then
return 1
end

local completedKey = baseKey .. 'completed'
if(removeKeys(completedKey, getZSetItems(completedKey))) then
removeZSetJobs(completedKey, maxCount)
if(maxCount <= 0) then
return 1
end

local failedKey = baseKey .. 'failed'
if(removeKeys(failedKey, getZSetItems(failedKey))) then
removeZSetJobs(failedKey, maxCount)
if(maxCount <= 0) then
return 1
end

local waitKey = baseKey .. 'wait'
if(removeKeys(waitKey, getListItems(waitKey))) then
if(maxCount > 0) then
rcall("DEL", baseKey .. 'priority')
rcall("DEL", baseKey .. 'stalled-check')
rcall("DEL", baseKey .. 'stalled')
rcall("DEL", baseKey .. 'meta-paused')
rcall("DEL", baseKey .. 'meta')
rcall("DEL", baseKey .. 'id')
rcall("DEL", baseKey .. 'repeat')
return 0
else
return 1
end

rcall("DEL", baseKey .. 'priority')
rcall("DEL", baseKey .. 'stalled-check')
rcall("DEL", baseKey .. 'stalled')
rcall("DEL", baseKey .. 'meta-paused')
rcall("DEL", baseKey .. 'meta')
rcall("DEL", baseKey .. 'id')
rcall("DEL", baseKey .. 'repeat')

return 0
1 change: 1 addition & 0 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ const scripts = {
throw new Error('Cannot obliterate queue with active jobs');
}
}
return result;
});
}
};
Expand Down
47 changes: 46 additions & 1 deletion test/test_obliterate.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ describe('Obliterate', () => {
}
return delay(250);
});

await job.finished();

await queue.obliterate({ force: true });
const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}*`);
Expand All @@ -123,4 +123,49 @@ describe('Obliterate', () => {
const keys = await client.keys(`bull:${queue.name}:*`);
expect(keys.length).to.be.eql(0);
});

it('should obliterate a queue with high number of jobs in different statuses', async () => {
const arr1 = [];
for (let i = 0; i < 300; i++) {
arr1.push(queue.add({ foo: `barLoop${i}` }));
}

const [lastCompletedJob] = (await Promise.all(arr1)).splice(-1);

let fail = false;
queue.process(async () => {
if (fail) {
throw new Error('failed job');
}
});

await lastCompletedJob.finished();

fail = true;

const arr2 = [];
for (let i = 0; i < 300; i++) {
arr2.push(queue.add({ foo: `barLoop${i}` }));
}

const [lastFailedJob] = (await Promise.all(arr2)).splice(-1);

try {
await lastFailedJob.finished();
expect(true).to.be.equal(false);
} catch (err) {
expect(true).to.be.equal(true);
}

const arr3 = [];
for (let i = 0; i < 1623; i++) {
arr3.push(queue.add({ foo: `barLoop${i}` }, { delay: 10000 }));
}
await Promise.all(arr3);

await queue.obliterate();
const client = await queue.client;
const keys = await client.keys(`bull:${queue.name}*`);
expect(keys.length).to.be.eql(0);
}).timeout(20000);
});

0 comments on commit 7a923b4

Please sign in to comment.