Skip to content

Commit

Permalink
atomized the code for getting a stalled job
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jun 16, 2016
1 parent dbacc69 commit 5429778
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
30 changes: 13 additions & 17 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -502,30 +502,18 @@ Queue.prototype.processStalledJob = function(job){
if(!job){
return Promise.resolve();
}else{
// TODO: Make taking/releasing lock and sismemberAsync an atomic operation.
return job.takeLock(_this.token).then(function(lock){
if(lock){
var key = _this.toKey('completed');
if(_this.closing){
return _this.closing;
}
return _this.client.sismemberAsync(key, job.jobId).then(function(isMember){
if(!isMember){
_this.emit('stalled', job);
return _this.processJob(job, true);
} else {
return job.releaseLock(_this.token);
}
}).catch(function(err) {
scripts.getStalledJob(this, job, _this.token).then(function(isStalled){
if(isStalled){
_this.emit('stalled', job);
return _this.processJob(job, true).catch(function(err) {
// Any uncaught error will come here. We'll ensure that the job lock is released and rethrow
// the error so it bubbles normally
if(_this.closing){
return _this.closing;
}

job.releaseLock(_this.token);
throw err;
});
})
}
});
}
Expand Down Expand Up @@ -558,6 +546,14 @@ Queue.prototype.processJob = function(job, renew){
return Promise.resolve();
}

//
// TODO:
// There are two cases to take into consideration regarding locks.
// 1) The lock renewer fails to renew a lock, this should make this job
// unable to complete, since some other worker is also working on it.
// 2) The lock renewer is called more seldom than the check for stalled
// jobs, so we can assume the job has been stalled and is already being processed
// by another worker. See #308
var lockRenewer = function(){
lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME / 2, lockRenewer);
return job.takeLock(_this.token, renew).catch(function(err){
Expand Down
28 changes: 27 additions & 1 deletion lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,33 @@ var scripts = {
];

return execScript.apply(scripts, args);
}
},

/**
* Gets a stalled job by locking it and checking it is not already completed.
* Returns a "OK" if the job was locked and not in completed set.
*/
getStalledJob: function(queue, job, token){
var script = [
'if redis.call("sismember", KEYS[1], ARGV[1]) then',
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
'end',
'return 0'].join('\n');

var args = [
queue.client,
'getStalledJob',
script,
2,
queue.toKey('completed'),
job.lockKey(),
job.jobId,
token,
queue.LOCK_RENEW_TIME
];

return execScript.apply(scripts, args);
},
};

/*
Expand Down

0 comments on commit 5429778

Please sign in to comment.