Skip to content

Commit

Permalink
chore: Remove IR info from DSL (#18712)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 12, 2024
1 parent 470d8c4 commit cdd64d0
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 67 deletions.
1 change: 0 additions & 1 deletion crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl LazyFileListReader for LazyJsonLineReader {
Ok(LazyFrame::from(DslPlan::Scan {
sources: Arc::new(Mutex::new(self.sources.to_dsl(false))),
file_info: Arc::new(RwLock::new(None)),
predicate: None,
file_options,
scan_type,
}))
Expand Down
14 changes: 1 addition & 13 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use polars_io::HiveOptions;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
use polars_io::RowIndex;

use crate::constants::UNLIMITED_CACHE;
#[cfg(feature = "python")]
use crate::prelude::python_udf::PythonFunction;
use crate::prelude::*;
Expand Down Expand Up @@ -63,7 +62,6 @@ impl DslBuilder {
is_expanded: true,
})),
file_info: Arc::new(RwLock::new(Some(file_info))),
predicate: None,
file_options,
scan_type: FileScan::Anonymous {
function,
Expand Down Expand Up @@ -106,7 +104,6 @@ impl DslBuilder {
Ok(DslPlan::Scan {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
predicate: None,
file_options: options,
scan_type: FileScan::Parquet {
options: ParquetOptions {
Expand Down Expand Up @@ -148,7 +145,6 @@ impl DslBuilder {
glob: true,
include_file_paths,
},
predicate: None,
scan_type: FileScan::Ipc {
options,
cloud_options,
Expand Down Expand Up @@ -190,7 +186,6 @@ impl DslBuilder {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
file_options: options,
predicate: None,
scan_type: FileScan::Csv {
options: read_options,
cloud_options,
Expand All @@ -202,12 +197,7 @@ impl DslBuilder {
pub fn cache(self) -> Self {
let input = Arc::new(self.0);
let id = input.as_ref() as *const DslPlan as usize;
DslPlan::Cache {
input,
id,
cache_hits: UNLIMITED_CACHE,
}
.into()
DslPlan::Cache { input, id }.into()
}

pub fn drop(self, to_drop: Vec<Selector>, strict: bool) -> Self {
Expand Down Expand Up @@ -321,8 +311,6 @@ impl DslBuilder {
DslPlan::DataFrameScan {
df: Arc::new(df),
schema,
output_schema: None,
filter: None,
}
.into()
}
Expand Down
26 changes: 6 additions & 20 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
DslPlan::Scan {
sources,
file_info,
predicate,
mut file_options,
mut scan_type,
} => {
Expand Down Expand Up @@ -276,9 +275,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
file_info: resolved_file_info,
hive_parts,
output_schema: None,
predicate: predicate
.map(|expr| to_expr_ir(expr, ctxt.expr_arena))
.transpose()?,
predicate: None,
scan_type,
file_options,
}
Expand Down Expand Up @@ -374,18 +371,11 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_input!(slice)))?;
IR::Slice { input, offset, len }
},
DslPlan::DataFrameScan {
DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
df,
schema,
output_schema,
filter: selection,
} => IR::DataFrameScan {
df,
schema,
output_schema,
filter: selection
.map(|expr| to_expr_ir(expr, ctxt.expr_arena))
.transpose()?,
output_schema: None,
filter: None,
},
DslPlan::Select {
expr,
Expand Down Expand Up @@ -488,17 +478,13 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult

return run_conversion(lp, ctxt, "sort");
},
DslPlan::Cache {
input,
id,
cache_hits,
} => {
DslPlan::Cache { input, id } => {
let input =
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_input!(cache)))?;
IR::Cache {
input,
id,
cache_hits,
cache_hits: crate::constants::UNLIMITED_CACHE,
}
},
DslPlan::GroupBy {
Expand Down
22 changes: 6 additions & 16 deletions crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl IR {
sources,
file_info,
hive_parts: _,
predicate,
predicate: _,
scan_type,
output_schema: _,
file_options: options,
Expand All @@ -63,7 +63,6 @@ impl IR {
is_expanded: true,
})),
file_info: Arc::new(RwLock::new(Some(file_info))),
predicate: predicate.map(|e| e.to_expr(expr_arena)),
scan_type,
file_options: options,
},
Expand Down Expand Up @@ -109,14 +108,9 @@ impl IR {
IR::DataFrameScan {
df,
schema,
output_schema,
filter: selection,
} => DslPlan::DataFrameScan {
df,
schema,
output_schema,
filter: selection.map(|e| e.to_expr(expr_arena)),
},
output_schema: _,
filter: _,
} => DslPlan::DataFrameScan { df, schema },
IR::Select {
expr,
input,
Expand Down Expand Up @@ -170,14 +164,10 @@ impl IR {
IR::Cache {
input,
id,
cache_hits,
cache_hits: _,
} => {
let input = Arc::new(convert_to_lp(input, lp_arena));
DslPlan::Cache {
input,
id,
cache_hits,
}
DslPlan::Cache { input, id }
},
IR::GroupBy {
input,
Expand Down
19 changes: 4 additions & 15 deletions crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct DslScanSources {
pub is_expanded: bool,
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DslPlan {
#[cfg(feature = "python")]
Expand All @@ -75,11 +74,7 @@ pub enum DslPlan {
predicate: Expr,
},
/// Cache the input at this point in the LP
Cache {
input: Arc<DslPlan>,
id: usize,
cache_hits: u32,
},
Cache { input: Arc<DslPlan>, id: usize },
Scan {
sources: Arc<Mutex<DslScanSources>>,
// Option as this is mostly materialized on the IR phase.
Expand All @@ -88,7 +83,6 @@ pub enum DslPlan {
// are used as base of different queries in a loop. That way
// the expensive schema resolving is cached.
file_info: Arc<RwLock<Option<FileInfo>>>,
predicate: Option<Expr>,
file_options: FileScanOptions,
scan_type: FileScan,
},
Expand All @@ -97,9 +91,6 @@ pub enum DslPlan {
DataFrameScan {
df: Arc<DataFrame>,
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
filter: Option<Expr>,
},
/// Polars' `select` operation, this can mean projection, but also full data access.
Select {
Expand Down Expand Up @@ -196,9 +187,9 @@ impl Clone for DslPlan {
#[cfg(feature = "python")]
Self::PythonScan { options } => Self::PythonScan { options: options.clone() },
Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() },
Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() },
Self::Scan { sources, file_info, predicate, file_options, scan_type } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() },
Self::DataFrameScan { df, schema, output_schema, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), filter: selection.clone() },
Self::Cache { input, id } => Self::Cache { input: input.clone(), id: id.clone() },
Self::Scan { sources, file_info, file_options, scan_type } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() },
Self::DataFrameScan { df, schema, } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), },
Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() },
Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() },
Self::Join { input_left, input_right, left_on, right_on, predicates, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone(), predicates: predicates.clone() },
Expand All @@ -223,8 +214,6 @@ impl Default for DslPlan {
DslPlan::DataFrameScan {
df: Arc::new(df),
schema: Arc::new(schema),
output_schema: None,
filter: None,
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,6 @@ def serialize(
>>> lf = pl.LazyFrame({"a": [1, 2, 3]}).sum()
>>> bytes = lf.serialize()
>>> bytes # doctest: +ELLIPSIS
b'\xa1kMapFunction\xa2einput\xa1mDataFrameScan\xa4bdf\xa1gcolumns\x81\xa4d...'
The bytes can later be deserialized back into a LazyFrame.
Expand Down

0 comments on commit cdd64d0

Please sign in to comment.