Skip to content
This repository has been archived by the owner on Feb 7, 2023. It is now read-only.

Commit

Permalink
Merge in PR OptimalBits#358
Browse files Browse the repository at this point in the history
  • Loading branch information
bradvogel committed Oct 15, 2016
1 parent 1bfef20 commit fcab0d8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 94 deletions.
53 changes: 27 additions & 26 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions, queueOption
// Bind these methods to avoid constant rebinding and/or creating closures
// in processJobs etc.
this.processStalledJobs = this.processStalledJobs.bind(this);
this.processStalledJob = this.processStalledJob.bind(this);
this.getNextJob = this.getNextJob.bind(this);
this.processJobs = this.processJobs.bind(this);
this.processJob = this.processJob.bind(this);
Expand Down Expand Up @@ -489,44 +488,46 @@ Queue.prototype.updateDelayTimer = function(newDelayedTimestamp){

/**
* Process jobs that have been added to the active list but are not being
* processed properly.
* processed properly. This can happen due to a process crash in the middle
* of processing a job, leaving it in 'active' but without a job lock.
* Note that there is no way to know for _certain_ if a job is "stalled"
* (since the process that moved it to active might just be slow to get the
* lock on it), so this function takes a grace period parameter to ignore
* jobs that were just created.
*
* @param {Number?} limit Only process this many number of jobs. Greater than 1, otherwise -1
* @param {Number?} limit Only consider this number of jobs to process. Greater than 1, otherwise -1
* @param {Number?} grace Duration in milliseconds. Ignore jobs created since this many milliseconds ago.
* Defaults to LOCK_RENEW_TIME.
*/
Queue.prototype.processStalledJobs = function(limit){
Queue.prototype.processStalledJobs = function(limit, grace){
var _this = this;
limit = limit > 0 ? limit - 1 : -1;

if (_.isUndefined(limit)) limit = -1;
grace = grace || this.LOCK_RENEW_TIME;

if (limit === 0) return;

if(this.closing){
return this.closing;
} else{
return this.client.lrangeAsync(this.toKey('active'), 0, limit).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
});
return scripts.getStalledJob(this, this.token, limit, grace).then(function(jobId){
if (!jobId) return;

return Job.fromId(_this, jobId)
.then(function(job){
_this.emit('stalled', job);
return _this.processJob(job, true /* Renew the lock */);
})
.then(function(){
// Run recursively.
return _this.processStalledJobs(--limit, grace);
});
}).catch(function(err){
console.error(err);
});
}
};

Queue.prototype.processStalledJob = function(job){
var _this = this;
if(this.closing){
return this.closing;
}

if(!job){
return Promise.resolve();
}else{
return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){
if(isStalled){
_this.emit('stalled', job);
return _this.processJob(job, true);
}
});
}
};

Queue.prototype.processJobs = function(resolve, reject){
var _this = this;
Expand Down
84 changes: 16 additions & 68 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,24 +355,30 @@ var scripts = {
},

/**
* Gets a stalled job by locking it and checking it is not already completed.
* Returns a "OK" if the job was locked and not in completed set.
* Returns the next (possible) stalled job in the queue.
*/
getStalledJob: function(queue, job, token){
getStalledJob: function(queue, token, grace){
var script = [
'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then',
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' local jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(jobTS and jobTS < ARGV[1]) then',
' if(redis.call("SET", jobKey .. ":lock", ARGV[3], "PX", ARGV[4], "NX")) then',
' return job',
' end',
' end',
'end',
'return 0'].join('\n');
].join('\n');

var args = [
queue.client,
'getStalledJob',
script,
2,
queue.toKey('completed'),
job.lockKey(),
job.jobId,
1,
queue.toKey('active'),
Date.now() - grace,
queue.toKey(''),
token,
queue.LOCK_RENEW_TIME
];
Expand Down Expand Up @@ -451,64 +457,6 @@ var scripts = {
limit
];

return execScript.apply(scripts, args);
},

/**
* Attempts to retry a job
*
* @param {Job} job
*
* @return {Promise<Number>} Returns a promise that evaluates to a return code:
* 1 means the operation was a success
* 0 means the job does not exist
* -1 means the job is currently locked and can't be retried.
* -2 means the job was not found in the `failed` set
*/
retryJob: function(job) {
var push = (job.opts.lifo ? 'R' : 'L') + 'PUSH';

var script = [
'if (redis.call("EXISTS", KEYS[1]) == 1) then',
' if (redis.call("EXISTS", KEYS[2]) == 0) then',
' if (redis.call("SREM", KEYS[3], ARGV[1]) == 1) then',
' redis.call("' + push + '", KEYS[4], ARGV[1])',
' redis.call("PUBLISH", KEYS[5], ARGV[1])',
' return 1',
' else',
' return -2',
' end',
' else',
' return -1',
' end',
'else',
' return 0',
'end'
].join('\n');

var queue = job.queue;

var keys = [
queue.toKey(job.jobId),
queue.toKey(job.jobId) + ':lock',
queue.toKey('failed'),
queue.toKey('wait'),
queue.toKey('jobs')
];

var args = [
queue.client,
'retryJob',
script,
5,
keys[0],
keys[1],
keys[2],
keys[3],
keys[4],
job.jobId
];

return execScript.apply(scripts, args);
}
};
Expand Down

0 comments on commit fcab0d8

Please sign in to comment.