Commit

nameexhaustion committed Jul 3, 2024
1 parent 760d5e3 commit 8bd264b
Showing 17 changed files with 353 additions and 81 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/doc/lib.md
@@ -42,7 +42,7 @@ fn main() -> Result<()> {
write_statistics: true,
compression: CompressionOptions::Snappy,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let row_groups = RowGroupIterator::try_new(
1 change: 0 additions & 1 deletion crates/polars-io/src/lib.rs
@@ -19,7 +19,6 @@ pub mod ndjson;
mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
- #[cfg(feature = "partition")]
pub mod partition;
#[cfg(feature = "async")]
pub mod pl_async;
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/options.rs
@@ -16,7 +16,7 @@ pub struct ParquetWriteOptions {
/// If `None` will be all written to a single row group.
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
- pub data_pagesize_limit: Option<usize>,
+ pub data_page_size: Option<usize>,
/// maintain the order the data was processed
pub maintain_order: bool,
}
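For context, a minimal sketch of setting the renamed field (values are illustrative; assumes `ParquetWriteOptions` provides a `Default` impl):

use polars_io::parquet::write::ParquetWriteOptions;

let options = ParquetWriteOptions {
    data_page_size: Some(512 * 1024), // flush data pages at ~512 KiB instead of the 1 MiB default
    row_group_size: Some(100_000),    // illustrative row group cap
    ..Default::default()
};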
16 changes: 15 additions & 1 deletion crates/polars-io/src/parquet/write/writer.rs
@@ -10,9 +10,23 @@ use polars_parquet::write::{

use super::batched_writer::BatchedWriter;
use super::options::ParquetCompression;
+ use super::ParquetWriteOptions;
use crate::prelude::chunk_df_for_writing;
use crate::shared::schema_to_arrow_checked;

+ impl ParquetWriteOptions {
+     pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F>
+     where
+         F: Write,
+     {
+         ParquetWriter::new(f)
+             .with_compression(self.compression)
+             .with_statistics(self.statistics)
+             .with_row_group_size(self.row_group_size)
+             .with_data_page_size(self.data_page_size)
+     }
+ }

/// Write a DataFrame to Parquet format.
#[must_use]
pub struct ParquetWriter<W> {
@@ -103,7 +117,7 @@
statistics: self.statistics,
compression: self.compression,
version: Version::V1,
- data_pagesize_limit: self.data_page_size,
+ data_page_size: self.data_page_size,
}
}

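The new `to_writer` helper threads the stored options into a `ParquetWriter` builder. A hedged usage sketch (the file name is illustrative; assumes `ParquetWriteOptions` implements `Default`, and that `finish` consumes the writer and takes the DataFrame by mutable reference, as the partition writer below does):

use std::fs::File;
use polars_core::prelude::*;
use polars_io::parquet::write::ParquetWriteOptions;

fn write_df(df: &mut DataFrame) -> PolarsResult<()> {
    let file = File::create("out.parquet")?; // illustrative path
    // Equivalent to chaining with_compression / with_statistics /
    // with_row_group_size / with_data_page_size by hand.
    ParquetWriteOptions::default().to_writer(file).finish(df)?;
    Ok(())
}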
105 changes: 105 additions & 0 deletions crates/polars-io/src/partition.rs
@@ -9,6 +9,7 @@ use polars_core::series::IsSorted;
use polars_core::POOL;
use rayon::prelude::*;

+ use crate::parquet::write::ParquetWriteOptions;
use crate::utils::resolve_homedir;
use crate::WriterFactory;

@@ -127,3 +128,107 @@
}
path
}

+ pub fn write_partitioned_dataset<S>(
+     df: &DataFrame,
+     path: &Path,
+     partition_by: &[S],
+     write_options: &ParquetWriteOptions,
+ ) -> PolarsResult<()>
+ where
+     S: AsRef<str>,
+ {
+     let base_path = path;
+
+     for (path_part, part_df) in get_hive_partitions_iter(df, partition_by.as_ref())? {
+         let dir = base_path.join(path_part);
+         std::fs::create_dir_all(&dir)?;
+
+         const ROWS_PER_FILE: usize = 1_048_576;
+         fn get_path_for_index(i: usize) -> String {
+             format!("{:013x}.parquet", i)
+         }
+
+         for (i, slice_start) in (0..part_df.height()).step_by(ROWS_PER_FILE).enumerate() {
+             let f = std::fs::File::create(dir.join(get_path_for_index(i)))?;
+
+             write_options
+                 .to_writer(f)
+                 .finish(&mut part_df.slice(slice_start as i64, ROWS_PER_FILE))?;
+         }
+     }
+
+     Ok(())
+ }

+ /// Creates an iterator of (hive partition path, DataFrame) pairs, e.g.:
+ /// ("a=1/b=1", DataFrame)
+ fn get_hive_partitions_iter<'a, S>(
+     df: &'a DataFrame,
+     partition_by: &'a [S],
+ ) -> PolarsResult<Box<dyn Iterator<Item = (String, DataFrame)> + 'a>>
+ where
+     S: AsRef<str>,
+ {
+     let schema = df.schema();
+
+     let partition_by_col_idx = partition_by
+         .iter()
+         .map(|x| {
+             let Some(i) = schema.index_of(x.as_ref()) else {
+                 polars_bail!(ColumnNotFound: "{}", x.as_ref())
+             };
+             Ok(i)
+         })
+         .collect::<PolarsResult<Vec<_>>>()?;
+
+     let get_hive_path_part = move |df: &DataFrame| {
+         const CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
+             .add(b'/')
+             .add(b'=')
+             .add(b':')
+             .add(b' ');
+
+         let cols = df.get_columns();
+
+         partition_by_col_idx
+             .iter()
+             .map(|&i| {
+                 let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();
+
+                 format!(
+                     "{}={}",
+                     s.name(),
+                     percent_encoding::percent_encode(
+                         s.str()
+                             .unwrap()
+                             .get(0)
+                             .unwrap_or("__HIVE_DEFAULT_PARTITION__")
+                             .as_bytes(),
+                         CHAR_SET
+                     )
+                 )
+             })
+             .collect::<Vec<_>>()
+             .join("/")
+     };
+
+     let groups = df.group_by(partition_by)?;
+     let groups = groups.take_groups();
+
+     let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups {
+         GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| {
+             let part_df =
+                 unsafe { df._take_unchecked_slice_sorted(&group, false, IsSorted::Ascending) };
+             (get_hive_path_part(&part_df), part_df)
+         })),
+         GroupsProxy::Slice { groups, .. } => {
+             Box::new(groups.into_iter().map(move |[offset, len]| {
+                 let part_df = df.slice(offset as i64, len as usize);
+                 (get_hive_path_part(&part_df), part_df)
+             }))
+         },
+     };
+
+     Ok(out)
+ }
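Putting the pieces together, a hedged end-to-end sketch of the new API (column names and values are illustrative; assumes a `Default` impl for the options). Each partition directory is named `key=percent-encoded-value`; within a partition, output is chunked at 1,048,576 rows per file, and files carry 13-digit zero-padded hex indices, so index 0 becomes `0000000000000.parquet`:

use polars_core::df;
use polars_core::prelude::*;
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::partition::write_partitioned_dataset;

fn example() -> PolarsResult<()> {
    let df = df! {
        "region" => ["eu", "eu", "us"],
        "value" => [1i64, 2, 3],
    }?;

    // Expected layout (illustrative):
    //   dataset/region=eu/0000000000000.parquet
    //   dataset/region=us/0000000000000.parquet
    write_partitioned_dataset(
        &df,
        std::path::Path::new("dataset"),
        &["region"],
        &ParquetWriteOptions::default(),
    )
}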
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/mod.rs
@@ -81,7 +81,7 @@ pub struct WriteOptions {
/// The compression to apply to every page
pub compression: CompressionOptions,
/// The size to flush a page, defaults to 1024 * 1024 if None
- pub data_pagesize_limit: Option<usize>,
+ pub data_page_size: Option<usize>,
}

use arrow::compute::aggregate::estimated_bytes_size;
@@ -298,7 +298,7 @@ pub fn array_to_pages(
let byte_size = estimated_bytes_size(primitive_array);

const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
- let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE);
+ let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE);
let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size
let bytes_per_row = if number_of_rows == 0 {
0
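The clamp above bounds pages to 2^31 - 2^25 bytes, presumably to stay comfortably below the signed 32-bit size fields of the Parquet page header. A small sketch of the resulting arithmetic:

fn main() {
    // Worked numbers for the page-size cap used above.
    const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
    let allowed_max = 2usize.pow(31) - 2usize.pow(25); // 2_147_483_648 - 33_554_432
    assert_eq!(allowed_max, 2_114_129_920); // just under 2 GiB

    // With data_page_size = None, pages flush at the 1 MiB default:
    let data_page_size: Option<usize> = None;
    let max_page_size = data_page_size.unwrap_or(DEFAULT_PAGE_SIZE).min(allowed_max);
    assert_eq!(max_page_size, 1_048_576);
}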
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
@@ -63,7 +63,7 @@ impl ParquetSink {
let file = std::fs::File::create(path)?;
let writer = ParquetWriter::new(file)
.with_compression(options.compression)
- .with_data_page_size(options.data_pagesize_limit)
+ .with_data_page_size(options.data_page_size)
.with_statistics(options.statistics)
.with_row_group_size(options.row_group_size)
// This is important! Otherwise we will deadlock
@@ -154,7 +154,7 @@ impl ParquetCloudSink {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = ParquetWriter::new(cloud_writer)
.with_compression(parquet_options.compression)
- .with_data_page_size(parquet_options.data_pagesize_limit)
+ .with_data_page_size(parquet_options.data_page_size)
.with_statistics(parquet_options.statistics)
.with_row_group_size(parquet_options.row_group_size)
// This is important! Otherwise we will deadlock
31 changes: 13 additions & 18 deletions crates/polars-plan/src/plans/hive.rs
@@ -1,7 +1,5 @@
use std::path::{Path, PathBuf};

- use percent_encoding::percent_decode;
- use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
@@ -68,26 +66,22 @@ pub fn hive_partitions_from_paths(
reader_schema: &Schema,
try_parse_dates: bool,
) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
-     let paths = paths
-         .iter()
-         .map(|x| {
-             Ok(PathBuf::from(
-                 percent_decode(x.to_str().unwrap().as_bytes())
-                     .decode_utf8()
-                     .map_err(to_compute_err)?
-                     .as_ref(),
-             ))
-         })
-         .collect::<PolarsResult<Vec<PathBuf>>>()?;
-     let paths = paths.as_slice();

let Some(path) = paths.first() else {
return Ok(None);
};

let sep = separator(path);
let path_string = path.to_str().unwrap();

+     fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
+         let (k, v) = parse_hive_string(part)?;
+         let v = percent_encoding::percent_decode(v.as_bytes())
+             .decode_utf8()
+             .ok()?;
+
+         Some((k, v))
+     }

macro_rules! get_hive_parts_iter {
($e:expr) => {{
let path_parts = $e[hive_start_idx..].split(sep);
@@ -97,7 +91,8 @@ pub fn hive_partitions_from_paths(
if index == file_index {
return None;
}
- parse_hive_string(part)
+
+ parse_hive_string_and_decode(part)
})
}};
}
@@ -158,7 +153,7 @@ pub fn hive_partitions_from_paths(
continue;
}

- entry.insert(infer_field_schema(value, try_parse_dates, false));
+ entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
}
}

@@ -264,7 +259,7 @@ fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
// Files are not Hive partitions, so globs are not valid.
if value.contains('*') {
return None;
- }
+ };

Some((name, value))
}
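With this change the encode/decode pair sits on opposite sides of the pipeline: `write_partitioned_dataset` percent-encodes partition values when writing, and `parse_hive_string_and_decode` decodes them when scanning. A small round-trip sketch using the `percent_encoding` crate (the sample value is illustrative; the set matches the writer's `CHAR_SET`):

use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};

// Same set the writer escapes: controls plus '/', '=', ':' and ' '.
const CHAR_SET: &AsciiSet = &CONTROLS.add(b'/').add(b'=').add(b':').add(b' ');

fn main() {
    let raw = "north america/eu";
    let encoded = percent_encode(raw.as_bytes(), CHAR_SET).to_string();
    assert_eq!(encoded, "north%20america%2Feu");

    let decoded = percent_decode(encoded.as_bytes()).decode_utf8().unwrap();
    assert_eq!(decoded, raw);
}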
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/arrow/mod.rs
@@ -1260,7 +1260,7 @@ fn integration_write(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let encodings = schema
4 changes: 2 additions & 2 deletions crates/polars/tests/it/io/parquet/arrow/read_indexes.rs
@@ -32,7 +32,7 @@ fn pages(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let pages1 = [array11, array12, array13]
@@ -82,7 +82,7 @@ fn read_with_indexes(
statistics: StatisticsOptions::full(),
compression: CompressionOptions::Uncompressed,
version: Version::V1,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let to_compressed = |pages: Vec<Page>| {
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/arrow/write.rs
@@ -48,7 +48,7 @@ fn round_trip_opt_stats(
statistics: StatisticsOptions::full(),
compression,
version,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let iter = vec![RecordBatchT::try_new(vec![array.clone()])];
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/roundtrip.rs
@@ -23,7 +23,7 @@ fn round_trip(
statistics: StatisticsOptions::full(),
compression,
version,
- data_pagesize_limit: None,
+ data_page_size: None,
};

let iter = vec![RecordBatchT::try_new(vec![array.clone()])];