Skip to content

Commit

Permalink
[CHORE] Pass in file size and num rows to Rust query planner (#1282)
Browse files Browse the repository at this point in the history
Currently, file metadata (file size and number of rows) is computed before the
query plan is constructed and is then passed into the Python query planner.
This PR passes it into the Rust query planner as well.

Eventually, we will likely want to fetch this metadata dynamically at query
execution time; this PR is only a feature-parity stopgap.

Manually verified that source scan tasks are now emitted with memory
requirements.

---------

Co-authored-by: Xiayue Charles Lin <charles@eventualcomputing.com>
  • Loading branch information
xcharleslin and Xiayue Charles Lin authored Aug 21, 2023
1 parent 857162c commit 3c49bb5
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 21 deletions.
5 changes: 4 additions & 1 deletion daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ def from_tabular_scan(
)
paths_details = file_info_partition_set.to_pydict()
filepaths = paths_details[runner_io.FS_LISTING_PATH_COLUMN_NAME]
filesizes = paths_details[runner_io.FS_LISTING_SIZE_COLUMN_NAME]
filerows = paths_details[runner_io.FS_LISTING_ROWS_COLUMN_NAME]

rs_schema = inferred_or_provided_schema._schema
builder = _LogicalPlanBuilder.table_scan(filepaths, rs_schema, file_format_config)
builder = _LogicalPlanBuilder.table_scan(filepaths, filesizes, filerows, rs_schema, file_format_config)
return cls(builder)

def project(
Expand Down
4 changes: 3 additions & 1 deletion src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ impl LogicalPlanBuilder {
#[staticmethod]
pub fn table_scan(
file_paths: Vec<String>,
file_sizes: Vec<Option<i64>>,
file_rows: Vec<Option<i64>>,
schema: &PySchema,
file_format_config: PyFileFormatConfig,
) -> PyResult<LogicalPlanBuilder> {
let num_partitions = file_paths.len();
let source_info = SourceInfo::ExternalInfo(ExternalSourceInfo::new(
schema.schema.clone(),
InputFileInfo::new(file_paths, None, None, None).into(),
InputFileInfo::new(file_paths, file_sizes, file_rows).into(),
file_format_config.into(),
));
let partition_spec = PartitionSpec::new(PartitionScheme::Unknown, num_partitions, None);
Expand Down
4 changes: 2 additions & 2 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()))
};

let _second_stage_agg = PhysicalPlan::Aggregate(Aggregate::new(
let second_stage_agg = PhysicalPlan::Aggregate(Aggregate::new(
gather_plan.into(),
second_stage_aggs.values().cloned().collect(),
groupby.clone(),
Expand All @@ -379,7 +379,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
PhysicalPlan::Project(Project::new(
final_exprs,
Default::default(),
_second_stage_agg.into(),
second_stage_agg.into(),
))
}
};
Expand Down
23 changes: 6 additions & 17 deletions src/daft-plan/src/source_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,20 @@ impl ExternalInfo {
#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
pub file_paths: Vec<String>,
pub file_sizes: Option<Vec<i64>>,
pub num_rows: Option<Vec<i64>>,
pub file_formats: Option<Vec<FileFormat>>,
pub file_sizes: Vec<Option<i64>>,
pub num_rows: Vec<Option<i64>>,
}

impl FileInfo {
pub fn new(
file_paths: Vec<String>,
file_sizes: Option<Vec<i64>>,
num_rows: Option<Vec<i64>>,
file_formats: Option<Vec<FileFormat>>,
file_sizes: Vec<Option<i64>>,
num_rows: Vec<Option<i64>>,
) -> Self {
Self {
file_paths,
file_sizes,
num_rows,
file_formats,
}
}
pub fn to_table(&self) -> DaftResult<Table> {
Expand All @@ -170,11 +167,7 @@ impl FileInfo {
))?,
Series::try_from((
"size",
arrow2::array::PrimitiveArray::<i64>::new_null(
arrow2::datatypes::DataType::Int64,
num_files,
)
.to_boxed(),
arrow2::array::PrimitiveArray::<i64>::from(&self.file_sizes).to_boxed(),
))?,
Series::try_from((
"type",
Expand All @@ -186,11 +179,7 @@ impl FileInfo {
))?,
Series::try_from((
"rows",
arrow2::array::PrimitiveArray::<i64>::new_null(
arrow2::datatypes::DataType::Int64,
num_files,
)
.to_boxed(),
arrow2::array::PrimitiveArray::<i64>::from(&self.num_rows).to_boxed(),
))?,
];
Table::new(
Expand Down

0 comments on commit 3c49bb5

Please sign in to comment.