Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/216 tracing with nulls #219

Merged
merged 11 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change log

## 0.11.7

- Fix tracing of JSON mixing nulls with non-null data

## 0.11.6

- Add `arrow=52` support
Expand Down
29 changes: 28 additions & 1 deletion serde_arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,31 @@ features = [
"serde-with-float",
# NOTE activating this feature breaks JSON -> float processing
# "serde-with-arbitrary-precision",
]
]

[lints.rust.unexpected_cfgs]
level = "warn"
check-cfg = [
'cfg(has_arrow2)',
'cfg(has_arrow2_0_17)',
'cfg(has_arrow2_0_16)',
'cfg(has_arrow)',
'cfg(has_arrow_fixed_binary_support)',
# arrow-version:insert: 'cfg(has_arrow_{version})',
'cfg(has_arrow_52)',
'cfg(has_arrow_51)',
'cfg(has_arrow_50)',
'cfg(has_arrow_49)',
'cfg(has_arrow_48)',
'cfg(has_arrow_47)',
'cfg(has_arrow_46)',
'cfg(has_arrow_45)',
'cfg(has_arrow_44)',
'cfg(has_arrow_43)',
'cfg(has_arrow_42)',
'cfg(has_arrow_41)',
'cfg(has_arrow_40)',
'cfg(has_arrow_39)',
'cfg(has_arrow_38)',
'cfg(has_arrow_37)',
]
182 changes: 89 additions & 93 deletions serde_arrow/src/internal/schema/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let field_names = fields
.iter()
.map(|field| field.to_string())
Expand Down Expand Up @@ -314,7 +316,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => TupleTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand Down Expand Up @@ -346,7 +350,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => UnionTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand Down Expand Up @@ -382,7 +388,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => ListTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand All @@ -409,7 +417,9 @@ impl Tracer {
self.enforce_depth_limit()?;

match self {
this @ Self::Unknown(_) => {
this if matches!(this, Self::Unknown(_))
|| matches!(this, Self::Primitive(ref tracer) if tracer.item_type == GenericDataType::Null) =>
{
let tracer = dispatch_tracer!(this, tracer => MapTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
Expand Down Expand Up @@ -442,112 +452,98 @@ impl Tracer {
item_type: GenericDataType,
strategy: Option<Strategy>,
) -> Result<()> {
if self.is_unknown() {
let tracer = dispatch_tracer!(self, tracer => PrimitiveTracer::new(
tracer.name.clone(),
tracer.path.clone(),
tracer.options.clone(),
item_type,
tracer.nullable,
))
.with_strategy(strategy);
*self = Self::Primitive(tracer);
} else if let Tracer::Primitive(tracer) = self {
use {
GenericDataType::Date64, GenericDataType::LargeUtf8, Strategy::NaiveStrAsDate64,
Strategy::UtcStrAsDate64,
};
let (item_type, strategy) = match ((&tracer.item_type), (item_type)) {
(Date64, Date64) => match (&tracer.strategy, strategy) {
(Some(NaiveStrAsDate64), Some(NaiveStrAsDate64)) => {
(Date64, Some(NaiveStrAsDate64))
}
(Some(UtcStrAsDate64), Some(UtcStrAsDate64)) => (Date64, Some(UtcStrAsDate64)),
// incompatible strategies, coerce to string
(_, _) => (LargeUtf8, None),
},
(LargeUtf8, _) | (_, LargeUtf8) => (LargeUtf8, None),
(prev_ty, new_ty) => {
fail!("mismatched types, previous {prev_ty}, current {new_ty}")
}
};
tracer.item_type = item_type;
tracer.strategy = strategy;
} else {
let Some(ty) = self.get_type() else {
unreachable!("tracer cannot be unknown");
};
fail!("mismatched types, previous {ty}, current {item_type}");
}
Ok(())
self.ensure_primitive_with_strategy(item_type, strategy)
}

pub fn ensure_primitive(&mut self, item_type: GenericDataType) -> Result<()> {
match self {
this @ Self::Unknown(_) => {
let tracer = dispatch_tracer!(this, tracer => PrimitiveTracer::new(
tracer.name.clone(),
tracer.path.clone(),
tracer.options.clone(),
item_type,
tracer.nullable,
));
*this = Self::Primitive(tracer);
}
Self::Primitive(tracer) if tracer.item_type == item_type => {}
_ => fail!(
"mismatched types, previous {:?}, current {:?}",
self.get_type(),
item_type
),
}
Ok(())
self.ensure_primitive_with_strategy(item_type, None)
}

/// Record that a numeric value of type `item_type` was observed.
///
/// Forwards to `ensure_primitive_with_strategy` without a strategy and
/// returns its result unchanged.
pub fn ensure_number(&mut self, item_type: GenericDataType) -> Result<()> {
    let no_strategy = None;
    self.ensure_primitive_with_strategy(item_type, no_strategy)
}

fn ensure_primitive_with_strategy(
&mut self,
item_type: GenericDataType,
strategy: Option<Strategy>,
) -> Result<()> {
match self {
this @ Self::Unknown(_) => {
let tracer = dispatch_tracer!(this, tracer => PrimitiveTracer::new(
tracer.name.clone(),
tracer.path.clone(),
tracer.options.clone(),
let is_null_type = matches!(item_type, GenericDataType::Null);
let tracer = dispatch_tracer!(this, tracer => PrimitiveTracer {
name: tracer.name.clone(),
path: tracer.path.clone(),
options: tracer.options.clone(),
nullable: tracer.nullable || is_null_type,
item_type,
tracer.nullable,
));
strategy,
});
*this = Self::Primitive(tracer);
}
Self::Primitive(tracer) if tracer.options.coerce_numbers => {
use GenericDataType::{F32, F64, I16, I32, I64, I8, U16, U32, U64, U8};
let item_type = match (&tracer.item_type, item_type) {
// unsigned x unsigned -> u64
(U8 | U16 | U32 | U64, U8 | U16 | U32 | U64) => U64,
// signed x signed -> i64
(I8 | I16 | I32 | I64, I8 | I16 | I32 | I64) => I64,
// signed x unsigned -> i64
(I8 | I16 | I32 | I64, U8 | U16 | U32 | U64) => I64,
// unsigned x signed -> i64
(U8 | U16 | U32 | U64, I8 | I16 | I32 | I64) => I64,
// float x float -> f64
(F32 | F64, F32 | F64) => F64,
// int x float -> f64
(I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64, F32 | F64) => F64,
// float x int -> f64
(F32 | F64, I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64) => F64,
(ty, ev) => fail!("Cannot accept event {ev} for tracer of primitive type {ty}"),
};
this @ (Self::List(_)
| Self::Map(_)
| Self::Struct(_)
| Self::Tuple(_)
| Self::Union(_)) => {
if matches!(item_type, GenericDataType::Null) {
dispatch_tracer!(this, tracer => { tracer.nullable = true });
} else {
fail!("Cannot merge {ty:?} with {item_type}", ty = this.get_type());
}
}
Self::Primitive(tracer) => {
let (item_type, nullable, strategy) = coerce_primitive_type(
(&tracer.item_type, tracer.nullable, tracer.strategy.as_ref()),
(item_type, strategy),
tracer.options.as_ref(),
)?;

tracer.item_type = item_type;
tracer.strategy = strategy;
tracer.nullable = nullable;
}
Self::Primitive(tracer) if tracer.item_type == item_type => {}
_ => fail!(
"mismatched types, previous {:?}, current {:?}",
self.get_type(),
item_type
),
}
Ok(())
}
}

fn coerce_primitive_type(
prev: (&GenericDataType, bool, Option<&Strategy>),
curr: (GenericDataType, Option<Strategy>),
options: &TracingOptions,
) -> Result<(GenericDataType, bool, Option<Strategy>)> {
use GenericDataType::{
Date64, LargeUtf8, Null, F32, F64, I16, I32, I64, I8, U16, U32, U64, U8,
};

let res = match (prev, curr) {
((prev_ty, nullable, prev_st), (curr_ty, curr_st)) if prev_ty == &curr_ty && prev_st == curr_st.as_ref() => (curr_ty, nullable, curr_st),
((Null, _, _), (curr_ty, curr_st)) => (curr_ty, true, curr_st),
((prev_ty, _, prev_st), (Null, _)) => (prev_ty.clone(), true, prev_st.cloned()),
// unsigned x unsigned -> u64
((U8 | U16 | U32 | U64, nullable, _), (U8 | U16 | U32 | U64, _,)) if options.coerce_numbers => (U64, nullable, None),
// signed x signed -> i64
((I8 | I16 | I32 | I64, nullable, _), (I8 | I16 | I32 | I64, _)) if options.coerce_numbers => (I64, nullable, None),
// signed x unsigned -> i64
((I8 | I16 | I32 | I64, nullable, _), (U8 | U16 | U32 | U64, _)) if options.coerce_numbers => (I64, nullable, None),
// unsigned x signed -> i64
((U8 | U16 | U32 | U64, nullable, _), (I8 | I16 | I32 | I64, _)) if options.coerce_numbers => (I64, nullable, None),
// float x float -> f64
((F32 | F64, nullable, _), (F32 | F64, _)) if options.coerce_numbers=> (F64, nullable, None),
// int x float -> f64
((I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64, nullable, _), (F32 | F64, _)) if options.coerce_numbers => (F64, nullable, None),
// float x int -> f64
((F32 | F64, nullable, _), (I8 | I16 | I32 | I64 | U8 | U16 | U32 | U64, _)) if options.coerce_numbers => (F64, nullable, None),
// incompatible formats, coerce to string
((Date64, nullable, _), (LargeUtf8, _)) => (LargeUtf8, nullable, None),
((LargeUtf8, nullable, _), (Date64, _)) => (LargeUtf8, nullable, None),
((Date64, nullable, prev_st), (Date64, curr_st)) if prev_st != curr_st.as_ref() => (LargeUtf8, nullable, None),
((prev_ty, _, prev_st), (curr_ty, curr_st)) => fail!("Cannot accept event {curr_ty} with strategy {curr_st:?} for tracer of primitive type {prev_ty} with strategy {prev_st:?}"),
};
Ok(res)
}

#[derive(Debug, PartialEq, Clone)]
pub struct UnknownTracer {
pub name: String,
Expand Down
1 change: 0 additions & 1 deletion serde_arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ pub mod _impl {
#[cfg(has_arrow_39)] build_arrow_crate!(arrow_array_39, arrow_buffer_39, arrow_data_39, arrow_schema_39);
#[cfg(has_arrow_38)] build_arrow_crate!(arrow_array_38, arrow_buffer_38, arrow_data_38, arrow_schema_38);
#[cfg(has_arrow_37)] build_arrow_crate!(arrow_array_37, arrow_buffer_37, arrow_data_37, arrow_schema_37);
#[cfg(has_arrow_36)] build_arrow_crate!(arrow_array_36, arrow_buffer_36, arrow_data_36, arrow_schema_36);

/// Documentation
pub mod docs {
Expand Down
1 change: 1 addition & 0 deletions serde_arrow/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ mod api_chrono;
mod decimal_representations;
mod error;
mod schema_like;
mod schema_tracing;
Loading