[CHORE] Swordfish refactors #3256

Merged: 16 commits into main from colin/probe-state-bridge on Nov 15, 2024

Conversation

@colin-ho (Contributor) commented Nov 11, 2024

There are a couple of outstanding issues / inefficiencies / ugly spots in the swordfish code. I originally intended to break these up into smaller PRs, but while trying to split them up I realized that the changes are quite intertwined, and it may be easier on the reviewer to just see all of them in one. That being said, I'll try my best to explain all the changes and the rationale in detail.

Problems

  • The PipelineChannel abstraction doesn't play well with streaming sinks; in fact, it only really works for intermediate ops. This is because the PipelineChannel assumes sending/receiving is round robin, but streaming sinks can send data from the workers as well as after the finalize.
  • Pipeline nodes currently send across both data and probe tables in the same channel, which can be confusing. Furthermore, the logic of dispatching probe tables and data is also different, adding more complexity. Probe tables need to be broadcasted to all workers, while data should only be distributed to a single worker.
  • If an operator does not require its input to be ordered, it shouldn't dispatch work to workers in round robin; it should dispatch to any available worker.
  • Some operators don't do work in their execute or finalize methods. These don't need to be spawned on the compute runtime.

Proposed Changes

  • Pipeline nodes should just send data across, via a simple Receiver<Arc<MicroPartition>>.
  • Probe tables should be sent separately from data, via a ProbeStateBridge.
  • Implement an unordered dispatcher as an optimization for when ordering is not required. This uses loole channels, which are multi-producer multi-consumer; for consistency, use loole channels across all of swordfish (see the sketch after this list). In the future we can think about implementing our own custom channels. https://users.rust-lang.org/t/loole-a-safe-async-sync-multi-producer-multi-consumer-channel/113785
  • Give the compute runtime to the operators, and let them decide whether they want to spawn.
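To illustrate the unordered-dispatch idea, here is a minimal sketch using loole's MPMC channels (this assumes loole's flume-like send_async/recv_async API; the morsel type, worker count, and worker loop are stand-ins for illustration, not the actual swordfish code):

use std::sync::Arc;

#[tokio::main]
async fn main() {
    // One shared MPMC channel: whichever worker is free grabs the next morsel,
    // so a slow morsel doesn't stall the others (unlike round-robin dispatch).
    let (tx, rx) = loole::bounded::<Arc<String>>(4);

    let mut workers = Vec::new();
    for id in 0..3 {
        let rx = rx.clone();
        workers.push(tokio::spawn(async move {
            // recv_async returns Err once all senders are dropped and the channel drains.
            while let Ok(morsel) = rx.recv_async().await {
                println!("worker {id} processing {morsel}");
            }
        }));
    }
    drop(rx);

    for i in 0..8 {
        tx.send_async(Arc::new(format!("morsel-{i}"))).await.unwrap();
    }
    drop(tx); // close the channel so the workers exit

    for w in workers {
        w.await.unwrap();
    }
}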

@github-actions bot added the chore label Nov 11, 2024

codspeed-hq bot commented Nov 11, 2024

CodSpeed Performance Report

Merging #3256 will improve performance by 45.45%

Comparing colin/probe-state-bridge (e751f61) with main (711e862)

Summary

⚡ 2 improvements
✅ 15 untouched benchmarks

Benchmarks breakdown

Benchmark                                   main      colin/probe-state-bridge   Change
test_iter_rows_first_row[100 Small Files]   269 ms    237.4 ms                   +13.32%
test_show[100 Small Files]                  22.3 ms   15.3 ms                    +45.45%


codecov bot commented Nov 11, 2024

Codecov Report

Attention: Patch coverage is 97.11316% with 25 lines in your changes missing coverage. Please review.

Project coverage is 77.59%. Comparing base (711e862) to head (e751f61).
Report is 10 commits behind head on main.

Files with missing lines                                 Patch %   Lines
src/daft-local-execution/src/dispatcher.rs               93.85%    7 Missing ⚠️
...local-execution/src/sinks/outer_hash_join_probe.rs    95.70%    7 Missing ⚠️
...ecution/src/intermediate_ops/actor_pool_project.rs    84.00%    4 Missing ⚠️
.../daft-local-execution/src/sinks/hash_join_build.rs    93.02%    3 Missing ⚠️
src/daft-local-execution/src/run.rs                      84.61%    2 Missing ⚠️
.../src/intermediate_ops/anti_semi_hash_join_probe.rs    97.36%    1 Missing ⚠️
...-execution/src/intermediate_ops/intermediate_op.rs    98.41%    1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3256      +/-   ##
==========================================
+ Coverage   77.50%   77.59%   +0.08%     
==========================================
  Files         666      666              
  Lines       81335    81621     +286     
==========================================
+ Hits        63041    63331     +290     
+ Misses      18294    18290       -4     
Files with missing lines                                 Coverage Δ
src/common/runtime/src/lib.rs                            90.78% <100.00%> (ø)
src/daft-local-execution/src/channel.rs                  98.14% <100.00%> (-0.36%) ⬇️
...-local-execution/src/intermediate_ops/aggregate.rs    100.00% <100.00%> (ø)
...ft-local-execution/src/intermediate_ops/explode.rs    100.00% <100.00%> (ø)
...aft-local-execution/src/intermediate_ops/filter.rs    100.00% <100.00%> (ø)
...tion/src/intermediate_ops/inner_hash_join_probe.rs    99.23% <100.00%> (+3.46%) ⬆️
...ft-local-execution/src/intermediate_ops/project.rs    100.00% <100.00%> (ø)
...aft-local-execution/src/intermediate_ops/sample.rs    100.00% <100.00%> (ø)
...ft-local-execution/src/intermediate_ops/unpivot.rs    100.00% <100.00%> (ø)
src/daft-local-execution/src/lib.rs                      95.29% <100.00%> (+2.43%) ⬆️
... and 18 more

... and 3 files with indirect coverage changes


/// A dispatcher that distributes morsels to workers in an unordered fashion.
/// Used if the operator does not require maintaining the order of the input.
pub(crate) struct UnorderedDispatcher {
@colin-ho (Contributor, Author):

Script that tests for uneven runtimes across tasks. With the UnorderedDispatcher, performance improves considerably.

import time

import daft
from daft import col

daft.context.set_execution_config(default_morsel_size=1)

@daft.udf(return_dtype=daft.DataType.int64())
def do_work(x):
    x = x.to_pylist()
    # my laptop has 12 cores, so 12 workers will be spawned to run this udf.
    if x[0] % 12 == 0:
        print("doing a lot of work on ", x)
        time.sleep(1)
    else:
        print("doing a little work on ", x)
        time.sleep(0.1)
    return x


df = daft.from_pydict({"a": [i for i in range(48)]}).with_column("b", do_work(col("a"))).agg(col("b").sum())
df.collect()

@colin-ho marked this pull request as ready for review November 11, 2024 08:48
@colin-ho requested a review from samster25 November 11, 2024 14:15
Comment on lines 22 to 29
/// The `OperatorOutput` enum represents the output of an operator.
/// It can be either `Ready` or `Pending`.
/// If the output is `Ready`, the value is immediately available.
/// If the output is `Pending`, the value is not yet available and a `RuntimeTask` is returned.
pub(crate) enum OperatorOutput<T> {
    Ready(T),
    Pending(RuntimeTask<T>),
}
@colin-ho (Contributor, Author):

This OperatorOutput allows operators to spawn their own tasks on the compute runtime if needed, or if no compute is necessary, return whatever result they have. This also allows hash join probes to await on the probe tables within the execute method.
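As a rough sketch of the Ready/Pending split (tokio's JoinHandle stands in for RuntimeTask here; this is illustrative, not the actual operator API):

use tokio::task::JoinHandle;

// Stand-in for the OperatorOutput described above, with JoinHandle in place of RuntimeTask.
enum OperatorOutput<T> {
    Ready(T),
    Pending(JoinHandle<T>),
}

// The caller only pays for an await when the operator actually spawned work.
async fn resolve<T>(out: OperatorOutput<T>) -> T {
    match out {
        OperatorOutput::Ready(v) => v,
        OperatorOutput::Pending(task) => task.await.expect("compute task panicked"),
    }
}

#[tokio::main]
async fn main() {
    // No compute needed: return the value directly without touching the runtime.
    let ready = OperatorOutput::Ready(1u64);
    // Compute needed: spawn on the runtime and hand the task back to the caller.
    let pending = OperatorOutput::Pending(tokio::spawn(async { 2u64 }));

    assert_eq!(resolve(ready).await, 1);
    assert_eq!(resolve(pending).await, 2);
}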

@@ -46,6 +47,13 @@ impl LocalPartitionIterator {
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
pub struct NativeExecutor {
local_physical_plan: Arc<LocalPhysicalPlan>,
cancel: CancellationToken,
@colin-ho (Contributor, Author):

The tasks spawned on the execution runtime handle currently only abort if they try a send and see that the channel is closed (this is something we did in the native runner PR). However, they should also abort if the native executor gets dropped.
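A minimal sketch of the drop-to-abort pattern, assuming tokio_util's CancellationToken (the Executor struct and task body are illustrative names, not the actual NativeExecutor code):

use std::time::Duration;
use tokio_util::sync::CancellationToken;

struct Executor {
    cancel: CancellationToken,
}

impl Drop for Executor {
    fn drop(&mut self) {
        // Dropping the executor cancels the token, which aborts all spawned tasks.
        self.cancel.cancel();
    }
}

#[tokio::main]
async fn main() {
    let exec = Executor { cancel: CancellationToken::new() };
    let token = exec.cancel.clone();

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => println!("aborting: executor dropped"),
            _ = tokio::time::sleep(Duration::from_secs(60)) => println!("finished normally"),
        }
    });

    drop(exec); // fires the cancellation
    task.await.unwrap();
}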

Comment on lines +21 to +24
pub(crate) struct ProbeStateBridge {
    inner: OnceLock<Arc<ProbeState>>,
    notify: tokio::sync::Notify,
}
@colin-ho (Contributor, Author):

tokio's OnceCell (https://docs.rs/tokio/latest/tokio/sync/struct.OnceCell.html) doesn't work because it has no async get method.

Opted for just a manual implementation.
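The set-once / await-until-set pattern might look roughly like this (a sketch of the shape described above, not the actual ProbeStateBridge source):

use std::sync::{Arc, OnceLock};
use tokio::sync::Notify;

struct Bridge<T> {
    inner: OnceLock<Arc<T>>,
    notify: Notify,
}

impl<T> Bridge<T> {
    fn new() -> Self {
        Self { inner: OnceLock::new(), notify: Notify::new() }
    }

    // Set the value once and wake every waiter.
    fn set(&self, value: Arc<T>) {
        let _ = self.inner.set(value);
        self.notify.notify_waiters();
    }

    // Await until the value is set; this is the async `get` that tokio's OnceCell lacks.
    async fn get(&self) -> Arc<T> {
        loop {
            let notified = self.notify.notified();
            tokio::pin!(notified);
            // Register interest *before* re-checking, so a concurrent set() can't be missed.
            notified.as_mut().enable();
            if let Some(v) = self.inner.get() {
                return v.clone();
            }
            notified.await;
        }
    }
}

#[tokio::main]
async fn main() {
    let bridge = Arc::new(Bridge::new());
    let waiter = {
        let bridge = bridge.clone();
        tokio::spawn(async move { println!("probe state ready: {}", bridge.get().await) })
    };
    bridge.set(Arc::new(42));
    waiter.await.unwrap();
}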

Resolved (outdated) review threads on: src/daft-local-execution/src/channel.rs (×2), src/daft-local-execution/src/lib.rs
///
/// It returns a vector of receivers (one per worker) that will receive the morsels.
///
/// Implementations must spawn a task on the runtime handle that reads from the
@samster25 (Member):

I think it would make more sense to name this:

trait DispatcherBuilder {
    fn spawn_dispatcher(...) -> SpawnedDispatcherResult;
}

struct SpawnedDispatcherResult {
    receivers: Vec<Receiver<Arc<MicroPartition>>>,
    dispatch_handle: SpawnedTask,
}

I think my key confusion was that dispatch sounds like an action that would happen on every tick rather than something that happens once.

@colin-ho (Contributor, Author):

Gotcha, made it more explicit that it should spawn a dispatching task. Also implemented the SpawnedDispatcherResult, so the caller can await on it themselves.
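A sketch of that shape, with tokio mpsc channels standing in for swordfish's own (names follow the suggestion above; the round-robin body and morsel type are just for illustration):

use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

struct SpawnedDispatcherResult {
    receivers: Vec<mpsc::Receiver<Arc<String>>>,
    dispatch_handle: JoinHandle<()>,
}

// Spawns the dispatching task once; the caller gets the per-worker receivers
// plus a handle it can await for completion.
fn spawn_dispatcher(mut input: mpsc::Receiver<Arc<String>>, num_workers: usize) -> SpawnedDispatcherResult {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..num_workers).map(|_| mpsc::channel(1)).unzip();
    let dispatch_handle = tokio::spawn(async move {
        let mut next = 0;
        while let Some(morsel) = input.recv().await {
            let _ = senders[next % senders.len()].send(morsel).await;
            next += 1;
        }
    });
    SpawnedDispatcherResult { receivers, dispatch_handle }
}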

@colin-ho requested a review from samster25 November 14, 2024 06:26
@samster25 (Member) left a comment:

🔥


fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.get_mut();
    Pin::new(&mut this.0).poll(cx).map(|r| r.context(JoinSnafu))
}
@samster25 (Member):

Actually, this looks a bit sketchy! Why do we have to Pin::new this?

@colin-ho (Contributor, Author) commented Nov 14, 2024:

The poll method on a JoinHandle accepts Pin<&mut Self>, but self.0 is not pinned even though self is pinned, so I pinned it.

I did more reading and found pin_project, which allows you to project the pin on a struct into its fields, so you don't need Pin::new. Technically I think it was fine to Pin::new it since the JoinHandle is Unpin. Note to self: need to read up more on Pin and Unpin though.
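For reference, the pin_project version mentioned above might look like this (a sketch assuming the pin-project crate; the RuntimeTask wrapper shape is assumed, and the snafu error mapping is omitted):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project::pin_project;
use tokio::task::{JoinError, JoinHandle};

// #[pin] marks the field so project() yields a Pin<&mut JoinHandle<T>>,
// removing the need for Pin::new in poll.
#[pin_project]
struct RuntimeTask<T>(#[pin] JoinHandle<T>);

impl<T> Future for RuntimeTask<T> {
    type Output = Result<T, JoinError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The projection pins the inner JoinHandle, so we can poll it directly.
        self.project().0.poll(cx)
    }
}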

@colin-ho merged commit ca8f25d into main Nov 15, 2024
44 checks passed
@colin-ho deleted the colin/probe-state-bridge branch November 15, 2024 16:33