Skip to content

Commit

Permalink
for debug
Browse files Browse the repository at this point in the history
iniyt

init

remove .gz
  • Loading branch information
Lordworms committed Mar 27, 2024
1 parent 0b95577 commit 7c07545
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
96 changes: 96 additions & 0 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! The [`Limit`] rule tries to modify a given plan so that it can
//! accommodate infinite sources and utilize statistical information (if there
//! is any) to obtain more performant plans. To achieve the first goal, it
//! tries to transform a non-runnable query (with the given infinite sources)
//! into a runnable query by replacing pipeline-breaking join operations with
//! pipeline-friendly ones. To achieve the second goal, it selects the proper
//! `PartitionMode` and the build side using the available statistics for hash joins.
use std::sync::Arc;

use crate::datasource::physical_plan::CsvExec;
use crate::physical_optimizer::utils::is_global_limit;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::AggregateExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_optimizer::push_down_limit;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use hashbrown::raw::Global;
use itertools::Itertools;
pub struct LimitPushdown {}

impl LimitPushdown {
fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for LimitPushdown {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// if this node is not a global limit, then directly return
if !is_global_limit(&plan) {
return Ok(plan);
}
// we traverse the treenode to try to push down the limit same logic as project push down
plan.transform_down(&push_down_limit).data()
}

fn name() -> &str {
"LimitPushdown"
}

fn schema_check(&self) -> bool {
true
}
}
impl LimitPushdown {}
fn new_global_limit_with_input() {}
// try to push down current limit, based on the son
fn push_down_limit(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let modified =
if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
let input = global_limit.input().as_any();
if let Some(coalesce_partition_exec) =
input.downcast_ref::<CoalescePartitionsExec>()
{
general_swap(plan)
} else if let Some(coalesce_batch_exec) =
input.downcast_ref::<CoalesceBatchesExec>()
{
general_swap(plan)
} else {
None
}
} else {
Ok(Transformed::no(plan))
};
}
fn general_swap(plan: &GlobalLimitExec) -> Result<Option<Arc<dyn ExecutionPlan>>> {}
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod combine_partial_final_agg;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
Expand Down
29 changes: 29 additions & 0 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::sync::Arc;

use crate::datasource::physical_plan::ArrowExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
Expand All @@ -28,7 +29,9 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
use datafusion_physical_plan::joins::{CrossJoinExec, HashJoinExec};
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::test::exec::StatisticsExec;
use datafusion_physical_plan::tree_node::PlanContext;

/// This utility function adds a `SortExec` above an operator according to the
Expand Down Expand Up @@ -107,3 +110,29 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}

/// Check whether the given operator is a [`GlobalLimitExec`].
pub fn is_global_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<GlobalLimitExec>()
}
/// Check whether the given plan is a terminator of [`GlobalLimitExec`].
pub fn is_limit_terminator(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<HashJoinExec>()
|| plan.as_any().is::<CrossJoinExec>()
|| plan.as_any().is::<MemoryExec>()
|| plan.as_any().is::<NestedLoopJoinExec>()
|| plan.as_any().is::<UnionExec>()
|| plan.as_any().is::<MemoryExec>()
|| plan.as_any().is::<PartialSortExec>()
|| plan.as_any().is::<ArrowExec>()
|| plan.as_any().is::<AvroExec>()
|| plan.as_any().is::<CsvExecExec>()
|| plan.as_any().is::<NdJsonExec>()
|| plan.as_any().is::<StatisticsExec>()
|| plan.as_any().is::<PlaceholderRowExec>()
|| plan.as_any().is::<RecursiveQueryExec>()
|| plan.as_any().is::<StreamingTableExec>()
|| plan.as_any().is::<InterleaveExec>()
|| plan.as_any().is::<UnnestExec>()
|| plan.as_any().is::<WindowAggExec>()
}

0 comments on commit 7c07545

Please sign in to comment.