Skip to content

Commit

Permalink
feat: add architecture for polars-flavored IPC (#13734)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 15, 2024
1 parent a4234ba commit da3ae10
Show file tree
Hide file tree
Showing 59 changed files with 200 additions and 131 deletions.
12 changes: 6 additions & 6 deletions crates/polars-core/src/chunked_array/arithmetic/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ impl Add for &DecimalChunked {
self.arithmetic_helper(
rhs,
decimal::add,
|lhs, rhs_val| decimal::add_scalar(lhs, rhs_val, &rhs.dtype().to_arrow()),
|lhs_val, rhs| decimal::add_scalar(rhs, lhs_val, &self.dtype().to_arrow()),
|lhs, rhs_val| decimal::add_scalar(lhs, rhs_val, &rhs.dtype().to_arrow(true)),
|lhs_val, rhs| decimal::add_scalar(rhs, lhs_val, &self.dtype().to_arrow(true)),
)
}
}
Expand All @@ -130,8 +130,8 @@ impl Mul for &DecimalChunked {
self.arithmetic_helper(
rhs,
decimal::mul,
|lhs, rhs_val| decimal::mul_scalar(lhs, rhs_val, &rhs.dtype().to_arrow()),
|lhs_val, rhs| decimal::mul_scalar(rhs, lhs_val, &self.dtype().to_arrow()),
|lhs, rhs_val| decimal::mul_scalar(lhs, rhs_val, &rhs.dtype().to_arrow(true)),
|lhs_val, rhs| decimal::mul_scalar(rhs, lhs_val, &self.dtype().to_arrow(true)),
)
}
}
Expand All @@ -143,8 +143,8 @@ impl Div for &DecimalChunked {
self.arithmetic_helper(
rhs,
decimal::div,
|lhs, rhs_val| decimal::div_scalar(lhs, rhs_val, &rhs.dtype().to_arrow()),
|lhs_val, rhs| decimal::div_scalar_swapped(lhs_val, &self.dtype().to_arrow(), rhs),
|lhs, rhs_val| decimal::div_scalar(lhs, rhs_val, &rhs.dtype().to_arrow(true)),
|lhs_val, rhs| decimal::div_scalar_swapped(lhs_val, &self.dtype().to_arrow(true), rhs),
)
}
}
6 changes: 3 additions & 3 deletions crates/polars-core/src/chunked_array/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl ArrayChunked {
/// Get the inner values as `Series`
pub fn get_inner(&self) -> Series {
let ca = self.rechunk();
let inner_dtype = self.inner_dtype().to_arrow();
let inner_dtype = self.inner_dtype().to_arrow(true);
let arr = ca.downcast_iter().next().unwrap();
unsafe {
Series::_try_from_arrow_unchecked(
Expand All @@ -51,7 +51,7 @@ impl ArrayChunked {
) -> PolarsResult<ArrayChunked> {
// Rechunk or the generated Series will have wrong length.
let ca = self.rechunk();
let inner_dtype = self.inner_dtype().to_arrow();
let inner_dtype = self.inner_dtype().to_arrow(true);

let chunks = ca.downcast_iter().map(|arr| {
let elements = unsafe {
Expand All @@ -73,7 +73,7 @@ impl ArrayChunked {
let values = out.chunks()[0].clone();

let inner_dtype =
FixedSizeListArray::default_datatype(out.dtype().to_arrow(), ca.width());
FixedSizeListArray::default_datatype(out.dtype().to_arrow(true), ca.width());
let arr = FixedSizeListArray::new(inner_dtype, values, arr.validity().cloned());
Ok(arr)
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ impl FixedSizeListBuilder for AnonymousOwnedFixedSizeListBuilder {

fn finish(&mut self) -> ArrayChunked {
let arr = std::mem::take(&mut self.inner)
.finish(self.inner_dtype.as_ref().map(|dt| dt.to_arrow()).as_ref())
.finish(
self.inner_dtype
.as_ref()
.map(|dt| dt.to_arrow(true))
.as_ref(),
)
.unwrap();
ChunkedArray::with_chunk(self.name.as_str(), arr)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ impl<'a> AnonymousListBuilder<'a> {
} else {
let inner_dtype = slf.inner_dtype.materialize();

let inner_dtype_physical = inner_dtype.as_ref().map(|dt| dt.to_physical().to_arrow());
let inner_dtype_physical = inner_dtype
.as_ref()
.map(|dt| dt.to_physical().to_arrow(true));
let arr = slf.builder.finish(inner_dtype_physical.as_ref()).unwrap();

let list_dtype_logical = match inner_dtype {
Expand Down Expand Up @@ -153,7 +155,9 @@ impl ListBuilderTrait for AnonymousOwnedListBuilder {
let inner_dtype = std::mem::take(&mut self.inner_dtype).materialize();
// Don't use self from here on out.
let slf = std::mem::take(self);
let inner_dtype_physical = inner_dtype.as_ref().map(|dt| dt.to_physical().to_arrow());
let inner_dtype_physical = inner_dtype
.as_ref()
.map(|dt| dt.to_physical().to_arrow(true));
let arr = slf.builder.finish(inner_dtype_physical.as_ref()).unwrap();

let list_dtype_logical = match inner_dtype {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
T: PolarsNumericType,
{
fn from_slice(name: &str, v: &[T::Native]) -> Self {
let arr = PrimitiveArray::from_slice(v).to(T::get_dtype().to_arrow());
let arr = PrimitiveArray::from_slice(v).to(T::get_dtype().to_arrow(true));
ChunkedArray::with_chunk(name, arr)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/builder/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
{
pub fn new(name: &str, capacity: usize) -> Self {
let array_builder = MutablePrimitiveArray::<T::Native>::with_capacity(capacity)
.to(T::get_dtype().to_arrow());
.to(T::get_dtype().to_arrow(true));

PrimitiveChunkedBuilder {
array_builder,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) fn cast_chunks(
}
};

let arrow_dtype = dtype.to_arrow();
let arrow_dtype = dtype.to_arrow(true);
chunks
.iter()
.map(|arr| arrow::compute::cast::cast(arr.as_ref(), &arrow_dtype, options))
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub(crate) fn prepare_collect_dtype(dtype: &DataType) -> ArrowDataType {
registry::get_object_physical_type()
},
},
dt => dt.to_arrow(),
dt => dt.to_arrow(true),
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ where
#[cfg(debug_assertions)]
{
if !chunks.is_empty() && dtype.is_primitive() {
assert_eq!(chunks[0].data_type(), &dtype.to_physical().to_arrow())
assert_eq!(chunks[0].data_type(), &dtype.to_physical().to_arrow(true))
}
}
let field = Arc::new(Field::new(name, dtype));
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ListChunked {
/// Get the inner values as [`Series`], ignoring the list offsets.
pub fn get_inner(&self) -> Series {
let ca = self.rechunk();
let inner_dtype = self.inner_dtype().to_arrow();
let inner_dtype = self.inner_dtype().to_arrow(true);
let arr = ca.downcast_iter().next().unwrap();
unsafe {
Series::_try_from_arrow_unchecked(
Expand All @@ -60,7 +60,7 @@ impl ListChunked {
) -> PolarsResult<ListChunked> {
// generated Series will have wrong length otherwise.
let ca = self.rechunk();
let inner_dtype = self.inner_dtype().to_arrow();
let inner_dtype = self.inner_dtype().to_arrow(true);
let arr = ca.downcast_iter().next().unwrap();

let elements = unsafe {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn slots_to_mut(slots: &Utf8Array<i64>) -> MutableUtf8Array<i64> {
// all offsets are valid and the u8 data is valid utf8
unsafe {
MutableUtf8Array::new_unchecked(
DataType::String.to_arrow(),
ArrowDataType::LargeUtf8,
offset_buf,
values_buf,
validity_buf,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/logical/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Int128Chunked {
let (_, values, validity) = default.into_inner();

*arr = PrimitiveArray::new(
DataType::Decimal(precision, Some(scale)).to_arrow(),
DataType::Decimal(precision, Some(scale)).to_arrow(true),
values,
validity,
);
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-core/src/chunked_array/logical/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ fn fields_to_struct_array(fields: &[Series], physical: bool) -> (ArrayRef, Vec<S
let s = s.rechunk();
match s.dtype() {
#[cfg(feature = "object")]
DataType::Object(_, _) => s.to_arrow(0),
DataType::Object(_, _) => s.to_arrow(0, true),
_ => {
if physical {
s.chunks()[0].clone()
} else {
s.to_arrow(0)
s.to_arrow(0, true)
}
},
}
Expand Down Expand Up @@ -143,7 +143,7 @@ impl StructChunked {
.iter()
.map(|s| match s.dtype() {
#[cfg(feature = "object")]
DataType::Object(_, _) => s.to_arrow(i),
DataType::Object(_, _) => s.to_arrow(i, true),
_ => s.chunks()[i].clone(),
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -297,7 +297,7 @@ impl StructChunked {
let values = self
.fields
.iter()
.map(|s| s.to_arrow(i))
.map(|s| s.to_arrow(i, true))
.collect::<Vec<_>>();

// we determine fields from arrays as there might be object arrays
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ pub(crate) fn to_primitive<T: PolarsNumericType>(
values: Vec<T::Native>,
validity: Option<Bitmap>,
) -> PrimitiveArray<T::Native> {
PrimitiveArray::new(T::get_dtype().to_arrow(), values.into(), validity)
PrimitiveArray::new(T::get_dtype().to_arrow(true), values.into(), validity)
}

pub(crate) fn to_array<T: PolarsNumericType>(
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-core/src/chunked_array/ops/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ where
let out: U::Array = arr
.values_iter()
.map(&mut op)
.collect_arr_with_dtype(dtype.to_arrow());
.collect_arr_with_dtype(dtype.to_arrow(true));
out.with_validity_typed(arr.validity().cloned())
} else {
let out: U::Array = arr
.iter()
.map(|opt| opt.map(&mut op))
.collect_arr_with_dtype(dtype.to_arrow());
.collect_arr_with_dtype(dtype.to_arrow(true));
out.with_validity_typed(arr.validity().cloned())
}
});
Expand Down Expand Up @@ -159,7 +159,7 @@ where
drop(arr);

let compute_immutable = |arr: &PrimitiveArray<S::Native>| {
arrow::compute::arity::unary(arr, f, S::get_dtype().to_arrow())
arrow::compute::arity::unary(arr, f, S::get_dtype().to_arrow(true))
};

if owned_arr.values().is_sliced() {
Expand Down
12 changes: 6 additions & 6 deletions crates/polars-core/src/chunked_array/ops/arity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where
V::Array: ArrayFromIter<<F as UnaryFnMut<T::Physical<'a>>>::Ret>,
{
if ca.null_count() == ca.len() {
let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow());
let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow(true));
return ChunkedArray::with_chunk(ca.name(), arr);
}

Expand All @@ -102,7 +102,7 @@ where
V::Array: ArrayFromIter<K>,
{
if ca.null_count() == ca.len() {
let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow());
let arr = V::Array::full_null(ca.len(), V::get_dtype().to_arrow(true));
return Ok(ChunkedArray::with_chunk(ca.name(), arr));
}

Expand Down Expand Up @@ -280,7 +280,7 @@ where
{
if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
let len = lhs.len().min(rhs.len());
let arr = V::Array::full_null(len, V::get_dtype().to_arrow());
let arr = V::Array::full_null(len, V::get_dtype().to_arrow(true));

return ChunkedArray::with_chunk(lhs.name(), arr);
}
Expand Down Expand Up @@ -319,7 +319,7 @@ where
{
if lhs.null_count() == lhs.len() || rhs.null_count() == rhs.len() {
let len = lhs.len().min(rhs.len());
let arr = V::Array::full_null(len, V::get_dtype().to_arrow());
let arr = V::Array::full_null(len, V::get_dtype().to_arrow(true));

return Ok(ChunkedArray::with_chunk(lhs.name(), arr));
}
Expand Down Expand Up @@ -678,7 +678,7 @@ where
let min = lhs.len().min(rhs.len());
let max = lhs.len().max(rhs.len());
let len = if min == 1 { max } else { min };
let arr = V::Array::full_null(len, V::get_dtype().to_arrow());
let arr = V::Array::full_null(len, V::get_dtype().to_arrow(true));

return ChunkedArray::with_chunk(lhs.name(), arr);
}
Expand Down Expand Up @@ -714,7 +714,7 @@ where
let min = lhs.len().min(rhs.len());
let max = lhs.len().max(rhs.len());
let len = if min == 1 { max } else { min };
let arr = V::Array::full_null(len, V::get_dtype().to_arrow());
let arr = V::Array::full_null(len, V::get_dtype().to_arrow(true));

return Ok(ChunkedArray::with_chunk(lhs.name(), arr));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/ops/bit_repr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn reinterpret_list_chunked<T: PolarsNumericType, U: PolarsNumericType>(
let pa =
PrimitiveArray::from_data_default(reinterpreted_buf, inner_arr.validity().cloned());
LargeListArray::new(
DataType::List(Box::new(U::get_dtype())).to_arrow(),
DataType::List(Box::new(U::get_dtype())).to_arrow(true),
array.offsets().clone(),
pa.to_boxed(),
array.validity().cloned(),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ where
unsafe { unset_bit_raw(validity_slice, i) }
}
let arr = PrimitiveArray::new(
T::get_dtype().to_arrow(),
T::get_dtype().to_arrow(true),
new_values.into(),
Some(validity.into()),
);
Expand Down Expand Up @@ -274,7 +274,7 @@ impl ExplodeByOffsets for ListChunked {
last = o;
}
process_range(start, last, &mut builder);
let arr = builder.finish(Some(&inner_type.to_arrow())).unwrap();
let arr = builder.finish(Some(&inner_type.to_arrow(true))).unwrap();
unsafe { self.copy_with_chunks(vec![Box::new(arr)], true, true) }.into_series()
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ChunkFilter<ListType> for ListChunked {
Some(true) => Ok(self.clone()),
_ => Ok(ListChunked::from_chunk_iter(
self.name(),
[ListArray::new_empty(self.dtype().to_arrow())],
[ListArray::new_empty(self.dtype().to_arrow(true))],
)),
};
}
Expand All @@ -126,7 +126,7 @@ impl ChunkFilter<FixedSizeListType> for ArrayChunked {
Some(true) => Ok(self.clone()),
_ => Ok(ArrayChunked::from_chunk_iter(
self.name(),
[FixedSizeListArray::new_empty(self.dtype().to_arrow())],
[FixedSizeListArray::new_empty(self.dtype().to_arrow(true))],
)),
};
}
Expand Down
12 changes: 6 additions & 6 deletions crates/polars-core/src/chunked_array/ops/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ where
T: PolarsNumericType,
{
fn full_null(name: &str, length: usize) -> Self {
let arr = PrimitiveArray::new_null(T::get_dtype().to_arrow(), length);
let arr = PrimitiveArray::new_null(T::get_dtype().to_arrow(true), length);
ChunkedArray::with_chunk(name, arr)
}
}
Expand All @@ -39,7 +39,7 @@ impl ChunkFull<bool> for BooleanChunked {

impl ChunkFullNull for BooleanChunked {
fn full_null(name: &str, length: usize) -> Self {
let arr = BooleanArray::new_null(DataType::Boolean.to_arrow(), length);
let arr = BooleanArray::new_null(ArrowDataType::Boolean, length);
ChunkedArray::with_chunk(name, arr)
}
}
Expand All @@ -59,7 +59,7 @@ impl<'a> ChunkFull<&'a str> for StringChunked {

impl ChunkFullNull for StringChunked {
fn full_null(name: &str, length: usize) -> Self {
let arr = Utf8Array::new_null(DataType::String.to_arrow(), length);
let arr = Utf8Array::new_null(DataType::String.to_arrow(true), length);
ChunkedArray::with_chunk(name, arr)
}
}
Expand All @@ -79,7 +79,7 @@ impl<'a> ChunkFull<&'a [u8]> for BinaryChunked {

impl ChunkFullNull for BinaryChunked {
fn full_null(name: &str, length: usize) -> Self {
let arr = BinaryArray::new_null(DataType::Binary.to_arrow(), length);
let arr = BinaryArray::new_null(DataType::Binary.to_arrow(true), length);
ChunkedArray::with_chunk(name, arr)
}
}
Expand Down Expand Up @@ -111,7 +111,7 @@ impl ArrayChunked {
) -> ArrayChunked {
let arr = FixedSizeListArray::new_null(
ArrowDataType::FixedSizeList(
Box::new(ArrowField::new("item", inner_dtype.to_arrow(), true)),
Box::new(ArrowField::new("item", inner_dtype.to_arrow(true), true)),
width,
),
length,
Expand Down Expand Up @@ -150,7 +150,7 @@ impl ListChunked {
let arr: ListArray<i64> = ListArray::new_null(
ArrowDataType::LargeList(Box::new(ArrowField::new(
"item",
inner_dtype.to_physical().to_arrow(),
inner_dtype.to_physical().to_arrow(true),
true,
))),
length,
Expand Down
Loading

0 comments on commit da3ae10

Please sign in to comment.