Fine performance metrics for execute, gather_dep, etc. #7586

Merged
merged 45 commits into from
Mar 14, 2023

Conversation

crusaderky
Collaborator

@crusaderky crusaderky commented Feb 24, 2023

This PR introduces fine-grained performance measures in every call to execute, gather_dep, get_data, and memory_monitor.

In Worker.state.stimulus_log, every GatherDep*Event and Execute*Event now includes a metrics attribute, with the following elements:

GatherDep<Success,Busy,NetworkFailure,Failure>Event

  • network (seconds)
  • decompress (seconds)
  • deserialize (seconds)
  • For output spilling:
    • serialize (seconds)
    • compress (seconds)
    • disk write (seconds)
    • disk write (bytes)
    • disk write (count)
  • other (seconds) (read below)

ExecuteSuccessEvent, ExecuteFailureEvent, RescheduleEvent

All metrics are broken down by task prefix.

  • For run_spec deserialization:
    • decompress (seconds)
    • deserialize (seconds)
  • For dependencies unspilling:
    • disk read (seconds)
    • disk read (bytes)
    • disk read (count)
    • decompress (seconds)
    • deserialize (seconds)
  • For execution:
    • user-defined metrics from inside the task (e.g. tasks tagging their non-CPU time as I/O)
    • thread CPU (seconds) (time.thread_time - user metrics)
    • thread non-CPU (seconds) (time.time - time.thread_time - user metrics)
  • For output spilling:
    • serialize (seconds)
    • compress (seconds)
    • disk write (seconds)
    • disk write (bytes)
    • disk write (count)
  • other (seconds) (read below)
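
A minimal sketch of the two execution timings listed above, using only the standard library. It ignores user-defined metrics, and `measure` and the dictionary keys are hypothetical names chosen for illustration, not the PR's implementation.

```python
import time


def measure(func, *args):
    """Run func and split its wall-clock time into thread CPU and non-CPU time."""
    wall_start = time.time()
    cpu_start = time.thread_time()
    result = func(*args)
    cpu = time.thread_time() - cpu_start
    wall = time.time() - wall_start
    # Clamp at zero: the two clocks have different resolutions
    # (the clamping is an assumption of this sketch)
    return result, {"thread-cpu": cpu, "thread-noncpu": max(0.0, wall - cpu)}


def sleepy(x):
    time.sleep(0.05)  # non-CPU time: the thread is idle while sleeping
    return x + 1


result, metrics = measure(sleepy, 1)
print(result, metrics)
```

A task that mostly sleeps or waits on I/O shows up almost entirely under the non-CPU key, while a tight numerical loop would show up under the CPU key.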

The other time is the difference between the elapsed time as seen from the WorkerStateMachine (from the emission of the GatherDep or Execute Instruction to the final Event) and the sum of all other time metrics. It highlights, for example, a slow event loop or a large overhead in loop.run_in_executor.
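
As a worked example of the definition above, the sketch below derives the other time from an elapsed time and a dictionary of metrics. The function name, the `(label, unit)` key layout, and the clamping at zero are assumptions made for illustration.

```python
def other_seconds(elapsed, metrics):
    """Elapsed time not accounted for by any metric measured in seconds."""
    accounted = sum(
        value for (label, unit), value in metrics.items() if unit == "seconds"
    )
    # Clamping at zero is an assumption of this sketch
    return max(0.0, elapsed - accounted)


metrics = {
    ("network", "seconds"): 0.5,
    ("decompress", "seconds"): 0.25,
    ("deserialize", "seconds"): 0.125,
    ("disk-write", "bytes"): 1024,  # not a time metric, so it is ignored
}
print(other_seconds(1.0, metrics))  # 0.125
```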

The above metrics are aggregated to per-worker totals (or in case of execute, per-task prefix totals) and posted to Worker.digests, Worker.digests_total, and Worker.digests_max.
In case of unsuccessful execution or transfer, time metrics are aggregated into a single, separate event, so that one can appreciate how much time is being wasted in failed attempts.
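
The following sketch illustrates one way such totals and maxima could be accumulated. The two dictionaries only mimic the roles of Worker.digests_total and Worker.digests_max; the `digest_metric` helper and the key layout are hypothetical.

```python
from collections import defaultdict

digests_total = defaultdict(float)  # cumulative sum per key
digests_max = defaultdict(float)  # largest single sample per key


def digest_metric(key, value):
    """Fold one sample into the running total and maximum (hypothetical helper)."""
    digests_total[key] += value
    digests_max[key] = max(digests_max[key], value)


# Assumed key layout: (activity, task prefix, label, unit)
samples = [
    (("execute", "inc", "thread-cpu", "seconds"), 0.5),
    (("execute", "inc", "thread-cpu", "seconds"), 0.25),
    (("execute", "dec", "disk-read", "bytes"), 2048),
]
for key, value in samples:
    digest_metric(key, value)

cpu_key = ("execute", "inc", "thread-cpu", "seconds")
print(digests_total[cpu_key], digests_max[cpu_key])  # 0.75 0.5
```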

Equivalent digests are also produced in other resource-intensive activities:

Memory Monitor

  • serialize (seconds)
  • compress (seconds)
  • disk write (seconds)
  • disk write (bytes)
  • disk write (count)

get_data

  • For dependencies unspilling:
    • disk read (seconds)
    • disk read (bytes)
    • disk read (count)
    • decompress (seconds)
    • deserialize (seconds)
  • serialize (seconds)
  • compress (seconds)
  • network (seconds)

Spill-specific activity is also recorded under Worker.data.cumulative_metrics and separately posted to Prometheus.
Besides convenience, this allows separating (de)serialization and (de)compression caused by network transfers from those caused by spilling/unspilling.

Out of scope

  • Propagate metrics to Scheduler
  • Publish metrics other than the spill metrics
  • measure and break down time spent without enough Execute instructions running to saturate the worker's thread pool
  • measure time spent in unfinished task execution
  • move time metrics for task re-execution following worker death into a separate measure
  • logic to correctly measure seceded tasks
  • logic to separate "good" I/O time during task execution (e.g. read_parquet), as well as GPU time, from time waste. This PR offers the toolkit to log this time in dask/dask.
  • cleanup of legacy metrics (startstops, crick)

Demo

https://gist.github.com/crusaderky/a97f870c51260e63a1c14c20b762f666

@crusaderky crusaderky self-assigned this Feb 24, 2023
@crusaderky crusaderky changed the title from "WIP: Fine performance metrics for execute, gather_dep, and get_data" to "WIP: Fine performance metrics for execute, gather_dep, etc." Feb 24, 2023
@crusaderky crusaderky marked this pull request as draft February 24, 2023 20:08
@github-actions
Contributor

github-actions bot commented Feb 24, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       26 files  ±    0         26 suites  ±0   12h 31m 51s ⏱️ + 15m 10s
  3 527 tests +  30    3 423 ✔️ +  30     103 💤 ±0  1 ±0 
44 591 runs  +390  42 516 ✔️ +384  2 074 💤 +6  1 ±0 

For more details on these failures, see this check.

Results for commit ad45e4a. ± Comparison against base commit 700f14a.

This pull request removes 3 and adds 33 tests. Note that renamed tests count towards both.
distributed.http.worker.tests.test_worker_http ‑ test_prometheus_resets_max_metrics
distributed.tests.test_worker_memory ‑ test_digests
distributed.tests.test_worker_state_machine ‑ test_slots[DigestMetric]
distributed.http.scheduler.tests.test_scheduler_http ‑ test_check_idle
distributed.tests.test_metrics ‑ test_context_meter
distributed.tests.test_metrics ‑ test_context_meter_decorator
distributed.tests.test_metrics ‑ test_context_meter_nested
distributed.tests.test_metrics ‑ test_context_meter_nested_floor
distributed.tests.test_metrics ‑ test_context_meter_pickle
distributed.tests.test_metrics ‑ test_context_meter_raise
distributed.tests.test_metrics ‑ test_delayed_metrics_ledger
distributed.tests.test_metrics ‑ test_meter
distributed.tests.test_metrics ‑ test_meter_floor[kwargs0-0]
…

♻️ This comment has been updated with latest results.

@crusaderky crusaderky force-pushed the gather-dep branch 3 times, most recently from 892cb11 to 0f74274 on February 27, 2023 12:41
Member

@fjetter fjetter left a comment

So far I have only had a very high-level look. I have a couple of concerns:

  • We're wrapping many things in ctx managers. I understand that this is context aware but I wonder if we cannot (sometimes) use decorators instead. This would feel less invasive and would be easier to remove/modify later on
  • This is added in many places at the same time. This is not only instrumenting parts of the worker but even parts of our protocol. While I couldn't see any hot for loops, this all makes me worry a bit about overhead. Both in terms of memory overhead when retaining the metrics but also in terms of plain runtime overhead, e.g. when there are multiple callbacks registered.
  • This feels a bit like we're introducing our own instrumentation/tracing framework which makes me wonder if we shouldn't instead look into existing frameworks for this kind of thing. cc @hendrikmakait you spent some time with OTel tracing. Do you think we could move from this ContextMeter to an OTel Span or are there fundamental differences? I do not want to suggest to make this switch immediately but I would like us to be aware of this, the proposed callback based API feels a bit special. (I actually think what we're doing here is a mixture between tracing and collecting metrics so this might not map perfectly)
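
To illustrate the trade-off raised in the first point, here is a minimal standard-library sketch in which the same meter works both as a context manager and as a decorator. The `meter` function and the `recorded` list are hypothetical stand-ins, not the ContextMeter API from this PR.

```python
import time
from contextlib import contextmanager

recorded = []  # (label, seconds) samples collected by this sketch


@contextmanager
def meter(label):
    """Time the wrapped block or function and record the elapsed seconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        recorded.append((label, time.perf_counter() - start))


# As a context manager: the metered body has to be indented
def load_inline(data):
    with meter("deserialize"):
        return list(data)


# As a decorator: the function body is left untouched
@meter("deserialize")
def load_decorated(data):
    return list(data)


load_inline("ab")
load_decorated("cd")
print([label for label, _ in recorded])  # ['deserialize', 'deserialize']
```

Context managers created with contextlib.contextmanager can be applied as decorators, which keeps the diff small when a whole function is to be metered.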

Comment on lines 907 to 909
DigestMetric.match(stimulus_id="s5", name="gather-dep-failed-seconds"),
DigestMetric.match(stimulus_id="s6", name="execute-other-seconds"),
DigestMetric.match(name="compute-duration", stimulus_id="s6"),
Member

From the POV of this test, I don't care that these digests are emitted here. They make the test more brittle and harder to understand.
Can we keep the test somehow as before?

Collaborator Author

I think I found a way to remove DigestMetric altogether 🥳


assert ts.annotations is not None
executor = ts.annotations.get("executor", "default")
with context_meter.add_callback(metrics_callback):
Member

This wraps almost the entire method, so the indentation creates a very large diff. Can this be introduced a bit less invasively, e.g. using a decorator?

Collaborator Author

Done.

merged_frames.append(merged)

return deserialize(header, merged_frames, deserializers=deserializers)
with context_meter.meter("deserialize"):
Member

also an example where I would consider a decorator less invasive

Collaborator Author

Done.

distributed/metrics.py (resolved)
@hendrikmakait
Member

hendrikmakait commented Feb 27, 2023

  • This feels a bit like we're introducing our own instrumentation/tracing framework which makes me wonder if we shouldn't instead look into existing frameworks for this kind of thing. cc @hendrikmakait you spent some time with OTel tracing. Do you think we could move from this ContextMeter to an OTel Span or are there fundamental differences? I do not want to suggest to make this switch immediately but I would like us to be aware of this, the proposed callback based API feels a bit special. (I actually think what we're doing here is a mixture between tracing and collecting metrics so this might not map perfectly)

I think the main difference is around the data we collect and the overhead generated by collecting or deriving the relevant data.

From what I understand, we are exclusively interested in collecting performance metrics. At the moment, these seem to be the latencies of individual operations. With tracing, we'd collect more information, essentially the nested call traces of our operations. This would include their latency but also all sorts of other attributes, and creating metrics from that data requires additional work. There is the SpanMetricsProcessor, which allows us to generate Rate, Errors, Duration metrics from our tracing data. We may also need some additional thought to group those metrics by the relevant levels defined in this PR by the callbacks. Once again, there seem to be some ways to do so. This PR seems to try to generate an API that combines that with tracing. The runtime overhead I could measure in a small experiment was around 25 µs, so this might add up over time.

FWIW, I've been working on a small worker plugin that hooks into our startstops to generate some taskstream-like tracing. I should be able to hack something together that would hook into the ContextMeter.meter() calls and trace data in OTel if we'd like to take that idea for a spin before settling on one or the other.

@crusaderky crusaderky force-pushed the gather-dep branch 10 times, most recently from b230744 to 6d1dd32 on March 1, 2023 16:44
@crusaderky
Collaborator Author

crusaderky commented Mar 1, 2023

The end-to-end runtime for a trivial execute() call is 640 µs for sync tasks and 230 µs for async ones.
This is measured from the moment the Instruction leaves the WorkerStateMachine to the moment the final Event goes back into it.
We do not have a measure on main for comparison.
I think in absolute terms this is negligible (it shows that getting in and out of the thread takes almost twice as long as everything else).
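
For a rough feel of the cost of getting in and out of a thread, one could time a trivial function through loop.run_in_executor, as in the sketch below. It uses the default executor and hypothetical helper names, and it measures only the executor round trip, not the full Instruction-to-Event path described above, so its numbers are not comparable to the figures quoted.

```python
import asyncio
import time


def trivial():
    """Stand-in for a task that does no work."""
    return None


async def roundtrip(n=100):
    """Mean seconds for one hop into the default thread pool and back."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    for _ in range(n):
        await loop.run_in_executor(None, trivial)
    return (time.perf_counter() - start) / n


mean = asyncio.run(roundtrip())
print(f"{mean * 1e6:.0f} us per round trip")
```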

Member

@fjetter fjetter left a comment

I feel the current state of this instrumentation is sufficiently well encapsulated that I am not concerned. I could easily see us replacing this Meter and DelayedMetricsLedger again with something else that is rather close to a Span. The actual instrumentation in-code is fairly minimal which is what I wanted to see.

While I like the Span object of #7619, this PR is more advanced, and I don't think that additional investment in #7619 would significantly improve this instrumentation system.

I believe, besides nomenclature, the most notable difference between the two PRs is where ownership of certain responsibilities is attached. This PR ties the instrumentation more closely to the BaseWorker, while the other PR tries harder to isolate it. Both approaches come with pros and cons. There are also a couple of other differences that I consider mostly syntactic sugar.
For instance, I like Span.flat which is a really nice API but I doubt that we actually need it. If so, we could likely offer something similar in this architecture.

I suggest moving forward with this PR and focusing our time on exposing these metrics as fast as possible.

Comment on lines 3342 to 3343
"start": ev.user_code_start,
"stop": ev.user_code_stop,
Member

Note: This has a non-trivial and subtle side effect (I think positive).

The startstops information is used to set TaskGroup, and therefore TaskPrefix, duration estimates/measurements; i.e., by no longer counting disk time etc., the measurements will more accurately reflect the actual task compute time.
However, this in turn also impacts occupancy calculations and therefore even impacts scheduling.

I believe overall this makes scheduling decisions more robust and I guess we will not even see this matter in reality. However, the connection is not necessarily apparent and I wanted to highlight this.

Collaborator Author

We never counted disk etc. In main this is already using the in-thread measure; I just changed the naming to be less ambiguous. There was a moment where I accidentally set this to the end-to-end runtime and a couple of tests blew up.

distributed/worker_state_machine.py (outdated, resolved)
Comment on lines 263 to 271
# Work around bug with Tornado PeriodicCallback, which does not properly
# insulate contextvars
async def _() -> None:
    with context_meter.add_callback(metrics_callback):
        # Measure delta between the measures from the SpillBuffer and the total
        # end-to-end duration of _spill
        await self._spill(worker, memory)

await asyncio.create_task(_(), name="memory-monitor-spill")
Member

Can you elaborate? The create_task thing looks a bit odd and I would likely refactor this if I stumbled over it. What of this construction is required to work around the bug?
Do you need a "clean" context in _ or what is this construction doing?

Collaborator Author

create_task automatically duplicates and insulates the current context. PeriodicCallback doesn't (official support for async callbacks is very new fwiw). Without this workaround, I saw metrics clearly emitted by Worker.execute show up here.
I've updated the comment to clarify.
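
The isolation property described above can be demonstrated with the standard library alone. In this sketch the `callbacks` variable is a hypothetical stand-in for whatever state the context meter keeps; it shows that a value set inside a task created with create_task does not leak into the caller's context. It does not reproduce the Tornado PeriodicCallback behaviour.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the callbacks registered in the current context
callbacks = contextvars.ContextVar("callbacks", default=())


async def spill():
    # Runs in its own task, so this set() only changes the task's context copy
    callbacks.set(callbacks.get() + ("memory-monitor",))
    return callbacks.get()


async def main():
    callbacks.set(("execute",))
    # create_task copies the current context for the new task
    inside = await asyncio.create_task(spill(), name="memory-monitor-spill")
    outside = callbacks.get()
    return inside, outside


inside, outside = asyncio.run(main())
print(inside)  # ('execute', 'memory-monitor')
print(outside)  # ('execute',)
```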

distributed/worker.py (outdated, resolved)
distributed/utils.py (outdated, resolved)
Comment on lines +3622 to +3628
def _finalize_metrics(
    self,
    stim: StateMachineEvent,
    ledger: DelayedMetricsLedger,
) -> None:
    activity: tuple[str, ...]
    coarse_time: str | Literal[False]
Member

Note: This is a big switch statement. I personally would likely implement this as a method of the various events. Attaching the ownership of this to the event moves the implementation slightly closer to the TracedEvent of #7619.

However, I don't have a strong preference and don't mind having this as a method here. I think there is a case for both, I just wanted to pass along this observation.

Collaborator Author

@crusaderky crusaderky Mar 13, 2023

I know. I thought about it, but felt that putting it in methods would have several drawbacks:

  1. It would be too dispersive: you'd need to go through all of those classes to see the fine differences between their implementations.
  2. It would require a back pointer to WorkerStateMachine in the parameters, which, while technically sound, is not a pattern we've used anywhere else.
  3. We would need to implement a dummy in RetryBusyWorkerEvent, just by virtue of it being the exit event of an async instruction; alternatively, we'd need to insert an extra level of parent classes, class EventAtEndOfAsyncInstruction(StateMachineEvent), just for this.

@crusaderky
Collaborator Author

@fjetter all code review comments have been addressed; this is ready for final review and merge.

@fjetter fjetter merged commit a09a151 into dask:main Mar 14, 2023
@fjetter
Member

fjetter commented Mar 14, 2023

very excited about seeing this in action!

@crusaderky crusaderky deleted the gather-dep branch March 14, 2023 17:25
ypogorelova pushed a commit to ypogorelova/distributed that referenced this pull request Mar 16, 2023
crusaderky added a commit to crusaderky/distributed that referenced this pull request Mar 18, 2023
crusaderky added a commit to crusaderky/distributed that referenced this pull request Mar 18, 2023
jrbourbeau pushed a commit that referenced this pull request Mar 20, 2023
Development

Successfully merging this pull request may close these issues.

Top-level cluster efficiency metric
Better instrumentation for Worker.gather_dep
5 participants