diff --git a/lib/queue.js b/lib/queue.js
index e7b98ed36..bc4db4348 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -502,30 +502,18 @@ Queue.prototype.processStalledJob = function(job){
   if(!job){
     return Promise.resolve();
   }else{
-    // TODO: Make taking/releasing lock and sismemberAsync an atomic operation.
-    return job.takeLock(_this.token).then(function(lock){
-      if(lock){
-        var key = _this.toKey('completed');
-        if(_this.closing){
-          return _this.closing;
-        }
-        return _this.client.sismemberAsync(key, job.jobId).then(function(isMember){
-          if(!isMember){
-            _this.emit('stalled', job);
-            return _this.processJob(job, true);
-          } else {
-            return job.releaseLock(_this.token);
-          }
-        }).catch(function(err) {
-          // Any uncaught error will come here. We'll ensure that the job lock is released and rethrow
-          // the error so it bubbles normally
+    return scripts.getStalledJob(_this, job, _this.token).then(function(isStalled){
+      if(isStalled){
+        _this.emit('stalled', job);
+        return _this.processJob(job, true).catch(function(err) {
+          // Any uncaught error will come here. Rethrow it so it bubbles normally,
+          // unless the queue is closing, in which case _this.closing is returned.
           if(_this.closing){
             return _this.closing;
           }
 
-          job.releaseLock(_this.token);
           throw err;
         });
       }
     });
   }
@@ -558,6 +546,14 @@ Queue.prototype.processJob = function(job, renew){
     return Promise.resolve();
   }
 
+  //
+  // TODO:
+  // There are two cases to take into consideration regarding locks.
+  // 1) The lock renewer fails to renew a lock, this should make this job
+  //    unable to complete, since some other worker is also working on it.
+  // 2) The lock renewer is called more seldom than the check for stalled
+  //    jobs, so we can assume the job has been stalled and is already being processed
+  //    by another worker. See #308
   var lockRenewer = function(){
     lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME / 2, lockRenewer);
     return job.takeLock(_this.token, renew).catch(function(err){
diff --git a/lib/scripts.js b/lib/scripts.js
index 0d338f945..4052d2850 100644
--- a/lib/scripts.js
+++ b/lib/scripts.js
@@ -344,7 +344,43 @@ var scripts = {
     ];
 
     return execScript.apply(scripts, args);
-  }
+  },
+
+  /**
+   * Atomically locks a possibly stalled job, unless the job is already in the
+   * completed set (both steps run inside a single Lua script).
+   *
+   * @param queue Queue whose `client`, `toKey()` and `LOCK_RENEW_TIME` are used.
+   * @param job Job to lock; its `lockKey()` and `jobId` are used.
+   * @param token Value stored under the lock key; the lock expires after
+   *   `queue.LOCK_RENEW_TIME` milliseconds.
+   * @returns The result of `execScript` for this script. The script replies "OK"
+   *   when the lock was taken, 0 when the job is already completed, and a nil
+   *   reply when the lock key already exists.
+   */
+  getStalledJob: function(queue, job, token){
+    // In Lua only nil and false are falsy, so the integer reply of SISMEMBER has
+    // to be compared explicitly: 0 means the job is not in the completed set.
+    var script = [
+      'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then',
+      '  return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
+      'end',
+      'return 0'].join('\n');
+
+    var args = [
+      queue.client,
+      'getStalledJob',
+      script,
+      2,
+      queue.toKey('completed'),
+      job.lockKey(),
+      job.jobId,
+      token,
+      queue.LOCK_RENEW_TIME
+    ];
+
+    return execScript.apply(scripts, args);
+  },
 };
 
 /*