Skip to content

Commit

Permalink
Merge pull request #219 from chmp/fix/216-tracing-with-nulls
Browse files Browse the repository at this point in the history
Fix/216 tracing with nulls
  • Loading branch information
chmp committed Aug 31, 2024
2 parents eb8d37a + b82c134 commit 03c3766
Show file tree
Hide file tree
Showing 6 changed files with 550 additions and 95 deletions.
4 changes: 4 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change log

## 0.11.7

- Fix tracing of JSON mixing nulls with non-null data

## 0.11.6

- Add `arrow=52` support
Expand Down
29 changes: 28 additions & 1 deletion serde_arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,31 @@ features = [
"serde-with-float",
# NOTE activating this feature breaks JSON -> float processing
# "serde-with-arbitrary-precision",
]
]

[lints.rust.unexpected_cfgs]
level = "warn"
check-cfg = [
'cfg(has_arrow2)',
'cfg(has_arrow2_0_17)',
'cfg(has_arrow2_0_16)',
'cfg(has_arrow)',
'cfg(has_arrow_fixed_binary_support)',
# arrow-version:insert: 'cfg(has_arrow_{version})',
'cfg(has_arrow_52)',
'cfg(has_arrow_51)',
'cfg(has_arrow_50)',
'cfg(has_arrow_49)',
'cfg(has_arrow_48)',
'cfg(has_arrow_47)',
'cfg(has_arrow_46)',
'cfg(has_arrow_45)',
'cfg(has_arrow_44)',
'cfg(has_arrow_43)',
'cfg(has_arrow_42)',
'cfg(has_arrow_41)',
'cfg(has_arrow_40)',
'cfg(has_arrow_39)',
'cfg(has_arrow_38)',
'cfg(has_arrow_37)',
]
182 changes: 89 additions & 93 deletions serde_arrow/src/internal/schema/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let field_names = fields
.iter()
.map(|field| field.to_string())
Expand Down Expand Up @@ -314,7 +316,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => TupleTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand Down Expand Up @@ -346,7 +350,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => UnionTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand Down Expand Up @@ -382,7 +388,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => ListTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand All @@ -409,7 +417,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => MapTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand Down Expand Up @@ -442,112 +452,98 @@ impl Tracer {
item_type: GenericDataType,
strategy: Option<Strategy>,
) -> Result<()> {
if self.is_unknown() {
let tracer = dispatch_tracer!(self, tracer => PrimitiveTracer::new(
tracer.name.clone(),
tracer.path.clone(),
tracer.options.clone(),
item_type,
tracer.nullable,
))
.with_strategy(strategy);
*self = Self::Primitive(tracer);
} else if let Tracer::Primitive(tracer) = self {
use {
GenericDataType::Date64, GenericDataType::LargeUtf8, Strategy::NaiveStrAsDate64,
Strategy::UtcStrAsDate64,
};
let (item_type, strategy) = match ((&tracer.item_type), (item_type)) {
(Date64, Date64) => match (&tracer.strategy, strategy) {
(Some(NaiveStrAsDate64), Some(NaiveStrAsDate64)) => {
(Date64, Some(NaiveStrAsDate64))
}
(Some(UtcStrAsDate64), Some(UtcStrAsDate64)) => (Date64, Some(UtcStrAsDate64)),
// incompatible strategies, coerce to string
(_, _) => (LargeUtf8, None),
},
(LargeUtf8, _) | (_, LargeUtf8) => (LargeUtf8, None),
(prev_ty, new_ty) => {
fail!("mismatched types, previous {prev_ty}, current {new_ty}")
}
};
tracer.item_type = item_type;
tracer.strategy = strategy;
} else {
let Some(ty) = self.get_type() else {
unreachable!("tracer cannot be unknown");
};
fail!("mismatched types, previous {ty}, current {item_type}");
}
Ok(())
self.ensure_primitive_with_strategy(item_type, strategy)
}

pub fn ensure_primitive(&mut self, item_type: GenericDataType) -> Result<()> {
match self {
this @ Self::Unknown(_) => {
let tracer = dispatch_tracer!(this, tracer => PrimitiveTracer::new(
tracer.name.clone(),
tracer.path.clone(),
tracer.options.clone(),
item_type,
tracer.nullable,
));
*this = Self::Primitive(tracer);
}
Self::Primitive(tracer) if tracer.item_type == item_type => {}
_ => fail!(
"mismatched types, previous {:?}, current {:?}",
self.get_type(),
item_type
),
}
Ok(())
self.ensure_primitive_with_strategy(item_type, None)
}

pub fn ensure_number(&mut self, item_type: GenericDataType) -> Result<()> {
self.ensure_primitive_with_strategy(item_type, None)
}

fn ensure_primitive_with_strategy(
&mut self,
item_type: GenericDataType,
strategy: Option<Strategy>,
) -> Result<()> {
match self {
this @ Self::Unknown(_) => {
let tracer = dispatch_tracer!(this, tracer => PrimitiveTracer::new(
tracer.name.clone(),
tracer.path.clone(),
tracer.options.clone(),
let is_null_type = matches!(item_type, GenericDataType::Null);
let tracer = dispatch_tracer!(this, tracer => PrimitiveTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
options: tracer.options.clone(),
nullable: tracer.nullable || is_null_type,
item_type,
tracer.nullable,
));
strategy,
});
*this = Self::Primitive(tracer);
}
Self::Primitive(tracer) if tracer.options.coerce_numbers => {
use GenericDataType::{F32, F64, I16, I32, I64, I8, U16, U32, U64, U8};
let item_type = match (&tracer.item_type, item_type) {
// unsigned x unsigned -> u64
(U8 | U16 | U32 | U64, U8 | U16 | U32 | U64) => U64,
// signed x signed -> i64
(I8 | I16 | I32 | I64, I8 | I16 | I32 | I64) => I64,
// signed x unsigned -> i64
(I8 | I16 | I32 | I64, U8 | U16 | U32 | U64) => I64,
// unsigned x signed -> i64
(U8 | U16 | U32 | U64, I8 | I16 | I32 | I64) => I64,
// float x float -> f64
(F32 | F64, F32 | F64) => F64,
// int x float -> f64
(I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64, F32 | F64) => F64,
// float x int -> f64
(F32 | F64, I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64) => F64,
(ty, ev) => fail!("Cannot accept event {ev} for tracer of primitive type {ty}"),
};
this @ (Self::List(_)
| Self::Map(_)
| Self::Struct(_)
| Self::Tuple(_)
| Self::Union(_)) => {
if matches!(item_type, GenericDataType::Null) {
dispatch_tracer!(this, tracer => { tracer.nullable = true });
} else {
fail!("Cannot merge {ty:?} with {item_type}", ty = this.get_type());
}
}
Self::Primitive(tracer) => {
let (item_type, nullable, strategy) = coerce_primitive_type(
(&tracer.item_type, tracer.nullable, tracer.strategy.as_ref()),
(item_type, strategy),
tracer.options.as_ref(),
)?;

tracer.item_type = item_type;
tracer.strategy = strategy;
tracer.nullable = nullable;
}
Self::Primitive(tracer) if tracer.item_type == item_type => {}
_ => fail!(
"mismatched types, previous {:?}, current {:?}",
self.get_type(),
item_type
),
}
Ok(())
}
}

fn coerce_primitive_type(
prev: (&GenericDataType, bool, Option<&Strategy>),
curr: (GenericDataType, Option<Strategy>),
options: &TracingOptions,
) -> Result<(GenericDataType, bool, Option<Strategy>)> {
use GenericDataType::{
Date64, LargeUtf8, Null, F32, F64, I16, I32, I64, I8, U16, U32, U64, U8,
};

let res = match (prev, curr) {
((prev_ty, nullable, prev_st), (curr_ty, curr_st)) if prev_ty == &curr_ty && prev_st == curr_st.as_ref() => (curr_ty, nullable, curr_st),
((Null, _, _), (curr_ty, curr_st)) => (curr_ty, true, curr_st),
((prev_ty, _, prev_st), (Null, _)) => (prev_ty.clone(), true, prev_st.cloned()),
// unsigned x unsigned -> u64
((U8 | U16 | U32 | U64, nullable, _), (U8 | U16 | U32 | U64, _,)) if options.coerce_numbers => (U64, nullable, None),
// signed x signed -> i64
((I8 | I16 | I32 | I64, nullable, _), (I8 | I16 | I32 | I64, _)) if options.coerce_numbers => (I64, nullable, None),
// signed x unsigned -> i64
((I8 | I16 | I32 | I64, nullable, _), (U8 | U16 | U32 | U64, _)) if options.coerce_numbers => (I64, nullable, None),
// unsigned x signed -> i64
((U8 | U16 | U32 | U64, nullable, _), (I8 | I16 | I32 | I64, _)) if options.coerce_numbers => (I64, nullable, None),
// float x float -> f64
((F32 | F64, nullable, _), (F32 | F64, _)) if options.coerce_numbers=> (F64, nullable, None),
// int x float -> f64
((I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64, nullable, _), (F32 | F64, _)) if options.coerce_numbers => (F64, nullable, None),
// float x int -> f64
((F32 | F64, nullable, _), (I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64, _)) if options.coerce_numbers => (F64, nullable, None),
// incompatible formats, coerce to string
((Date64, nullable, _), (LargeUtf8, _)) => (LargeUtf8, nullable, None),
((LargeUtf8, nullable, _), (Date64, _)) => (LargeUtf8, nullable, None),
((Date64, nullable, prev_st), (Date64, curr_st)) if prev_st != curr_st.as_ref() => (LargeUtf8, nullable, None),
((prev_ty, _, prev_st), (curr_ty, curr_st)) => fail!("Cannot accept event {curr_ty} with strategy {curr_st:?} for tracer of primitive type {prev_ty} with strategy {prev_st:?}"),
};
Ok(res)
}

#[derive(Debug, PartialEq, Clone)]
pub struct UnknownTracer {
pub name: String,
Expand Down
1 change: 0 additions & 1 deletion serde_arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ pub mod _impl {
#[cfg(has_arrow_39)] build_arrow_crate!(arrow_array_39, arrow_buffer_39, arrow_data_39, arrow_schema_39);
#[cfg(has_arrow_38)] build_arrow_crate!(arrow_array_38, arrow_buffer_38, arrow_data_38, arrow_schema_38);
#[cfg(has_arrow_37)] build_arrow_crate!(arrow_array_37, arrow_buffer_37, arrow_data_37, arrow_schema_37);
#[cfg(has_arrow_36)] build_arrow_crate!(arrow_array_36, arrow_buffer_36, arrow_data_36, arrow_schema_36);

/// Documentation
pub mod docs {
Expand Down
1 change: 1 addition & 0 deletions serde_arrow/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ mod api_chrono;
mod decimal_representations;
mod error;
mod schema_like;
mod schema_tracing;
Loading

0 comments on commit 03c3766

Please sign in to comment.