feat: recoverable processing + merging #646

Merged (12 commits, Mar 29, 2022)
Conversation

@andrzejnovak (Collaborator)

We discussed in the past how bad it is when a big job crashes and many hours of wall and core time are lost. Here's a draft of how we could go about solving that. (Ignore some of the merge logic; I used the FutureHolder class from my job-merging PR because this needs a bit finer control than the _futures_handler generator, but it's not necessarily connected.)

The workflow could look like this:

```python
run_instance = processor.Runner(...)

# preprocess only: build the chunk list, then save it (e.g. in a pickle) for reference
filemeta = run_instance(samples, "Events", processor_instance=MyZPeak(), prepro_only=True)

# passing existing pre-made chunks to the Runner instance skips the prepro step
output, processed, metrics = run_instance(filemeta, "Events", processor_instance=MyZPeak())
# WorkItem is attached to the accumulator the same way metrics are;
# if an exception is raised, report it, but return the futures that already finished

# resubmit whatever did not complete
missing = set(filemeta) - set(processed)
output, processed, metrics = run_instance(list(missing), "Events", processor_instance=MyZPeak())
```

@nsmith-

@lgray (Collaborator) commented Mar 7, 2022

(screenshot)

@nsmith- (Member) left a review:

Some early comments, mostly I'm concerned about object lifetime. One of the first problems we had was holding onto futures longer than necessary and blowing up our memory, as discussed in #97

(3 review threads on coffea/processor/executor.py, resolved)
@andrzejnovak (Collaborator, Author)

The magic sauce seems to be a secondary executor (either spawned automatically or passed in) to run the merge jobs in parallel.

I am not sure where we landed on having rich as a dependency, but rich progress bars make the code with multiple bars a bit neater https://github.com/andrzejnovak/coffea/pull/1/files
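The batched-merge idea can be sketched with a plain `concurrent.futures` pool; `merge_pair` and `batched_merge` below are hypothetical helpers standing in for coffea's accumulator addition, not the PR's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def merge_pair(a, b):
    # coffea accumulators support addition; plain dicts of counts behave the same
    return {k: a.get(k, 0) + b.get(k, 0) for k in set(a) | set(b)}

def batched_merge(partials, pool):
    """Reduce a list of partial results pairwise, one round at a time,
    submitting each pair-merge to a secondary executor."""
    results = list(partials)
    while len(results) > 1:
        futures = [
            pool.submit(merge_pair, results[i], results[i + 1])
            for i in range(0, len(results) - 1, 2)
        ]
        leftover = [results[-1]] if len(results) % 2 else []
        results = [f.result() for f in futures] + leftover
    return results[0]

with ThreadPoolExecutor(max_workers=2) as pool:
    total = batched_merge([{"a": 1}, {"a": 2, "b": 1}, {"b": 3}], pool)
# total == {"a": 3, "b": 4}
```

The point of the pairwise rounds is that merges run concurrently with each other instead of serially in the main process.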

@NJManganelli (Collaborator)

pip made rich a vendored dependency in 22.0; was it pulled in transitively via that?

@andrzejnovak andrzejnovak changed the title feat: recoverable processing feat: recoverable processing + merging Mar 14, 2022
@andrzejnovak (Collaborator, Author)

Actually this seems quite a bit faster this time around: two runs each, with a merge pool of 2 workers, which can probably run anywhere.

(benchmark screenshots)

@lgray (Collaborator) commented Mar 14, 2022

Can you replicate the work to be done a little bit and see how the memory scales?

@andrzejnovak (Collaborator, Author)

(screenshot)

Same files though, so I'm guessing they are cached somehow and that's why the time doesn't scale 10x. Watching htop while this runs, the memory consumption looks consistent throughout. I'm running a slightly modified MyZPeak example with four 60-bin axes, so the accumulator should not be entirely negligible.

@lgray (Collaborator) commented Mar 14, 2022

Be careful with tests on Windows: forking and multiprocessing are strange there.

@andrzejnovak (Collaborator, Author)

Alright, I guess that makes it reviewable now. Specifically, I'd like to hear your thoughts on the added API, and in particular on how the error state and the processed items are returned back up by the executor(s). This is currently passed as a tuple, which is not very elegant. I assume the reason the returns are shaped like e.g. (accumulator, metrics), as opposed to just returning the wrapped_out dict, is that somewhere it has to be immutable?

@lgray (Collaborator) commented Mar 15, 2022

We should test this on a many-core (64+) machine and check the scaling at the extreme as well.

@lgray (Collaborator) commented Mar 15, 2022

We'll still run into some nasty issues with big histograms... hmm.

@andrzejnovak (Collaborator, Author)

> We should test this on a many-core machine and check the scaling in extremum as well.

I can test this on a node with up to 80 cores, but there's a lot of RAM available, so it won't necessarily stress the code.

The next scale-up is obviously the parsl executor (either with Futures or another parsl DFK doing the merging), which should maybe be a separate PR.

@lgray (Collaborator) commented Mar 15, 2022

Sure - the point is to observe trends rather than really stress anything.

@andrzejnovak (Collaborator, Author)

Alright, this crept up in scope a bit.

  • New features (for the futures/parsl executors)
    • recoverable - optionally return the chunks processed so far along with the error in case of failure, instead of raising directly
    • merging - implement merging in batches (in the main process, in the main executor, or in a separate executor)
      • On small test jobs at least as fast as master, or faster
      • Will particularly help when many small chunks are processed or many workers are available
    • rich progress bars
  • API changes - breaking
    • Convert sets to lists in metrics (it's annoying that they are not JSON-dumpable by default)
    • Executors return (out, 0), or (out, error) where recoverable is implemented
  • API changes
    • Calling a Runner is standardized - instead of a variable-length tuple, it returns a dict
      • isolated into Runner().run(...)
      • the current Runner()() remains unchanged - under the hood it calls run()
      • deprecate and switch in the future?

Ready to review. We can expand the merge logic to dask and overall use of rich progress bars in future PRs.
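The set-to-list metrics change above is easy to motivate with the standard library alone; `metrics` below is a made-up example dict, not the real executor metrics:

```python
import json

# a metrics dict with a set-valued entry, loosely like the executor metrics
metrics = {"columns": {"pt", "eta"}, "entries": 100}

try:
    json.dumps(metrics)
except TypeError:
    pass  # "Object of type set is not JSON serializable"

# converting set-valued entries to sorted lists makes the dict dumpable
dumpable = {k: sorted(v) if isinstance(v, set) else v for k, v in metrics.items()}
blob = json.dumps(dumpable, sort_keys=True)
# blob == '{"columns": ["eta", "pt"], "entries": 100}'
```

Sorting also makes the serialized metrics deterministic across runs, which sets are not.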

@nsmith- (Member) left a review:

As an overall comment, if you can sprinkle some typing in some of the new function signatures that would be helpful.
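For illustration, the kind of annotations being requested might look like this on a hypothetical signature; names like `run_chunks` and `WorkItem` are placeholders, not the actual coffea API:

```python
from typing import Callable, List, Optional, Tuple

# Illustrative only: typed version of a function loosely shaped like the
# executor entry points in this PR ("WorkItem" is a forward reference).
def run_chunks(
    chunks: List["WorkItem"],
    process: Callable[["WorkItem"], dict],
    recoverable: bool = True,
) -> Tuple[dict, Optional[BaseException]]:
    """Process chunks and return (accumulator, error-or-None)."""
    ...
```

Even stub annotations like these document the (out, error) contract at the signature level.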

(review threads on coffea/processor/executor.py and tests/test_processor.py, resolved)
@andrzejnovak (Collaborator, Author)

Instead of passing "prepro" to run, I tried to factor it out into a separate function, which makes more sense, but having to comply with the dynamic chunking there makes it a bit awkward (it relies on both the filemeta and the chunk generator, so run needs to be able to take a fileset as input).
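The split being described could be sketched roughly as follows; `preprocess` and `run` are hypothetical stand-ins (with fixed-size rather than dynamic chunking) for the factored-out step:

```python
def preprocess(fileset, chunksize):
    """Split each file into fixed-size chunks up front (dynamic chunking
    elided); yields (filename, start, stop) work items."""
    for name, nevents in fileset.items():
        for start in range(0, nevents, chunksize):
            yield (name, start, min(start + chunksize, nevents))

def run(chunks, process):
    """Consume pre-made chunks directly, skipping preprocessing entirely."""
    return [process(chunk) for chunk in chunks]

chunks = list(preprocess({"f1.root": 10, "f2.root": 5}, 4))
# chunks: [("f1.root", 0, 4), ("f1.root", 4, 8), ("f1.root", 8, 10),
#          ("f2.root", 0, 4), ("f2.root", 4, 5)]
sizes = run(chunks, lambda c: c[2] - c[1])
# sum(sizes) == 15: every event is covered exactly once
```

Because the chunk list is a plain value, it can be pickled between the two calls, which is what makes the recovery workflow at the top of the thread possible.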

@andrzejnovak (Collaborator, Author)

@nsmith- Cleaned up the commits and also added rich.progress for Iterative. I'd say this is ready

@lgray (Collaborator) commented Mar 28, 2022

Since we can't capture it very well in CI, have we tried this PR at scale with all the executors?

@andrzejnovak (Collaborator, Author)

> Since we can't capture it very well in CI, have we tried this PR at scale with all the executors?

At a reasonable scale with the futures executor, and with parsl on the full 2016 production of my analysis about three times. The speed there depends on how busy the disk is, but in each run it was as fast as or faster than master.

@lgray (Collaborator) commented Mar 29, 2022

@nsmith- you happy here?

@nsmith- (Member) left a review:

Just a small typing thing; not making it required, but it would be nice.

(2 review threads on coffea/processor/executor.py, resolved)
@andrzejnovak (Collaborator, Author)

@nsmith- seems we're good to go

@lgray lgray merged commit d235b82 into CoffeaTeam:master Mar 29, 2022