Skip to content

Commit

Permalink
Make StructType and DataType::struct_type more general
Browse files Browse the repository at this point in the history
  • Loading branch information
scovich committed Oct 9, 2024
1 parent 1c4b9ce commit 6ca7ce2
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 130 deletions.
4 changes: 2 additions & 2 deletions derive-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream
impl crate::actions::schemas::ToDataType for #struct_ident {
fn to_data_type() -> crate::schema::DataType {
use crate::actions::schemas::{ToDataType, GetStructField, GetNullableContainerStructField};
crate::schema::StructType::new(vec![
crate::schema::DataType::struct_type([
#schema_fields
]).into()
])
}
}
};
Expand Down
5 changes: 2 additions & 3 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn main() {
println!("{:#?}", files);
}
Commands::Actions { forward } => {
let action_types = vec![
let action_types = [
//ActionType::CommitInfo,
ActionType::Metadata,
ActionType::Protocol,
Expand All @@ -109,8 +109,7 @@ fn main() {
action_types
.iter()
.map(|a| a.schema_field())
.cloned()
.collect(),
.cloned(),
));

let batches = snapshot
Expand Down
24 changes: 10 additions & 14 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,17 @@ fn try_main() -> DeltaResult<()> {
// process the columns requested and build a schema from them
let read_schema_opt = cli
.columns
.map(|cols| {
use itertools::Itertools;
.map(|cols| -> DeltaResult<_> {
let table_schema = snapshot.schema();
let selected_fields = cols
.iter()
.map(|col| {
table_schema
.field(col)
.cloned()
.ok_or(delta_kernel::Error::Generic(format!(
"Table has no such column: {col}"
)))
})
.try_collect();
selected_fields.map(|selected_fields| Arc::new(Schema::new(selected_fields)))
let selected_fields = cols.iter().map(|col| {
table_schema
.field(col)
.cloned()
.ok_or(delta_kernel::Error::Generic(format!(
"Table has no such column: {col}"
)))
});
Schema::try_new(selected_fields).map(Arc::new)
})
.transpose()?;

Expand Down
23 changes: 10 additions & 13 deletions kernel/examples/read-table-single-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,17 @@ fn try_main() -> DeltaResult<()> {

let read_schema_opt = cli
.columns
.map(|cols| {
.map(|cols| -> DeltaResult<_> {
let table_schema = snapshot.schema();
let selected_fields = cols
.iter()
.map(|col| {
table_schema
.field(col)
.cloned()
.ok_or(delta_kernel::Error::Generic(format!(
"Table has no such column: {col}"
)))
})
.try_collect();
selected_fields.map(|selected_fields| Arc::new(Schema::new(selected_fields)))
let selected_fields = cols.iter().map(|col| {
table_schema
.field(col)
.cloned()
.ok_or(delta_kernel::Error::Generic(format!(
"Table has no such column: {col}"
)))
});
Schema::try_new(selected_fields).map(Arc::new)
})
.transpose()?;
let scan = snapshot
Expand Down
24 changes: 12 additions & 12 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) const PROTOCOL_NAME: &str = "protocol";
pub(crate) const TRANSACTION_NAME: &str = "txn";

static LOG_SCHEMA: LazyLock<StructType> = LazyLock::new(|| {
StructType::new(vec![
StructType::new([
Option::<Add>::get_struct_field(ADD_NAME),
Option::<Remove>::get_struct_field(REMOVE_NAME),
Option::<Metadata>::get_struct_field(METADATA_NAME),
Expand Down Expand Up @@ -261,15 +261,15 @@ mod tests {
.project(&["metaData"])
.expect("Couldn't get metaData field");

let expected = Arc::new(StructType::new(vec![StructField::new(
let expected = Arc::new(StructType::new([StructField::new(
"metaData",
StructType::new(vec![
StructType::new([
StructField::new("id", DataType::STRING, false),
StructField::new("name", DataType::STRING, true),
StructField::new("description", DataType::STRING, true),
StructField::new(
"format",
StructType::new(vec![
StructType::new([
StructField::new("provider", DataType::STRING, false),
StructField::new(
"options",
Expand Down Expand Up @@ -303,9 +303,9 @@ mod tests {
.project(&["add"])
.expect("Couldn't get add field");

let expected = Arc::new(StructType::new(vec![StructField::new(
let expected = Arc::new(StructType::new([StructField::new(
"add",
StructType::new(vec![
StructType::new([
StructField::new("path", DataType::STRING, false),
StructField::new(
"partitionValues",
Expand Down Expand Up @@ -350,13 +350,13 @@ mod tests {
fn deletion_vector_field() -> StructField {
StructField::new(
"deletionVector",
DataType::Struct(Box::new(StructType::new(vec![
DataType::struct_type([
StructField::new("storageType", DataType::STRING, false),
StructField::new("pathOrInlineDv", DataType::STRING, false),
StructField::new("offset", DataType::INTEGER, true),
StructField::new("sizeInBytes", DataType::INTEGER, false),
StructField::new("cardinality", DataType::LONG, false),
]))),
]),
true,
)
}
Expand All @@ -366,9 +366,9 @@ mod tests {
let schema = get_log_schema()
.project(&["remove"])
.expect("Couldn't get remove field");
let expected = Arc::new(StructType::new(vec![StructField::new(
let expected = Arc::new(StructType::new([StructField::new(
"remove",
StructType::new(vec![
StructType::new([
StructField::new("path", DataType::STRING, false),
StructField::new("deletionTimestamp", DataType::LONG, true),
StructField::new("dataChange", DataType::BOOLEAN, false),
Expand All @@ -391,9 +391,9 @@ mod tests {
.project(&["txn"])
.expect("Couldn't get transaction field");

let expected = Arc::new(StructType::new(vec![StructField::new(
let expected = Arc::new(StructType::new([StructField::new(
"txn",
StructType::new(vec![
StructType::new([
StructField::new("appId", DataType::STRING, false),
StructField::new("version", DataType::LONG, false),
StructField::new("lastUpdated", DataType::LONG, true),
Expand Down
20 changes: 7 additions & 13 deletions kernel/src/engine/arrow_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ impl TryFrom<&ArrowSchema> for StructType {
type Error = ArrowError;

fn try_from(arrow_schema: &ArrowSchema) -> Result<Self, ArrowError> {
let new_fields: Result<Vec<StructField>, _> = arrow_schema
.fields()
.iter()
.map(|field| field.as_ref().try_into())
.collect();
Ok(StructType::new(new_fields?))
StructType::try_new(
arrow_schema
.fields()
.iter()
.map(|field| field.as_ref().try_into()),
)
}
}

Expand Down Expand Up @@ -211,13 +211,7 @@ impl TryFrom<&ArrowDataType> for DataType {
Ok(DataType::TIMESTAMP)
}
ArrowDataType::Struct(fields) => {
let converted_fields: Result<Vec<StructField>, _> = fields
.iter()
.map(|field| field.as_ref().try_into())
.collect();
Ok(DataType::Struct(Box::new(StructType::new(
converted_fields?,
))))
DataType::try_struct_type(fields.iter().map(|field| field.as_ref().try_into()))
}
ArrowDataType::List(field) => Ok(DataType::Array(Box::new(ArrayType::new(
(*field).data_type().try_into()?,
Expand Down
10 changes: 5 additions & 5 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ mod tests {
let field = Arc::new(Field::new("item", DataType::Int32, true));
let arr_field = Arc::new(Field::new("item", DataType::List(field.clone()), true));

let schema = Schema::new(vec![arr_field.clone()]);
let schema = Schema::new([arr_field.clone()]);

let array = ListArray::new(field.clone(), offsets, Arc::new(values), None);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array.clone())]).unwrap();
Expand Down Expand Up @@ -472,7 +472,7 @@ mod tests {
fn test_bad_right_type_array() {
let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
let field = Arc::new(Field::new("item", DataType::Int32, true));
let schema = Schema::new(vec![field.clone()]);
let schema = Schema::new([field.clone()]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values.clone())]).unwrap();

let in_op = Expression::binary(
Expand All @@ -493,7 +493,7 @@ mod tests {
#[test]
fn test_literal_type_array() {
let field = Arc::new(Field::new("item", DataType::Int32, true));
let schema = Schema::new(vec![field.clone()]);
let schema = Schema::new([field.clone()]);
let batch = RecordBatch::new_empty(Arc::new(schema));

let in_op = Expression::binary(
Expand All @@ -517,7 +517,7 @@ mod tests {
let field = Arc::new(Field::new("item", DataType::Int32, true));
let arr_field = Arc::new(Field::new("item", DataType::List(field.clone()), true));

let schema = Schema::new(vec![arr_field.clone()]);
let schema = Schema::new([arr_field.clone()]);

let array = ListArray::new(field.clone(), offsets, Arc::new(values), None);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array.clone())]).unwrap();
Expand Down Expand Up @@ -545,7 +545,7 @@ mod tests {
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6, 9]));
let field = Arc::new(Field::new("item", DataType::Utf8, true));
let arr_field = Arc::new(Field::new("item", DataType::List(field.clone()), true));
let schema = Schema::new(vec![arr_field.clone()]);
let schema = Schema::new([arr_field.clone()]);
let array = ListArray::new(field.clone(), offsets, Arc::new(values), None);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array.clone())]).unwrap();

Expand Down
Loading

0 comments on commit 6ca7ce2

Please sign in to comment.