Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Move from parquet-format-safe to polars-parquet-format #19275

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ hashbrown = { workspace = true }
num-traits = { workspace = true }
polars-compute = { workspace = true, features = ["approx_unique"] }
polars-error = { workspace = true }
polars-parquet-format = { git = "https://github.com/pola-rs/parquet-format" }
polars-utils = { workspace = true, features = ["mmap"] }
simdutf8 = { workspace = true }

parquet-format-safe = "0.2"
streaming-decompression = "0.1"

async-stream = { version = "0.3.3", optional = true }
Expand Down Expand Up @@ -61,6 +61,6 @@ gzip_zlib_ng = ["flate2/zlib-ng"]
lz4 = ["dep:lz4"]
lz4_flex = ["dep:lz4_flex"]

async = ["async-stream", "futures", "parquet-format-safe/async"]
async = ["async-stream", "futures", "polars-parquet-format/async"]
bloom_filter = ["xxhash-rust"]
serde_types = ["serde"]
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/parquet/bloom_filter/read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::{Read, Seek, SeekFrom};

use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use parquet_format_safe::{
use polars_parquet_format::thrift::protocol::TCompactInputProtocol;
use polars_parquet_format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHeader, SplitBlockAlgorithm,
Uncompressed,
};
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/parquet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl From<lz4_flex::block::CompressError> for ParquetError {
}
}

impl From<parquet_format_safe::thrift::Error> for ParquetError {
fn from(e: parquet_format_safe::thrift::Error) -> ParquetError {
impl From<polars_parquet_format::thrift::Error> for ParquetError {
fn from(e: polars_parquet_format::thrift::Error) -> ParquetError {
ParquetError::OutOfSpec(format!("Invalid thrift: {}", e))
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::{ColumnChunk, ColumnMetaData, Encoding};
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::parquet::compression::Compression;
Expand All @@ -10,7 +10,7 @@ use crate::parquet::statistics::Statistics;
mod serde_types {
pub use std::io::Cursor;

pub use parquet_format_safe::thrift::protocol::{
pub use polars_parquet_format::thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol,
};
pub use serde::de::Error as DeserializeError;
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/parquet/metadata/file_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::ColumnOrder as TColumnOrder;
use polars_parquet_format::ColumnOrder as TColumnOrder;

use super::column_order::ColumnOrder;
use super::schema_descriptor::SchemaDescriptor;
Expand All @@ -8,7 +8,7 @@ use crate::parquet::metadata::get_sort_order;
pub use crate::parquet::thrift_format::KeyValue;

/// Metadata for a Parquet file.
// This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors,
// This is almost equal to [`polars_parquet_format::FileMetaData`] but contains the descriptors,
// which are crucial to deserialize pages.
#[derive(Debug, Clone)]
pub struct FileMetadata {
Expand Down Expand Up @@ -65,7 +65,7 @@ impl FileMetadata {

/// Deserializes [`crate::parquet::thrift_format::FileMetadata`] into this struct
pub fn try_from_thrift(
metadata: parquet_format_safe::FileMetaData,
metadata: polars_parquet_format::FileMetaData,
) -> Result<Self, ParquetError> {
let schema_descr = SchemaDescriptor::try_from_thrift(&metadata.schema)?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use hashbrown::hash_map::RawEntryMut;
use parquet_format_safe::{RowGroup, SortingColumn};
use polars_parquet_format::{RowGroup, SortingColumn};
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::idx_vec::UnitVec;
use polars_utils::pl_str::PlSmallStr;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::SchemaElement;
use polars_parquet_format::SchemaElement;
use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod write;

use std::ops::Deref;

use parquet_format_safe as thrift_format;
use polars_parquet_format as thrift_format;
use polars_utils::mmap::MemSlice;
pub use streaming_decompression::{fallible_streaming_iterator, FallibleStreamingIterator};

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/read/compression.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::DataPageHeaderV2;
use polars_parquet_format::DataPageHeaderV2;

use super::PageReader;
use crate::parquet::compression::{self, Compression};
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/parquet/read/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::cmp::min;
use std::io::{Read, Seek, SeekFrom};

use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use parquet_format_safe::FileMetaData as TFileMetadata;
use polars_parquet_format::thrift::protocol::TCompactInputProtocol;
use polars_parquet_format::FileMetaData as TFileMetadata;

use super::super::metadata::FileMetadata;
use super::super::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/read/page/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::Seek;
use std::sync::OnceLock;

use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use polars_parquet_format::thrift::protocol::TCompactInputProtocol;
use polars_utils::mmap::{MemReader, MemSlice};

use super::PageIterator;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/read/page/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::SeekFrom;

use async_stream::try_stream;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream};
use parquet_format_safe::thrift::protocol::TCompactInputStreamProtocol;
use polars_parquet_format::thrift::protocol::TCompactInputStreamProtocol;
use polars_utils::mmap::MemSlice;

use super::reader::{finish_page, PageMetaData};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
//! println!("{:?}", schema);
//! ```

use parquet_format_safe::Type;
use polars_parquet_format::Type;
use polars_utils::pl_str::PlSmallStr;
use types::PrimitiveLogicalType;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::SchemaElement;
use polars_parquet_format::SchemaElement;
use polars_utils::pl_str::PlSmallStr;

use super::super::types::ParquetType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::{ConvertedType, SchemaElement};
use polars_parquet_format::{ConvertedType, SchemaElement};

use super::super::types::ParquetType;
use crate::parquet::schema::types::PrimitiveType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::ConvertedType;
use polars_parquet_format::ConvertedType;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::Type;
use polars_parquet_format::Type;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

Expand Down
6 changes: 4 additions & 2 deletions crates/polars-parquet/src/parquet/statistics/binary.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::Statistics as ParquetStatistics;
use polars_parquet_format::Statistics as ParquetStatistics;

use crate::parquet::error::ParquetResult;
use crate::parquet::schema::types::PrimitiveType;
Expand Down Expand Up @@ -32,8 +32,10 @@ impl BinaryStatistics {
distinct_count: self.distinct_count,
max_value: self.max_value.clone(),
min_value: self.min_value.clone(),
min: None,
max: None,
min: None,
is_max_value_exact: None,
is_min_value_exact: None,
}
}
}
6 changes: 4 additions & 2 deletions crates/polars-parquet/src/parquet/statistics/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::Statistics as ParquetStatistics;
use polars_parquet_format::Statistics as ParquetStatistics;

use crate::parquet::error::{ParquetError, ParquetResult};

Expand Down Expand Up @@ -49,8 +49,10 @@ impl BooleanStatistics {
distinct_count: self.distinct_count,
max_value: self.max_value.map(|x| vec![x as u8]),
min_value: self.min_value.map(|x| vec![x as u8]),
min: None,
max: None,
min: None,
is_max_value_exact: None,
is_min_value_exact: None,
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::Statistics as ParquetStatistics;
use polars_parquet_format::Statistics as ParquetStatistics;

use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::schema::types::PrimitiveType;
Expand Down Expand Up @@ -54,8 +54,10 @@ impl FixedLenStatistics {
distinct_count: self.distinct_count,
max_value: self.max_value.clone(),
min_value: self.min_value.clone(),
min: None,
max: None,
min: None,
is_max_value_exact: None,
is_min_value_exact: None,
}
}
}
12 changes: 12 additions & 0 deletions crates/polars-parquet/src/parquet/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use primitive::PrimitiveStatistics;
use crate::parquet::error::ParquetResult;
use crate::parquet::schema::types::{PhysicalType, PrimitiveType};
pub use crate::parquet::thrift_format::Statistics as ParquetStatistics;
use crate::read::ParquetError;

#[derive(Debug, PartialEq)]
pub enum Statistics {
Expand Down Expand Up @@ -50,6 +51,17 @@ impl Statistics {
statistics: &ParquetStatistics,
primitive_type: PrimitiveType,
) -> ParquetResult<Self> {
if statistics.is_min_value_exact.is_some() {
return Err(ParquetError::not_supported(
"is_min_value_exact in statistics",
));
}
if statistics.is_max_value_exact.is_some() {
return Err(ParquetError::not_supported(
"is_max_value_exact in statistics",
));
}

use {PhysicalType as T, PrimitiveStatistics as PrimStat};
Ok(match primitive_type.physical_type {
T::ByteArray => BinaryStatistics::deserialize(statistics, primitive_type)?.into(),
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-parquet/src/parquet/statistics/primitive.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::Statistics as ParquetStatistics;
use polars_parquet_format::Statistics as ParquetStatistics;

use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::schema::types::PrimitiveType;
Expand Down Expand Up @@ -50,8 +50,10 @@ impl<T: types::NativeType> PrimitiveStatistics<T> {
distinct_count: self.distinct_count,
max_value: self.max_value.map(|x| x.to_le_bytes().as_ref().to_vec()),
min_value: self.min_value.map(|x| x.to_le_bytes().as_ref().to_vec()),
min: None,
max: None,
min: None,
is_max_value_exact: None,
is_min_value_exact: None,
}
}
}
8 changes: 5 additions & 3 deletions crates/polars-parquet/src/parquet/write/column_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::io::Write;

#[cfg(feature = "async")]
use futures::AsyncWrite;
use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
#[cfg(feature = "async")]
use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;
use parquet_format_safe::{ColumnChunk, ColumnMetaData, Type};
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Type};
use polars_utils::aliases::PlHashSet;

#[cfg(feature = "async")]
Expand Down Expand Up @@ -195,6 +195,8 @@ fn build_column_chunk(
statistics,
encoding_stats: None,
bloom_filter_offset: None,
bloom_filter_length: None,
size_statistics: None,
};

Ok(ColumnChunk {
Expand Down
10 changes: 6 additions & 4 deletions crates/polars-parquet/src/parquet/write/file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::Write;

use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
use parquet_format_safe::RowGroup;
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
use polars_parquet_format::RowGroup;

use super::indexes::{write_column_index, write_offset_index};
use super::page::PageWriteSpec;
Expand Down Expand Up @@ -39,15 +39,17 @@ pub(super) fn end_file<W: Write>(
Ok(metadata_len as u64 + FOOTER_SIZE)
}

fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<parquet_format_safe::ColumnOrder> {
fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<polars_parquet_format::ColumnOrder> {
// We only include ColumnOrder for leaf nodes.
// Currently only supported ColumnOrder is TypeDefinedOrder so we set this
// for all leaf nodes.
// Even if the column has an undefined sort order, such as INTERVAL, this
// is still technically the defined TYPEORDER so it should still be set.
(0..schema_desc.columns().len())
.map(|_| {
parquet_format_safe::ColumnOrder::TYPEORDER(parquet_format_safe::TypeDefinedOrder {})
polars_parquet_format::ColumnOrder::TYPEORDER(
polars_parquet_format::TypeDefinedOrder {},
)
})
.collect()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet_format_safe::{BoundaryOrder, ColumnIndex, OffsetIndex, PageLocation};
use polars_parquet_format::{BoundaryOrder, ColumnIndex, OffsetIndex, PageLocation};

use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::write::page::{is_data_page, PageWriteSpec};
Expand Down Expand Up @@ -48,6 +48,8 @@ pub fn serialize_column_index(pages: &[PageWriteSpec]) -> ParquetResult<ColumnIn
max_values,
boundary_order: BoundaryOrder::UNORDERED,
null_counts: Some(null_counts),
repetition_level_histograms: None,
definition_level_histograms: None,
})
}

Expand All @@ -68,5 +70,8 @@ pub fn serialize_offset_index(pages: &[PageWriteSpec]) -> ParquetResult<OffsetIn
})
.collect::<ParquetResult<Vec<_>>>()?;

Ok(OffsetIndex { page_locations })
Ok(OffsetIndex {
page_locations,
unencoded_byte_array_data_bytes: None,
})
}
Loading