feat: logical Node for find files #2194

Merged · 13 commits · Mar 24, 2024
47 changes: 47 additions & 0 deletions crates/core/src/delta_datafusion/find_files/logical.rs
@@ -0,0 +1,47 @@
use std::collections::HashSet;

use datafusion_common::DFSchemaRef;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};

#[derive(Debug, Hash, Eq, PartialEq, Clone)]
pub struct FindFilesNode {
    pub id: String,
    pub input: LogicalPlan,
    pub predicates: Vec<Expr>,
    pub files: Vec<String>,
    pub schema: DFSchemaRef,
}
@Blajda (Collaborator) commented on Feb 20, 2024:
The internals of this structure should be opaque, with corresponding `new` functions added.
Users will not provide the list of files to scan; instead they will provide some reference to the DeltaTable (i.e. `EagerSnapshot`). From that snapshot you can obtain the schema and files.

You also shouldn't need `input` here. This operation should function as a source.
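A sketch of what that opaque constructor could look like (hypothetical, not this PR's code: it assumes `input` is dropped, the fields become private, and that `EagerSnapshot` exposes accessors with the names used below):

```rust
use datafusion_common::{DFSchemaRef, ToDFSchema};
use datafusion_expr::Expr;

use crate::kernel::EagerSnapshot;
use crate::DeltaResult;

// Sketch: fields are private and derived from the snapshot, not the caller.
pub struct FindFilesNode {
    id: String,
    predicates: Vec<Expr>,
    files: Vec<String>,
    schema: DFSchemaRef,
}

impl FindFilesNode {
    pub fn new(id: String, snapshot: &EagerSnapshot, predicate: Expr) -> DeltaResult<Self> {
        // `arrow_schema()` and `file_paths_iter()` are assumed accessor names
        // for obtaining the table schema and surviving Add-action paths.
        let schema = snapshot.arrow_schema()?.to_dfschema_ref()?;
        let files = snapshot.file_paths_iter().map(|p| p.to_string()).collect();
        Ok(Self {
            id,
            predicates: vec![predicate],
            files,
            schema,
        })
    }
}
```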


impl UserDefinedLogicalNodeCore for FindFilesNode {
    fn name(&self) -> &str {
        "FindFiles"
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn prevent_predicate_push_down_columns(&self) -> HashSet<String> {
        HashSet::new()
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "FindFiles id={}, predicate={:?}, files={:?}",
Collaborator commented:
Having the files here will be a bit too verbose. It would be better to print the version of the table.
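A version-based variant might look like this (sketch only; it assumes the node carries a `version: i64` field, which it does not today):

```rust
// Sketch: print the table version instead of the potentially huge file list.
// Assumes a `version: i64` field is added to FindFilesNode.
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    write!(
        f,
        "FindFiles id={}, predicate={:?}, version={}",
        &self.id, self.predicates, self.version
    )
}
```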

            &self.id, self.predicates, self.files
        )
    }

    fn from_template(&self, _exprs: &[Expr], _inputs: &[LogicalPlan]) -> Self {
        self.clone()
    }
}
160 changes: 160 additions & 0 deletions crates/core/src/delta_datafusion/find_files/mod.rs
@@ -0,0 +1,160 @@
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_array::cast::AsArray;
use arrow_schema::{DataType, Field, Schema};
use arrow_schema::SchemaBuilder;
use async_trait::async_trait;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};

use crate::delta_datafusion::find_files::logical::FindFilesNode;
use crate::delta_datafusion::find_files::physical::FindFilesExec;
use crate::delta_datafusion::PATH_COLUMN;

pub mod logical;
pub mod physical;

#[inline]
fn only_file_path_schema() -> Arc<Schema> {
    let mut builder = SchemaBuilder::new();
    builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
    Arc::new(builder.finish())
}

struct FindFilesPlannerExtension {}

struct FindFilesPlanner {}

#[async_trait]
impl ExtensionPlanner for FindFilesPlannerExtension {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        _physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        if let Some(node) = node.as_any().downcast_ref::<FindFilesNode>() {
            dbg!(&node.files, &node.predicates);
            let schema = Arc::new(Schema::from(node.schema.as_ref()));

            return Ok(Some(Arc::new(FindFilesExec::new(
                node.files.clone(),
                node.predicates[0].clone(),
                schema,
            )?)));
        }
        Ok(None)
    }
}

#[async_trait]
impl QueryPlanner for FindFilesPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
            vec![Arc::new(FindFilesPlannerExtension {})],
        )));
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

/// For each file path in the input batch, re-read that file and keep the path
/// if at least one row satisfies the predicate.
async fn scan_memory_table_batch(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
    let ctx = SessionContext::new();
    let mut batches = vec![];

    if let Some(column) = batch.column_by_name(PATH_COLUMN) {
        let mut column_iter = column.as_string::<i32>().into_iter();
        while let Some(Some(row)) = column_iter.next() {
            let df = ctx
                .read_parquet(row, ParquetReadOptions::default())
                .await?
                .filter(predicate.to_owned())?;
            if df.count().await? > 0 {
                batches.push(row);
            }
        }
    }
    let str_array = Arc::new(StringArray::from(batches));
    RecordBatch::try_new(only_file_path_schema(), vec![str_array]).map_err(Into::into)
@Blajda (Collaborator) commented on Feb 20, 2024:
Multiple enhancements can be made here. The current implementation of find_files is able to perform a projection that reads only the data required to evaluate the predicate. Keeping the projection behavior is a must, since it reduces how much is read.

Something else to consider: FindFiles only needs to know whether at least one record satisfies the predicate, so once one match is found the scan of that file can be stopped. A further optimization is to apply a limit of 1 after the filter.
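A sketch of the scan loop with both suggestions applied (assuming `Expr::to_columns()` is available for collecting the predicate's column references, together with the `DataFrame::select_columns` and `limit` APIs):

```rust
// Sketch: project only the columns the predicate needs, and stop after the
// first matching record via a limit of 1.
let columns: Vec<String> = predicate
    .to_columns()?
    .into_iter()
    .map(|c| c.name)
    .collect();
let column_refs: Vec<&str> = columns.iter().map(String::as_str).collect();

let df = ctx
    .read_parquet(row, ParquetReadOptions::default())
    .await?
    .select_columns(&column_refs)? // projection: read only predicate columns
    .filter(predicate.to_owned())?
    .limit(0, Some(1))?; // one match is enough to keep the file
if df.count().await? > 0 {
    batches.push(row);
}
```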

Collaborator (Author) replied:

Another thing I'd add: when the predicate only involves partition columns, we don't need to read the file at all, since the values can be inferred from the path itself (see the sketch after this function).

}
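For the partition-column case, a hypothetical helper could recover the values straight from a Hive-style path, so the predicate can be evaluated without touching Parquet at all:

```rust
use std::collections::HashMap;

// Hypothetical helper: parse `key=value` segments out of a Hive-style path such
// as ".../year=2024/month=03/part-0001.parquet". A predicate that references
// only partition columns can then be evaluated against these values (e.g. via a
// one-row in-memory batch) without opening the file.
fn partition_values_from_path(path: &str) -> HashMap<String, String> {
    path.split('/')
        .filter_map(|segment| segment.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```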

#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use arrow_cast::pretty::print_batches;
    use arrow_schema::{DataType, Field, Fields, Schema, SchemaBuilder};
    use datafusion::prelude::{DataFrame, SessionContext};
    use datafusion_common::ToDFSchema;
    use datafusion_expr::{col, Extension, lit, LogicalPlan, LogicalPlanBuilder};

    use crate::{DeltaOps, DeltaResult};
    use crate::delta_datafusion::find_files::FindFilesPlanner;
    use crate::delta_datafusion::find_files::logical::FindFilesNode;
    use crate::delta_datafusion::PATH_COLUMN;
    use crate::operations::collect_sendable_stream;
    use crate::writer::test_utils::{create_bare_table, get_record_batch};

    /// Build the table schema with the path column prepended; the
    /// reverse/push/reverse dance inserts it at the front of the field list.
    #[inline]
    fn find_files_schema(fields: &Fields) -> Arc<Schema> {
        let mut builder = SchemaBuilder::from(fields);
        builder.reverse();
        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
        builder.reverse();
        Arc::new(builder.finish())
    }

    async fn make_table() -> DeltaOps {
        let batch = get_record_batch(None, false);
        let write = DeltaOps(create_bare_table())
            .write(vec![batch.clone()])
            .await
            .unwrap();
        DeltaOps(write)
    }

    #[tokio::test]
    pub async fn test_find_files() -> DeltaResult<()> {
        let ctx = SessionContext::new();
        let state = ctx
            .state()
            .with_query_planner(Arc::new(FindFilesPlanner {}));
        let table = make_table().await;
        let files = table.0.get_file_uris()?.collect::<Vec<String>>();
        let plan = LogicalPlanBuilder::empty(false).build()?;

        let schema =
            find_files_schema(table.0.snapshot()?.arrow_schema()?.fields()).to_dfschema_ref()?;
        let find_files_node = LogicalPlan::Extension(Extension {
            node: Arc::new(FindFilesNode {
                id: "my_cool_id".to_string(),
                input: plan,
                predicates: vec![col("id").eq(lit("A"))],
                files,
                schema,
            }),
        });
        let df = DataFrame::new(state.clone(), find_files_node);
        let p = state
            .clone()
            .create_physical_plan(df.logical_plan())
            .await
            .unwrap();

        let e = p.execute(0, state.task_ctx())?;
        let s = collect_sendable_stream(e).await.unwrap();
        print_batches(&s)?;
Collaborator commented:
This should assert on the final output result rather than printing it. See other operations for an example of comparing a string representation of a batch to the actual batch.
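Something along these lines, using DataFusion's batch-comparison macro (the expected path value below is purely illustrative, not the test's real output):

```rust
// Sketch: assert against a pretty-printed expectation instead of printing.
use datafusion::assert_batches_sorted_eq;

let expected = [
    "+--------------------------------+",
    "| __delta_rs_path                |",
    "+--------------------------------+",
    "| part-00000-c000.snappy.parquet |",
    "+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &s);
```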

        Ok(())
    }
}
124 changes: 124 additions & 0 deletions crates/core/src/delta_datafusion/find_files/physical.rs
@@ -0,0 +1,124 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_expr::Expr;
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt};

use crate::delta_datafusion::find_files::{only_file_path_schema, scan_memory_table_batch};

pub struct FindFilesExec {
    schema: SchemaRef,
    files: Vec<String>,
    predicate: Expr,
}

impl FindFilesExec {
    pub fn new(files: Vec<String>, predicate: Expr, schema: SchemaRef) -> Result<Self> {
        Ok(Self {
            schema,
            files,
            predicate,
        })
    }
}

struct FindFilesStream<'a> {
    mem_stream: BoxStream<'a, Result<RecordBatch>>,
}

impl<'a> FindFilesStream<'a> {
    pub fn new(mem_stream: BoxStream<'a, Result<RecordBatch>>) -> Result<Self> {
        Ok(Self { mem_stream })
    }
}

impl<'a> RecordBatchStream for FindFilesStream<'a> {
    fn schema(&self) -> SchemaRef {
        only_file_path_schema()
    }
}

impl<'a> Stream for FindFilesStream<'a> {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.as_mut().mem_stream.poll_next_unpin(cx)
    }
}

impl Debug for FindFilesExec {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "FindFilesExec[schema={:?}, files={:?}]",
            self.schema, self.files
        )
    }
}

impl DisplayAs for FindFilesExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(
            f,
            "FindFilesExec[schema={:?}, files={:?}]",
            self.schema, self.files
        )
    }
}

impl ExecutionPlan for FindFilesExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(0)
Collaborator commented:
This operation should not be limited to a single partition. Think of each partition as a CPU thread here; ideally we should be able to divide the files being scanned across all available CPU threads (see the sketch after the reply below).

Collaborator (Author) replied:

This was just a lazy initial implementation; I can fix that.
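One possible shape for that fix (a sketch: it assumes the `num_cpus` crate is available as a dependency, and reuses the existing per-batch scan):

```rust
// Sketch: expose one partition per CPU (capped by file count) and give each
// call to execute() a round-robin slice of the files.
fn output_partitioning(&self) -> Partitioning {
    Partitioning::UnknownPartitioning(num_cpus::get().min(self.files.len().max(1)))
}

fn execute(
    &self,
    partition: usize,
    _context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    let n = num_cpus::get().min(self.files.len().max(1));
    // Round-robin assignment of files to this partition.
    let chunk: Vec<String> = self
        .files
        .iter()
        .skip(partition)
        .step_by(n)
        .cloned()
        .collect();
    let array = Arc::new(StringArray::from(chunk));
    let record_batch = RecordBatch::try_new(only_file_path_schema(), vec![array])?;
    let predicate = self.predicate.clone();
    let mem_stream = MemoryStream::try_new(vec![record_batch], only_file_path_schema(), None)?
        .and_then(move |batch| scan_memory_table_batch(batch, predicate.clone()))
        .boxed();
    Ok(Box::pin(FindFilesStream::new(mem_stream)?))
}
```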

    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let array = Arc::new(StringArray::from(self.files.clone()));
        let record_batch = RecordBatch::try_new(only_file_path_schema(), vec![array])?;
        let predicate = self.predicate.clone();
        let mem_stream =
            MemoryStream::try_new(vec![record_batch.clone()], only_file_path_schema(), None)?
                .and_then(move |batch| scan_memory_table_batch(batch, predicate.clone()))
                .boxed();

        Ok(Box::pin(FindFilesStream::new(mem_stream)?))
    }
}
5 changes: 4 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
@@ -87,6 +87,8 @@ pub mod expr;
pub mod logical;
pub mod physical;

+ mod find_files;

impl From<DeltaTableError> for DataFusionError {
fn from(err: DeltaTableError) -> Self {
match err {
@@ -1145,6 +1147,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
}
}

+ #[derive(Debug, Hash, Eq, PartialEq)]
/// Representing the result of the [find_files] function.
pub struct FindFiles {
/// A list of `Add` objects that match the given predicate
@@ -1198,7 +1201,7 @@ fn join_batches_with_add_actions(
Ok(files)
}

- /// Determine which files contain a record that statisfies the predicate
+ /// Determine which files contain a record that satisfies the predicate
pub(crate) async fn find_files_scan<'a>(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
8 changes: 4 additions & 4 deletions crates/core/src/table/state_arrow.rs
@@ -91,7 +91,7 @@ impl DeltaTableState {
.fields
.iter()
.map(|field| Cow::Owned(field.name().clone()))
- .zip(partition_cols_batch.columns().iter().map(Arc::clone)),
+ .zip(partition_cols_batch.columns().iter().cloned()),
)
}

@@ -103,7 +103,7 @@ impl DeltaTableState {
.fields
.iter()
.map(|field| Cow::Owned(field.name().clone()))
- .zip(stats.columns().iter().map(Arc::clone)),
+ .zip(stats.columns().iter().cloned()),
);
}
if files.iter().any(|add| add.deletion_vector.is_some()) {
@@ -114,7 +114,7 @@
.fields
.iter()
.map(|field| Cow::Owned(field.name().clone()))
- .zip(delvs.columns().iter().map(Arc::clone)),
+ .zip(delvs.columns().iter().cloned()),
);
}
if files.iter().any(|add| {
@@ -129,7 +129,7 @@
.fields
.iter()
.map(|field| Cow::Owned(field.name().clone()))
- .zip(tags.columns().iter().map(Arc::clone)),
+ .zip(tags.columns().iter().cloned()),
);
}
