Skip to content

Commit

Permalink
feat: add partitons schema function to snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Aug 10, 2024
1 parent da824cd commit 6e61c75
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 2 deletions.
1 change: 0 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ pub(crate) fn files_matching_predicate<'a>(
if let Some(Some(predicate)) =
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
//let expr = logical_expr_to_physical_expr(predicate, snapshot.arrow_schema()?.as_ref());
let expr = SessionContext::new()
.create_physical_expr(predicate, &snapshot.arrow_schema()?.to_dfschema()?)?;
let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?;
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,9 @@ impl DeltaTableError {
);
Self::NotATable(msg)
}

/// Create a [Generic](DeltaTableError::Generic) error with the given message.
pub fn generic(msg: impl ToString) -> Self {
Self::Generic(msg.to_string())
}
}
118 changes: 117 additions & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,31 @@ impl Snapshot {
),
]))
}

/// Get the partition values schema of the snapshot
pub fn partitions_schema(
&self,
table_schema: Option<&StructType>,
) -> DeltaResult<Option<StructType>> {
if self.metadata().partition_columns.is_empty() {
return Ok(None);
}
let schema = table_schema.unwrap_or_else(|| self.schema());
Ok(Some(StructType::new(
self.metadata
.partition_columns
.iter()
.map(|col| {
schema.field(col).map(|field| field.clone()).ok_or_else(|| {
DeltaTableError::Generic(format!(
"Partition column {} not found in schema",
col
))
})
})
.collect::<Result<Vec<_>, _>>()?,
)))
}
}

/// A snapshot of a Delta table that has been eagerly loaded into memory.
Expand All @@ -369,7 +394,7 @@ pub struct EagerSnapshot {

// NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because
// we do not yet enforce a consistent schema across all batches we read from the log.
files: Vec<RecordBatch>,
pub(crate) files: Vec<RecordBatch>,
}

impl EagerSnapshot {
Expand Down Expand Up @@ -956,4 +981,95 @@ mod tests {

Ok(())
}

#[test]
fn test_partition_schema() {
let schema = StructType::new(vec![
StructField::new("id", DataType::LONG, true),
StructField::new("name", DataType::STRING, true),
StructField::new("date", DataType::DATE, true),
]);

let partition_columns = vec!["date".to_string()];
let metadata = Metadata {
id: "id".to_string(),
name: None,
description: None,
format: Default::default(),
schema_string: serde_json::to_string(&schema).unwrap(),
partition_columns,
configuration: Default::default(),
created_time: None,
};

let protocol = Protocol {
min_reader_version: 1,
min_writer_version: 2,
reader_features: Default::default(),
writer_features: Default::default(),
};
let commit_data = CommitData::new(
vec![
Action::Protocol(protocol.clone()),
Action::Metadata(metadata.clone()),
],
DeltaOperation::Write {
mode: SaveMode::Append,
partition_by: None,
predicate: None,
},
HashMap::new(),
vec![],
);
let (log_segment, _) = LogSegment::new_test(vec![&commit_data]).unwrap();

let snapshot = Snapshot {
log_segment: log_segment.clone(),
config: Default::default(),
protocol: Default::default(),
metadata,
schema: schema.clone(),
table_url: "table".to_string(),
};

let expected = StructType::new(vec![StructField::new("date", DataType::DATE, true)]);
assert_eq!(snapshot.partitions_schema(None).unwrap(), Some(expected));

let metadata = Metadata {
id: "id".to_string(),
name: None,
description: None,
format: Default::default(),
schema_string: serde_json::to_string(&schema).unwrap(),
partition_columns: vec![],
configuration: Default::default(),
created_time: None,
};

let commit_data = CommitData::new(
vec![
Action::Protocol(protocol.clone()),
Action::Metadata(metadata.clone()),
],
DeltaOperation::Write {
mode: SaveMode::Append,
partition_by: None,
predicate: None,
},
HashMap::new(),
vec![],
);
let (log_segment, _) = LogSegment::new_test(vec![&commit_data]).unwrap();

let snapshot = Snapshot {
log_segment,
config: Default::default(),
protocol: protocol.clone(),
metadata,
schema: schema.clone(),
table_url: "table".to_string(),
};

assert_eq!(snapshot.partitions_schema(None).unwrap(), None);
}
}

0 comments on commit 6e61c75

Please sign in to comment.