
Rewrite the handling of process stalled jobs to be atomic so it doesn… #359

Merged · 16 commits · Oct 26, 2016

Conversation

bradvogel (Contributor) commented Oct 15, 2016

…'t double-process jobs. See #356 for more on how this can happen.

Additionally, this addresses a long-standing issue where a job can be considered "stalled" even though it was just moved to active and the worker that moved it has not yet obtained a lock (#258). It does so by waiting a grace period (defaulting to LOCK_RENEW_TIME) before considering a job as possibly stalled, which gives the (real) worker time to acquire its lock after moving the job to active.


Note that this includes a small API change: the 'stalled' event is now passed an array of jobs.
return Job.fromId(_this, jobId).then(_this.processStalledJob);
});
return scripts.processStalledJobs(this, grace).then(function(jobs){
_this.emit('stalled', jobs);
Member:

Maybe we should only emit this event if jobs.length > 0
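A minimal sketch of that guard, reusing the variable names from the quoted snippet above (the surrounding handler is assumed, not shown in full):

return scripts.processStalledJobs(this, grace).then(function(jobs){
  // Only notify listeners when at least one job actually stalled.
  if (jobs.length > 0) {
    _this.emit('stalled', jobs);
  }
});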

' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' local jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(jobTS and jobTS < ARGV[1]) then',
Member:

I am not sure about this. Isn't timestamp the time when the job was inserted into the queue? Wouldn't the timestamp that we are interested in be the one that represents the moment in time when the job was moved to active? (I see a chicken-and-egg problem here already, unless I am missing something...)

Contributor Author:

Yes, you're right. This was only a hack to cover the majority case: where jobs were just created and are about to be processed by the worker that moved them.

manast (Member) commented Oct 15, 2016

I did a brief code review and also some thinking about this issue. Firstly, I think we need to properly define what a "stalled" job is. For me it is a job that has started to be processed, but for some reason the worker did not manage to keep the lock until the job finally completed or failed.

So the question is this: does it matter whether the worker that moved the job from any queue to the active list is actually the one starting the job? If another worker was faster in starting the job from the active queue, then that job should not be considered stalled at all, since it has never been started. So what we need to take care of is that, independently of who moved the job to the active list, the job is only processed once, and that it is re-processed if it gets stalled.

We should probably rethink the way we process jobs and divide it into two separate parts: moving the job to active, and actually processing the job. It is just a happy "coincidence" that the worker that moves the job is often the one able to start it faster than anybody else, but if somebody else does it, that should also be fine. When a job starts and the lock has been taken, the worker could mark the job as started (this could be done atomically when getting the lock). If in the future that job is picked up by another worker because it is in active and not locked anymore, that worker can test whether the job has the "started" status flag on; if so, it knows the job was stalled and can emit the stalled event, which in this case will be completely right.
What do you think?
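A minimal sketch of that idea, written as a Lua script in the string-array style used elsewhere in this PR. The "started" field name and the key/argument layout are assumptions for illustration only:

// Atomically take the job lock and mark the job as started (sketch).
var takeLockAndMarkStarted = [
  'if(redis.call("SET", KEYS[1] .. ":lock", ARGV[1], "PX", ARGV[2], "NX")) then',
  '  redis.call("HSET", KEYS[1], "started", 1)',
  '  return 1',
  'end',
  'return 0'
].join('\n');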

bradvogel (Contributor, author) commented Oct 15, 2016

Yes, I agree that the definition of 'stalled' should be "has been started once". So there are two types of jobs this function should be concerned with:

  1. Jobs that another worker moves from wait to active, and then dies before it can get the job lock.
  2. Jobs that another worker moves from wait to active, gets the lock, and then it dies while processing the job.

I agree that we should only emit the 'stalled' event for type (2), and to distinguish between them I can add a started flag to the job data. I'd also like to continue the strategy in this PR of putting type (2) jobs back in the wait queue so other workers can share the load of processing them (our real-world use case is that when a server dies, it leaves 10k+ jobs stalled, which is too much for any one worker to process serially).

But if I were to remove the grace period, then this will pick up many jobs that fit (1), and those jobs will just get bounced back to wait again, and then the cycle can continue and those jobs might never get processed. How do you propose we work around that? Perhaps if we know the job is of type (1), we immediately process it, and if it's type (2) we put it back in wait?

manast (Member) commented Oct 16, 2016

I like the idea of moving back to wait. That means we have two different tasks: one (task A) is to process the jobs; the other (task B) is to check the active list for jobs that may have stalled and put them back into the wait queue.

We can assume that the normal case (let's say 99% of cases or more) is that a job is moved from wait to active and then starts to be processed by that same worker.

The remaining case is that some task B finds a job in the active queue that is not locked; it should move it to wait, in the hope that it will be processed later. It could be moved to the beginning of the queue, since that job has already been in active and it would be unfair to make it wait through the whole queue before it can be processed again.

It could also happen that task B finds a job that is not locked but has its status === 'started'; in that case it is also moved to wait, but the stalled event is emitted as well.

The only remaining question I can think of right now is whether it is possible to end up in an endless loop where a job is moved from active to wait forever. If we have a counter that is incremented atomically whenever the job is moved back to wait, then we could fail the job with a proper reason once that counter exceeds a configurable maximum.

Does this sound reasonable to you?

manast (Member) commented Oct 16, 2016

And there is the degenerate case where the worker that moved the job from wait to active did not manage to start it faster than task B, which would result in the job being moved back to wait for no reason. This degenerate case should happen seldom enough not to be a big problem. And maybe in this case we should not increase the counter, although there is a very small risk that we could end up in an endless loop.

…job in the active queue on every cycle (looking at the oldest active jobs first).
bradvogel (Contributor, author):

Just pushed another proposed change.

After thinking about it, you're right that it doesn't matter if the worker who processes the active job isn't the worker who moved it to 'active'. We just need to distinguish between jobs that have been previously started so the stalled event is accurate (which my recent change doesn't address - but if we agree on this new strategy I'll fix it).

So how about we just make the task of "looking through the active list for an unlocked job - whether it's stalled or freshly moved to 'active'" a part of the processJobs run loop? This should address all concerns:

  1. This fixes bug #356 (Jobs get double processed if removed immediately upon completion), since it acquires the lock for the job atomically in the script used to scan the 'active' collection. So there's no opportunity for it to be double-processed, or to process a job that isn't in the active state.
  2. This yields (as much as it can) to 'normal' processing. When looking for potentially stalled jobs, it traverses the active queue backwards, since real stalled jobs will most likely be towards the end. In a normally operating queue without any stalled jobs, this will occasionally 'steal' jobs from other workers that just moved them to active, but that's fine, it just means the jobs get processed faster :).
  3. There's no way for a job to be 'starved' by bouncing around from wait to active.

The downside to this approach is that it's much more expensive, since it iterates the active job list after processing every job. One way we can mitigate this is to only run it for the first call to processJobs() (i.e., ignoring the concurrency setting), as sketched below. The tradeoff would just be that stalled jobs get processed as if concurrency = 1.
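A rough sketch of that mitigation; processJobs receiving its concurrency index, and the processStalledJobs/getNextJob/processJob helpers, are hypothetical stand-ins for the real internals:

// Only the first concurrent processor also scans for stalled jobs (sketch).
Queue.prototype.processJobs = function(index){
  var _this = this;
  var maybeCheckStalled = (index === 0) ?
    this.processStalledJobs() : Promise.resolve();
  return maybeCheckStalled.then(function(){
    return _this.getNextJob();
  }).then(function(job){
    return job && _this.processJob(job);
  }).then(function(){
    return _this.processJobs(index);
  });
};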

manast (Member) commented Oct 17, 2016

I agree with most of what you propose. But I do not think we can run this stalled-checker in the processJobs run loop. I think it needs to be a separate loop that runs at a different interval. For example, it could run at the same interval as LOCK_RENEW_TIME (I do not think it makes sense to run it more often than that). This is both for performance and for the case where you have a queue that processes sparse jobs: you could end up with stalled jobs that never get processed because no new jobs have been added to the queue. The problem with this approach is, of course, that you could end up having more parallel jobs executing than you would like, but if we process the stalled jobs serially it should be no more than one extra concurrent job beyond your specified concurrency level.
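Purely as an illustration of that separation, a dedicated checker loop could look like this (checkStalledJobs is a hypothetical method; LOCK_RENEW_TIME is the existing constant):

// Check for stalled jobs on a fixed timer, independent of job throughput (sketch).
Queue.prototype.startStalledJobsChecker = function(){
  var _this = this;
  this.stalledJobsInterval = setInterval(function(){
    _this.checkStalledJobs().catch(function(err){
      _this.emit('error', err);
    });
  }, LOCK_RENEW_TIME);
};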

bradvogel (Contributor, author) commented Oct 17, 2016

"the case where you have a queue that does process sparse jobs" -> this is actually handled already because the brpoplpush command has a timeout: https://github.com/OptimalBits/bull/blob/master/lib/queue.js#L700. So in a queue that isn't getting any new jobs, processStalledJob is guaranteed to be called at least every LOCK_RENEW_TIME sec.

So we're basically saying that for every job we process, AND on a minimum interval LOCK_RENEW_TIME, we'll look for stalled jobs and process them.

This current approach is also nice because it respects the desired concurrency setting, as it will only ever process one job (either regular or 'stalled') at a time per concurrency slot.

If we're concerned about performance, we could have only the first call to processJobs handle processing stalled jobs, and the rest just process jobs normally. Though it's nice to have stalled-job processing proportional to the number of jobs processed, since there's more opportunity for stalled jobs in that scenario. A once-per-queue task to process those jobs might never get to them all.

manast (Member) commented Oct 17, 2016

OK, I mostly agree with all you wrote.

One thing, just to be 100% clear on the solution: in most cases, a stalled job will be a real stalled job, i.e., a job that has started but stalled before completion, and in this case we should not process it, just move it back to the wait queue. Only the jobs that are picked up by the stalled-checker but are not really stalled should be executed directly.

So basically, most of the time the active queue is clean of stalled jobs, with only real active ones left. That also means that even if you call the stalled-checker only once per LOCK_RENEW_TIME it will be fine, since all the stalled jobs will be moved to the wait queue in one sweep, almost immediately. No risk of the active list growing ad infinitum.

bradvogel (Contributor, author) commented Oct 17, 2016

Why not just process the (real) stalled jobs directly in the run loop? The problem with moving them back to wait is that it creates a hazard where they can bounce between wait and active (though it likely won't happen forever, since there will likely always be a worker waiting for them via brpoplpush).

An advantage to having a single codepath process both types of stalled jobs - real stalled jobs and unlocked active jobs - is that it makes it very elegant to emit the 'stalled' event (see 417ba9d for a demonstration, though I would need to add the code to write the 'started' property when getting the first lock). Otherwise scripts.getStalledJob would need to make a full pass over the active queue, putting real stalled jobs back into wait and then returning the first unlocked active job for immediate processing (we can't return more than one for processing because we have to lock them inside this script).

My concern in a previous comment about not putting the stalled jobs back into wait was that one worker would be burdened with processing all real stalled jobs. But that was fixed by this recent commit to only process one active job per turn of the event loop.

manast (Member) commented Oct 18, 2016

I see two worrisome issues with not moving the un-stalled jobs back to wait. Let me argue for it. First, let me make clear one assumption I am making: a stalled job is (or should be, with a well-constructed process function) a quite rare event, and even rarer, though certainly proportional to how often you check for stalled jobs, is the event where a job that has never started is picked up by the stalled-checker.
Based on this assumption, I do not think it is wise to add a lot of computational overhead for something that will happen so seldom. If we have many workers, it will create a lot of congestion in the Redis communication just to sweep the active list in search of stalled jobs, which most of the time is futile since there will not be any. I do not think this solution scales very well, and we will lose a lot of performance. I made an effort to improve performance and noticed that the single thing that put bull behind bee-queue was the stalled-checker.
The risk of a job entering a never-ending loop between wait and active should be really small, especially if the interval for checking for stalled jobs is something like LOCK_RENEW_TIME / number_of_workers. But to avoid that risk completely, we shall keep a "stalledCounter" variable, which we will need anyway for the case where a certain job keeps stalling, which could indeed cause an infinite loop...

Finally, I am not sure I understand the solution you mentioned to avoid too many concurrent jobs, by starting them after every turn of the event loop. What happens in the likely event that a job takes more time to complete than one turn of the event loop?

bradvogel (Contributor, author):

Ok, so if I understand correctly, the solution you're proposing is:

  1. processStalledJobs is run once per queue on an interval (perhaps LOCK_RENEW_TIME / number_of_workers, so it scales with the number of jobs processed and the number of potentially stuck jobs).
  2. It iterates the active list, and for every unlocked job it increases its stalledCounter. If stalledCounter is under some threshold, it moves the job to the front of the wait queue to be immediately picked up (also, if the job has the started property, we'll broadcast the stalled event for it). If it's over the threshold, it fails the job.

Ok, this sounds reasonable and I can work on it tomorrow morning, unless you have time to get to it first. I'm still slightly concerned that jobs will needlessly get bounced between active and wait if there are many active workers in the queue, all running their own processStalledJobs.
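A sketch of what such a script could look like, in the string-array style used elsewhere in this PR. The field names (stalledCounter, started), key order, and failure handling are assumptions for illustration, not the final code:

var moveUnlockedJobsToWait = [
  // KEYS[1] = active list, KEYS[2] = wait list (assumed layout).
  // ARGV[1] = job key prefix, ARGV[2] = max allowed stalls.
  'local stalled = {}',
  'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
  'for _, job in ipairs(activeJobs) do',
  '  local jobKey = ARGV[1] .. job',
  '  if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
  '    local stalls = redis.call("HINCRBY", jobKey, "stalledCounter", 1)',
  '    if(stalls > tonumber(ARGV[2])) then',
  '      redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")',
  // Full failed-set bookkeeping is omitted in this sketch.
  '    else',
  '      redis.call("LREM", KEYS[1], 0, job)',
  '      redis.call("RPUSH", KEYS[2], job)',
  '      if(redis.call("HGET", jobKey, "started")) then',
  '        table.insert(stalled, job)',
  '      end',
  '    end',
  '  end',
  'end',
  'return stalled'
].join('\n');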

manast (Member) commented Oct 18, 2016

Sorry, I meant LOCK_RENEW_TIME * number_of_workers. Since we are moving jobs back to wait, it scales automatically as the jobs are distributed among all the workers; we just need to check for stalled jobs with the same frequency, independently of the number of workers! :)

manast (Member) commented Oct 18, 2016

By the way, since you are spending so many resources on this project, how would you feel about putting the Mixmax logo on the readme as a sponsor?

bradvogel added a commit to bradvogel/bull that referenced this pull request Oct 19, 2016
per OptimalBits#359 (comment) - hopefully more companies are willing to contribute to this wonderful project and get their logo up there!
bradvogel (Contributor, author):

Sorry for the delay on this. Will finish this tomorrow

* Note that there is no way to know for _certain_ if a job is "stalled"
* (since the process that moved it to active might just be slow to get the
* lock on it), so this function takes a 'grace period' parameter to ignore
* jobs that were created in the recent X milliseconds.
Member:

I guess some things in this comment regarding the grace period are not relevant anymore.

Contributor Author:

Oh yes, fixed!

}
return null;
}).catch(function(err){
console.error(err);
Member:

I think it could be useful to add text to this log explaining that it was related to moveUnlockedJobsToWait.

Contributor Author:

Fixed.

' if(redis.call("SET", jobKey .. ":lock", ARGV[2], "PX", ARGV[3], "NX")) then',
' return job',
' end',
'local stalled = {}',
Member:

Just a thought: maybe it would be interesting to have a configurable LRANGE here. For example, with very large queues, the active list could be too big to traverse this often. I have to double-check, but if the oldest jobs are at the end of the queue, limiting each call to a maximum number of elements may work well. I am also thinking of exposing the other constants we have, for better fine-tuning. We do not need to change more in this PR.

Contributor Author:

Yeah, good idea.
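For example (purely illustrative), the full-list scan could be bounded to the oldest entries with a count passed in as an extra argument:

// Sweep only the last N entries of the active list, where the oldest jobs are
// assumed to live (N is a hypothetical ARGV[3] parameter).
'local activeJobs = redis.call("LRANGE", KEYS[1], -tonumber(ARGV[3]), -1)',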

' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")',
' else',
// Move the job back to the wait queue, to immediately be picked up by a waiting worker.
' redis.call("RPUSH", KEYS[2], job)',
Member:

If the queue is LIFO (check options), we need to do an LPUSH here instead. LIFO should also imply a new name for the script hash (since we could have different queues (LIFO/FIFO) in the same Redis instance).

Contributor Author:

Isn't LIFO an option for the job, not the queue? Also, if we're reprocessing jobs that already made it to 'active', don't we always want to make them LIFO? Otherwise they'd be unfairly penalized by waiting through the entire wait queue again.

' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")',
' if(lockAcquired) then',
// If it was previously locked then we consider it 'stalled' (Case A above).
' table.insert(stalled, job)',
Member:

If this job fails due to stalling too many times, should we still emit it as an event?

Contributor Author:

You're right. I fixed this and changed the stalled event to emit just the single job, so no more API breakage.

manast (Member) commented Oct 21, 2016

Great work. When this is merged I will take a look at the failing tests, and after that I think I should release a new bull version (1.1.0 maybe).

bradvogel (Contributor, author):

Ok, addressed comments. Let me know what you think. I can also do a pass on fixing unit tests, and writing new ones for this, if you'd like me to.

manast (Member) commented Oct 22, 2016

I saw that most test failures are actually because they try to call the old processStalledJobs. In the last commits, though, there are some lint errors that are preventing the tests from running.
Of course it would be fantastic if you wrote tests for the new functionality, but fixing the old tests can be quite time-consuming; I have some experience with it, so I can help after the PR is merged.

bradvogel (Contributor, author):

I'll fix these up.

bradvogel (Contributor, author):

@manast I made a few small fixes, but I need your help with the tests. They're very flaky for me, giving me different results on each run (even on master)

bradvogel (Contributor, author):

No problem. I'll do a pass on the tests later today, as well as fix eslint. Just wanted to make sure the overall strategy was approved by you before writing tests.


manast (Member) commented Oct 26, 2016

I will merge this now and I can take a look at the tests after that.

@manast manast merged commit 69b965b into OptimalBits:master Oct 26, 2016
bradvogel (Contributor, author):

Thanks and let me know if you need anything from me!

tomasgvivo:

Thanks for the fix! 👍
Please, update the package version!

lfcgomes commented Nov 3, 2016

Could you please update the package version? :) Thanks!
