Improve SortPreservingMerge::enable_round_robin_repartition docs #13826

Merged
merged 7 commits into from
Dec 20, 2024
Changes from 2 commits
45 changes: 34 additions & 11 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Defines the sort preserving merge plan
//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream.

use std::any::Any;
use std::sync::Arc;
@@ -38,10 +38,22 @@ use log::{debug, trace};

/// Sort preserving merge execution plan
///
/// This takes an input execution plan and a list of sort expressions, and
/// provided each partition of the input plan is sorted with respect to
/// these sort expressions, this operator will yield a single partition
/// that is also sorted with respect to them
/// # Overview
///
/// This operator implements a K-way merge. It is used to merge multiple sorted
/// streams into a single sorted stream and is highly optimized.
///
/// ## Inputs:
///
/// 1. A list of sort expressions
/// 2. An input plan, where each partition is sorted with respect to
/// these sort expressions.
///
/// ## Output:
///
/// 1. A single partition that is also sorted with respect to the expressions
///
/// ## Diagram
///
/// ```text
/// ┌─────────────────────────┐
@@ -55,12 +67,12 @@ use log::{debug, trace};
/// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗ │ │
/// │ ║ B ║ E ║ ... │──┘ │
/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream
/// └─────────────────────────┘ places equal rows from stream 1
/// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`:
/// └─────────────────────────┘ the merged stream places equal rows from stream 1
/// Stream 2
///
///
/// Input Streams Output stream
/// Input Partitions Output Partition
/// (sorted) (sorted)
/// ```
///
@@ -70,7 +82,7 @@ use log::{debug, trace};
/// the output and inputs are not polled again.
#[derive(Debug, Clone)]
pub struct SortPreservingMergeExec {
/// Input plan
/// Input plan with sorted partitions
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: LexOrdering,
@@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec {
fetch: Option<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// Configuration parameter to enable round-robin selection of tied winners of loser tree.
/// Use round-robin selection of tied winners of loser tree
///
/// See [`Self::with_round_robin_repartition`] for more information.
enable_round_robin_repartition: bool,
}

@@ -105,6 +119,14 @@ impl SortPreservingMergeExec {
}

/// Sets the selection strategy of tied winners of the loser tree algorithm
///
Contributor Author

This is the actual public-facing API description change.

/// When true (the default) equal output rows are placed in the merged
/// stream when ready, which is faster but not stable (can vary from
/// run to run).
Contributor @jayzhan211, Dec 18, 2024

From what I remember, round-robin repartition chooses the smaller index if the other conditions are equal (the value and the polled count are both equal). If there are 3 streams, we choose 1, 2, 3 and then go back to 1, 2, 3 (because we know the polled count) until the value advances. Do you know why the result varies from run to run and is not always the same?

Contributor @jayzhan211, Dec 18, 2024

Could a poll be pending because of networking, so the value is not yet ready? It would be better to include the reason in the comment.

Contributor Author

You are right -- I double-checked the other parts of the code -- I think I misunderstood the root need for this option.

I removed the discussion of polling when not ready.

///
/// If false, equal output rows are placed in the merged stream in the order
/// of the inputs, resulting in potentially slower execution but in a stable
/// output order.
pub fn with_round_robin_repartition(
mut self,
enable_round_robin_repartition: bool,
@@ -128,7 +150,8 @@ impl SortPreservingMergeExec {
self.fetch
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
/// Creates the cache object that stores the plan properties
/// such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
ordering: LexOrdering,
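To make the two tie-breaking behaviours discussed for `with_round_robin_repartition` concrete, here is a minimal sketch. It is not DataFusion's implementation: it uses a plain linear scan over the stream heads instead of a loser tree, the function name `merge_sorted` is hypothetical, and the round-robin rule follows the reviewer's recollection in the thread above (prefer the stream that has been taken from least, then the smaller index), with "rows taken so far" standing in for the polled count. Each output element carries the index of the stream it came from so the order of tied rows is visible.

```rust
/// Merge already-sorted inputs into one sorted output of `(value, stream_index)`.
///
/// Illustration only (assumption: linear scan, not DataFusion's loser tree).
/// - `round_robin == false`: ties go to the lower stream index (stable order).
/// - `round_robin == true`: ties go to the stream with fewer rows taken so far,
///   then to the lower stream index.
fn merge_sorted(inputs: &[Vec<i32>], round_robin: bool) -> Vec<(i32, usize)> {
    // Next unread position per stream; also the number of rows taken from it.
    let mut pos = vec![0usize; inputs.len()];
    let mut out = Vec::new();

    loop {
        let mut winner: Option<usize> = None;
        for i in 0..inputs.len() {
            if pos[i] >= inputs[i].len() {
                continue; // stream `i` is exhausted
            }
            let better = match winner {
                None => true,
                Some(w) => {
                    let (a, b) = (inputs[i][pos[i]], inputs[w][pos[w]]);
                    if a != b {
                        a < b
                    } else if round_robin {
                        // Tie: prefer the stream taken from least so far.
                        pos[i] < pos[w]
                    } else {
                        // Tie: keep the earlier (lower-index) stream.
                        false
                    }
                }
            };
            if better {
                winner = Some(i);
            }
        }
        match winner {
            None => break, // all streams exhausted
            Some(w) => {
                out.push((inputs[w][pos[w]], w));
                pos[w] += 1;
            }
        }
    }
    out
}
```

For the inputs `[1, 1, 2]` (stream 0) and `[1, 1, 3]` (stream 1), the stable variant emits both `1`s from stream 0 before those from stream 1, while the round-robin variant alternates between the two streams for the tied `1`s. Both outputs are sorted by value; only the relative order of equal rows differs.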