Skip to content

Commit

Permalink
Clean up list serialization code (#3547)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Oct 8, 2022
1 parent 488b2ce commit e54110f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 535 deletions.
16 changes: 4 additions & 12 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,9 @@ message Union{
}

message ScalarListValue{
// encode null explicitly to distinguish a list with a null value
// from a list with no values)
bool is_null = 3;
Field field = 1;
repeated ScalarValue values = 2;
}
Expand Down Expand Up @@ -768,7 +771,7 @@ message ScalarValue{
//Literal Date32 value always has a unit of day
int32 date_32_value = 14;
ScalarListValue list_value = 17;
ScalarType null_list_value = 18;
//WAS: ScalarType null_list_value = 18;

Decimal128 decimal128_value = 20;
int64 date_64_value = 21;
Expand Down Expand Up @@ -825,17 +828,6 @@ enum PrimitiveScalarType{
TIME64 = 27;
}

message ScalarType{
oneof datatype{
PrimitiveScalarType scalar = 1;
ScalarListType list = 2;
}
}

message ScalarListType{
repeated string field_names = 3;
PrimitiveScalarType deepest_type = 2;
}

// Broke out into multiple message types so that type
// metadata did not need to be in separate message
Expand Down
123 changes: 28 additions & 95 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ impl Error {
Error::MissingRequiredField(field.into())
}

fn at_least_one(field: impl Into<String>) -> Error {
Error::AtLeastOneValue(field.into())
}

fn unknown(name: impl Into<String>, value: i32) -> Error {
Error::UnknownEnumVariant {
name: name.into(),
Expand Down Expand Up @@ -559,56 +555,6 @@ impl TryFrom<&i32> for protobuf::AggregateFunction {
}
}

impl TryFrom<&protobuf::scalar_type::Datatype> for DataType {
type Error = Error;

fn try_from(
scalar_type: &protobuf::scalar_type::Datatype,
) -> Result<Self, Self::Error> {
use protobuf::scalar_type::Datatype;

Ok(match scalar_type {
Datatype::Scalar(scalar_type) => {
protobuf::PrimitiveScalarType::try_from(scalar_type)?.into()
}
Datatype::List(protobuf::ScalarListType {
deepest_type,
field_names,
}) => {
if field_names.is_empty() {
return Err(Error::at_least_one("field_names"));
}
let field_type =
protobuf::PrimitiveScalarType::try_from(deepest_type)?.into();
// Because length is checked above it is safe to unwrap .last()
let mut scalar_type = DataType::List(Box::new(Field::new(
field_names.last().unwrap().as_str(),
field_type,
true,
)));
// Iterate over field names in reverse order except for the last item in the vector
for name in field_names.iter().rev().skip(1) {
let new_datatype = DataType::List(Box::new(Field::new(
name.as_str(),
scalar_type,
true,
)));
scalar_type = new_datatype;
}
scalar_type
}
})
}
}

impl TryFrom<&protobuf::ScalarType> for DataType {
type Error = Error;

fn try_from(scalar: &protobuf::ScalarType) -> Result<Self, Self::Error> {
scalar.datatype.as_ref().required("datatype")
}
}

impl TryFrom<&protobuf::Schema> for Schema {
type Error = Error;

Expand Down Expand Up @@ -676,36 +622,6 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {
}
}

impl TryFrom<&protobuf::ScalarListType> for DataType {
type Error = Error;
fn try_from(scalar: &protobuf::ScalarListType) -> Result<Self, Self::Error> {
use protobuf::PrimitiveScalarType;

let protobuf::ScalarListType {
deepest_type,
field_names,
} = scalar;

let depth = field_names.len();
if depth == 0 {
return Err(Error::at_least_one("field_names"));
}

let mut curr_type = Self::List(Box::new(Field::new(
// Since checked vector is not empty above this is safe to unwrap
field_names.last().unwrap(),
PrimitiveScalarType::try_from(deepest_type)?.into(),
true,
)));
// Iterates over field names in reverse order except for the last item in the vector
for name in field_names.iter().rev().skip(1) {
let temp_curr_type = Self::List(Box::new(Field::new(name, curr_type, true)));
curr_type = temp_curr_type;
}
Ok(curr_type)
}
}

impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
type Error = Error;

Expand Down Expand Up @@ -734,23 +650,23 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
Value::Date32Value(v) => Self::Date32(Some(*v)),
Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
is_null,
values,
field: opt_field,
field,
} = &scalar_list;

let field = opt_field.as_ref().required("field")?;
let field: Field = field.as_ref().required("field")?;
let field = Box::new(field);

let typechecked_values: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()?;
let values: Result<Vec<ScalarValue>, Error> =
values.iter().map(|val| val.try_into()).collect();
let values = values?;

Self::List(Some(typechecked_values), field)
}
Value::NullListValue(v) => {
let field = Field::new("item", v.try_into()?, true);
Self::List(None, Box::new(field))
validate_list_values(field.as_ref(), &values)?;

let values = if *is_null { None } else { Some(values) };

Self::List(values, field)
}
Value::NullValue(v) => {
let null_type_enum = protobuf::PrimitiveScalarType::try_from(v)?;
Expand Down Expand Up @@ -840,6 +756,23 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
}
}

/// Ensures that all `values` are of type DataType::List and have the
/// same type as field
fn validate_list_values(field: &Field, values: &[ScalarValue]) -> Result<(), Error> {
for value in values {
let field_type = field.data_type();
let value_type = value.get_datatype();

if field_type != &value_type {
return Err(proto_error(format!(
"Expected field type {:?}, got scalar of type: {:?}",
field_type, value_type
)));
}
}
Ok(())
}

pub fn parse_expr(
proto: &protobuf::LogicalExprNode,
registry: &dyn FunctionRegistry,
Expand Down
Loading

0 comments on commit e54110f

Please sign in to comment.