Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Properly write nested NullArray in Parquet #17807

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ chrono = { workspace = true }
chrono-tz = { workspace = true, optional = true }
dyn-clone = { version = "1" }
either = { workspace = true }
foreign_vec = { version = "0.1" }
hashbrown = { workspace = true }
num-traits = { workspace = true }
parking_lot = { workspace = true }
polars-error = { workspace = true }
polars-utils = { workspace = true }
serde = { workspace = true, optional = true }
Expand Down
21 changes: 18 additions & 3 deletions crates/polars-arrow/src/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use crate::ffi;
#[derive(Clone)]
pub struct NullArray {
data_type: ArrowDataType,

/// Validity mask. This is always all-zeroes.
validity: Bitmap,

length: usize,
}

Expand All @@ -25,7 +29,13 @@ impl NullArray {
polars_bail!(ComputeError: "NullArray can only be initialized with a DataType whose physical type is Null");
}

Ok(Self { data_type, length })
let validity = Bitmap::new_zeroed(length);

Ok(Self {
data_type,
validity,
length,
})
}

/// Returns a new [`NullArray`].
Expand Down Expand Up @@ -66,8 +76,9 @@ impl NullArray {
///
/// # Safety
/// The caller must ensure that `offset + length < self.len()`.
pub unsafe fn slice_unchecked(&mut self, _offset: usize, length: usize) {
pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
self.length = length;
self.validity.slice_unchecked(offset, length);
}

#[inline]
Expand All @@ -80,7 +91,7 @@ impl Array for NullArray {
impl_common_array!();

fn validity(&self) -> Option<&Bitmap> {
None
Some(&self.validity)
}

fn with_validity(&self, _: Option<Bitmap>) -> Box<dyn Array> {
Expand Down Expand Up @@ -179,13 +190,17 @@ impl Splitable for NullArray {
}

unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
let (lhs, rhs) = self.validity.split_at(offset);

(
Self {
data_type: self.data_type.clone(),
validity: lhs,
length: offset,
},
Self {
data_type: self.data_type.clone(),
validity: rhs,
length: self.len() - offset,
},
)
Expand Down
56 changes: 54 additions & 2 deletions crates/polars-arrow/src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use either::Either;
use parking_lot::RwLockUpgradableReadGuard;
use polars_error::{polars_bail, PolarsResult};

use super::utils::{count_zeros, fmt, get_bit, get_bit_unchecked, BitChunk, BitChunks, BitmapIter};
Expand Down Expand Up @@ -399,7 +400,58 @@ impl Bitmap {
/// Initializes an new [`Bitmap`] filled with unset values.
#[inline]
pub fn new_zeroed(length: usize) -> Self {
Self::new_with_value(false, length)
// There are quite some situations where we just want a zeroed out Bitmap, since that would
// constantly need to reallocate we make a static that contains the largest allocation.
// Then, we can just take an Arc::clone of that slice everytime or grow it if needed.
static GLOBAL_ZERO_BYTES: OnceLock<parking_lot::RwLock<Arc<Bytes<u8>>>> = OnceLock::new();

let rwlock_zero_bytes = GLOBAL_ZERO_BYTES.get_or_init(|| {
let byte_length = length.div_ceil(8).next_power_of_two();
parking_lot::RwLock::new(Arc::new(Bytes::from(vec![0; byte_length])))
});

let unset_bit_count_cache = AtomicU64::new(length as u64);

let zero_bytes = rwlock_zero_bytes.upgradable_read();
if zero_bytes.len() * 8 >= length {
let bytes = zero_bytes.clone();
return Bitmap {
bytes,
offset: 0,
length,
unset_bit_count_cache,
};
}

let mut zero_bytes = RwLockUpgradableReadGuard::upgrade(zero_bytes);

// Race Condition:
// By the time we got here, another Guard could have been upgraded, and the buffer
// could have been expanded already. So we want to check again whether we cannot just take
// that buffer.
if zero_bytes.len() * 8 >= length {
let bytes = zero_bytes.clone();
return Bitmap {
bytes,
offset: 0,
length,
unset_bit_count_cache,
};
}

// Let do exponential increases so that we are not constantly allocating new
// buffers.
let byte_length = length.div_ceil(8).next_power_of_two();

let bytes = Arc::new(Bytes::from(vec![0; byte_length]));
*zero_bytes = bytes.clone();

Bitmap {
bytes,
offset: 0,
length,
unset_bit_count_cache,
}
}

/// Initializes an new [`Bitmap`] filled with the given value.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) enum BytesAllocator {
#[allow(dead_code)]
Arrow(arrow_buffer::Buffer),
}
pub(crate) type BytesInner<T> = foreign_vec::ForeignVec<BytesAllocator, T>;
pub(crate) type BytesInner<T> = polars_utils::foreign_vec::ForeignVec<BytesAllocator, T>;

/// Bytes representation.
#[repr(transparent)]
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::types::NativeType;

pub fn array_to_page<'a, T, R>(
array: &'a PrimitiveArray<T>,
pub fn array_to_page<T, R>(
array: &PrimitiveArray<T>,
options: WriteOptions,
type_: PrimitiveType,
nested: &[Nested],
) -> PolarsResult<DataPage>
where
PrimitiveArray<T>: polars_compute::min_max::MinMaxKernel<Scalar<'a> = T>,
T: ArrowNativeType,
R: NativeType,
T: num_traits::AsPrimitive<R>,
Expand Down
100 changes: 100 additions & 0 deletions crates/polars-utils/src/foreign_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/// This is pulled out of https://github.com/DataEngineeringLabs/foreign_vec
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use std::vec::Vec;

/// Mode of deallocating memory regions
enum Allocation<D> {
/// Native allocation
Native,
// A foreign allocator and its ref count
Foreign(D),
}

/// A continuous memory region that may be allocated externally.
///
/// In the most common case, this is created from [`Vec`].
/// However, this region may also be allocated by a foreign allocator `D`
/// and behave as `&[T]`.
pub struct ForeignVec<D, T> {
/// An implementation using an `enum` of a `Vec` or a foreign pointer is not used
/// because `deref` is at least 50% more expensive than the deref of a `Vec`.
data: ManuallyDrop<Vec<T>>,
/// the region was allocated
allocation: Allocation<D>,
}

impl<D, T> ForeignVec<D, T> {
/// Takes ownership of an allocated memory region.
/// # Panics
/// This function panics if and only if pointer is not null
/// # Safety
/// This function is safe if and only if `ptr` is valid for `length`
/// # Implementation
/// This function leaks if and only if `owner` does not deallocate
/// the region `[ptr, ptr+length[` when dropped.
#[inline]
pub unsafe fn from_foreign(ptr: *const T, length: usize, owner: D) -> Self {
assert!(!ptr.is_null());
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
// is that we do never expose this region as a `Vec`; we only use `Vec` on it to provide
// immutable access to the region (via `Vec::deref` to `&[T]`).
let data = Vec::from_raw_parts(ptr as *mut T, length, length);
let data = ManuallyDrop::new(data);

Self {
data,
allocation: Allocation::Foreign(owner),
}
}

/// Returns a `Some` mutable reference of [`Vec<T>`] iff this was initialized
/// from a [`Vec<T>`] and `None` otherwise.
pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
match &self.allocation {
Allocation::Foreign(_) => None,
Allocation::Native => Some(self.data.deref_mut()),
}
}
}

impl<D, T> Drop for ForeignVec<D, T> {
#[inline]
fn drop(&mut self) {
match self.allocation {
Allocation::Foreign(_) => {
// the foreign is dropped via its `Drop`
},
Allocation::Native => {
let data = core::mem::take(&mut self.data);
let _ = ManuallyDrop::into_inner(data);
},
}
}
}

impl<D, T> core::ops::Deref for ForeignVec<D, T> {
type Target = [T];

#[inline]
fn deref(&self) -> &[T] {
&self.data
}
}

impl<D, T: core::fmt::Debug> core::fmt::Debug for ForeignVec<D, T> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
core::fmt::Debug::fmt(&**self, f)
}
}

impl<D, T> From<Vec<T>> for ForeignVec<D, T> {
#[inline]
fn from(data: Vec<T>) -> Self {
Self {
data: ManuallyDrop::new(data),
allocation: Allocation::Native,
}
}
}
1 change: 1 addition & 0 deletions crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod contention_pool;
pub mod cpuid;
mod error;
pub mod floor_divmod;
pub mod foreign_vec;
pub mod functions;
pub mod hashing;
pub mod idx_vec;
Expand Down
8 changes: 8 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,14 @@ def test_read_byte_stream_split_arrays(
assert_frame_equal(read, df)


@pytest.mark.write_disk()
def test_parquet_nested_null_array_17795(tmp_path: Path) -> None:
filename = tmp_path / "nested_null.parquet"

pl.DataFrame([{"struct": {"field": None}}]).write_parquet(filename)
pq.read_table(filename)


@pytest.mark.write_disk()
def test_parquet_record_batches_pyarrow_fixed_size_list_16614(tmp_path: Path) -> None:
filename = tmp_path / "a.parquet"
Expand Down