From 151714725b6422b9fa54fd608a474645b5132385 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 3 Jul 2024 21:27:52 +1000 Subject: [PATCH] Add partitioned parquet writing (`write_parquet_partitioned`) and rename `data_pagesize_limit` to `data_page_size` --- crates/polars-arrow/src/doc/lib.md | 2 +- crates/polars-io/src/lib.rs | 1 - crates/polars-io/src/parquet/write/options.rs | 2 +- crates/polars-io/src/parquet/write/writer.rs | 16 ++- crates/polars-io/src/partition.rs | 88 ++++++++++++++ crates/polars-parquet/src/arrow/write/mod.rs | 4 +- .../src/executors/sinks/output/parquet.rs | 4 +- .../polars/tests/it/io/parquet/arrow/mod.rs | 2 +- .../tests/it/io/parquet/arrow/read_indexes.rs | 4 +- .../polars/tests/it/io/parquet/arrow/write.rs | 2 +- .../polars/tests/it/io/parquet/roundtrip.rs | 2 +- py-polars/polars/dataframe/frame.py | 84 +++++++++++++ py-polars/polars/lazyframe/frame.py | 6 +- py-polars/src/dataframe/io.rs | 36 ++++++ py-polars/src/lazyframe/mod.rs | 6 +- py-polars/tests/unit/io/test_hive.py | 113 +++++++++++------- 16 files changed, 309 insertions(+), 63 deletions(-) diff --git a/crates/polars-arrow/src/doc/lib.md b/crates/polars-arrow/src/doc/lib.md index 61bc87c4d7b38..dd10d361bd80e 100644 --- a/crates/polars-arrow/src/doc/lib.md +++ b/crates/polars-arrow/src/doc/lib.md @@ -42,7 +42,7 @@ fn main() -> Result<()> { write_statistics: true, compression: CompressionOptions::Snappy, version: Version::V1, - data_pagesize_limit: None, + data_page_size: None, }; let row_groups = RowGroupIterator::try_new( diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs index d5fc527b822bd..c3e7a43546fb7 100644 --- a/crates/polars-io/src/lib.rs +++ b/crates/polars-io/src/lib.rs @@ -19,7 +19,6 @@ pub mod ndjson; mod options; #[cfg(feature = "parquet")] pub mod parquet; -#[cfg(feature = "partition")] pub mod partition; #[cfg(feature = "async")] pub mod pl_async; diff --git a/crates/polars-io/src/parquet/write/options.rs b/crates/polars-io/src/parquet/write/options.rs index d65a325522337..4e4bfa9e1edfb 100644 --- a/crates/polars-io/src/parquet/write/options.rs +++ b/crates/polars-io/src/parquet/write/options.rs @@ -16,7 +16,7 @@ pub struct ParquetWriteOptions { /// If `None` will be all written to a single row group. pub row_group_size: Option<usize>, /// if `None` will be 1024^2 bytes - pub data_pagesize_limit: Option<usize>, + pub data_page_size: Option<usize>, /// maintain the order the data was processed pub maintain_order: bool, } diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs index cd03a77cfed81..5b17dfbe6eec5 100644 --- a/crates/polars-io/src/parquet/write/writer.rs +++ b/crates/polars-io/src/parquet/write/writer.rs @@ -10,9 +10,23 @@ use polars_parquet::write::{ use super::batched_writer::BatchedWriter; use super::options::ParquetCompression; +use super::ParquetWriteOptions; use crate::prelude::chunk_df_for_writing; use crate::shared::schema_to_arrow_checked; +impl ParquetWriteOptions { + pub fn to_writer<F>(&self, f: F) -> ParquetWriter<F> + where + F: Write, + { + ParquetWriter::new(f) + .with_compression(self.compression) + .with_statistics(self.statistics) + .with_row_group_size(self.row_group_size) + .with_data_page_size(self.data_page_size) + } +} + /// Write a DataFrame to Parquet format.
#[must_use] pub struct ParquetWriter<W> { @@ -103,7 +117,7 @@ where statistics: self.statistics, compression: self.compression, version: Version::V1, - data_pagesize_limit: self.data_page_size, + data_page_size: self.data_page_size, } } diff --git a/crates/polars-io/src/partition.rs b/crates/polars-io/src/partition.rs index b25f141898173..d9c1155836f25 100644 --- a/crates/polars-io/src/partition.rs +++ b/crates/polars-io/src/partition.rs @@ -9,6 +9,7 @@ use polars_core::series::IsSorted; use polars_core::POOL; use rayon::prelude::*; +use crate::parquet::write::ParquetWriteOptions; use crate::utils::resolve_homedir; use crate::WriterFactory; @@ -127,3 +128,90 @@ where } path } + +pub fn write_partitioned_dataset<S>( + df: &DataFrame, + path: &Path, + partition_by: &[S], + write_options: &ParquetWriteOptions, +) -> PolarsResult<()> +where + S: AsRef<str>, +{ + let base_path = path; + + for (path_part, mut part_df) in get_hive_partitions_iter(df, partition_by.as_ref())? { + let dir = base_path.join(path_part); + std::fs::create_dir_all(&dir)?; + let f = std::fs::File::create(dir.join("data.parquet"))?; + write_options.to_writer(f).finish(&mut part_df)?; + } + + Ok(()) +} + +/// Creates an iterator of (hive partition path, DataFrame) pairs, e.g.: +/// ("a=1/b=1", DataFrame) +fn get_hive_partitions_iter<'a, S>( + df: &'a DataFrame, + partition_by: &'a [S], +) -> PolarsResult<Box<dyn Iterator<Item = (String, DataFrame)> + 'a>> +where + S: AsRef<str>, +{ + for x in partition_by.iter() { + if df.get_column_index(x.as_ref()).is_none() { + polars_bail!(ColumnNotFound: "{}", x.as_ref()); + } + } + + let get_hive_path_part = |df: &DataFrame| { + const CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS + .add(b'/') + .add(b'=') + .add(b':') + .add(b' '); + + partition_by + .iter() + .map(|x| { + let s = df.column(x.as_ref()).unwrap().slice(0, 1); + + format!( + "{}={}", + s.name(), + percent_encoding::percent_encode( + s.cast(&DataType::String) + .unwrap() + .str() + .unwrap() + .get(0) + .unwrap_or("__HIVE_DEFAULT_PARTITION__") + .as_bytes(), + CHAR_SET + ) + ) + }) + .collect::<Vec<_>>() + .join("/") + }; + + let groups = df.group_by(partition_by)?; + let groups = groups.take_groups(); + + let out: Box<dyn Iterator<Item = (String, DataFrame)>> = match groups { + GroupsProxy::Idx(idx) => Box::new(idx.into_iter().map(move |(_, group)| { + let part_df = + unsafe { df._take_unchecked_slice_sorted(&group, false, IsSorted::Ascending) }; + (get_hive_path_part(&part_df), part_df) + })), + GroupsProxy::Slice { groups, ..
} => { + Box::new(groups.into_iter().map(move |[offset, len]| { + let part_df = df.slice(offset as i64, len as usize); + (get_hive_path_part(&part_df), part_df) + })) + }, + }; + + Ok(out) +} diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index 90ed2269c1a5b..9022bab0e2c94 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -81,7 +81,7 @@ pub struct WriteOptions { /// The compression to apply to every page pub compression: CompressionOptions, /// The size to flush a page, defaults to 1024 * 1024 if None - pub data_pagesize_limit: Option<usize>, + pub data_page_size: Option<usize>, } use arrow::compute::aggregate::estimated_bytes_size; @@ -298,7 +298,7 @@ pub fn array_to_pages( let byte_size = estimated_bytes_size(primitive_array); const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; - let max_page_size = options.data_pagesize_limit.unwrap_or(DEFAULT_PAGE_SIZE); + let max_page_size = options.data_page_size.unwrap_or(DEFAULT_PAGE_SIZE); let max_page_size = max_page_size.min(2usize.pow(31) - 2usize.pow(25)); // allowed maximum page size let bytes_per_row = if number_of_rows == 0 { 0 diff --git a/crates/polars-pipe/src/executors/sinks/output/parquet.rs b/crates/polars-pipe/src/executors/sinks/output/parquet.rs index b3d64341d1293..e591279bbc74d 100644 --- a/crates/polars-pipe/src/executors/sinks/output/parquet.rs +++ b/crates/polars-pipe/src/executors/sinks/output/parquet.rs @@ -63,7 +63,7 @@ impl ParquetSink { let file = std::fs::File::create(path)?; let writer = ParquetWriter::new(file) .with_compression(options.compression) - .with_data_page_size(options.data_pagesize_limit) + .with_data_page_size(options.data_page_size) .with_statistics(options.statistics) .with_row_group_size(options.row_group_size) // This is important! Otherwise we will deadlock @@ -154,7 +154,7 @@ impl ParquetCloudSink { let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?; let writer = ParquetWriter::new(cloud_writer) .with_compression(parquet_options.compression) - .with_data_page_size(parquet_options.data_pagesize_limit) + .with_data_page_size(parquet_options.data_page_size) .with_statistics(parquet_options.statistics) .with_row_group_size(parquet_options.row_group_size) // This is important!
Otherwise we will deadlock diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs index f70866a58a502..2a3423ddb9e25 100644 --- a/crates/polars/tests/it/io/parquet/arrow/mod.rs +++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs @@ -1260,7 +1260,7 @@ fn integration_write( statistics: StatisticsOptions::full(), compression: CompressionOptions::Uncompressed, version: Version::V1, - data_pagesize_limit: None, + data_page_size: None, }; let encodings = schema diff --git a/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs b/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs index d758557a6c1d1..8cffac4b35f68 100644 --- a/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs +++ b/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs @@ -32,7 +32,7 @@ fn pages( statistics: StatisticsOptions::full(), compression: CompressionOptions::Uncompressed, version: Version::V1, - data_pagesize_limit: None, + data_page_size: None, }; let pages1 = [array11, array12, array13] @@ -82,7 +82,7 @@ fn read_with_indexes( statistics: StatisticsOptions::full(), compression: CompressionOptions::Uncompressed, version: Version::V1, - data_pagesize_limit: None, + data_page_size: None, }; let to_compressed = |pages: Vec| { diff --git a/crates/polars/tests/it/io/parquet/arrow/write.rs b/crates/polars/tests/it/io/parquet/arrow/write.rs index 4dc71dd49ea2b..9c25f346c2e1b 100644 --- a/crates/polars/tests/it/io/parquet/arrow/write.rs +++ b/crates/polars/tests/it/io/parquet/arrow/write.rs @@ -48,7 +48,7 @@ fn round_trip_opt_stats( statistics: StatisticsOptions::full(), compression, version, - data_pagesize_limit: None, + data_page_size: None, }; let iter = vec![RecordBatchT::try_new(vec![array.clone()])]; diff --git a/crates/polars/tests/it/io/parquet/roundtrip.rs b/crates/polars/tests/it/io/parquet/roundtrip.rs index dd55eac0e9f5a..355f2b7325322 100644 --- a/crates/polars/tests/it/io/parquet/roundtrip.rs +++ b/crates/polars/tests/it/io/parquet/roundtrip.rs @@ -23,7 +23,7 @@ fn round_trip( statistics: StatisticsOptions::full(), compression, version, - data_pagesize_limit: None, + data_page_size: None, }; let iter = vec![RecordBatchT::try_new(vec![array.clone()])]; diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index c0cd3507c1a95..fb44e38e4e638 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3618,6 +3618,90 @@ def write_parquet( data_page_size, ) + def write_parquet_partitioned( + self, + path: str | Path, + partition_by: str | Collection[str], + *, + compression: ParquetCompression = "zstd", + compression_level: int | None = None, + statistics: bool | str | dict[str, bool] = True, + row_group_size: int | None = None, + data_page_size: int | None = None, + ) -> None: + """ + Write a partitioned directory of parquet files. + + Parameters + ---------- + path + Path to the base directory for the partitioned dataset. + partition_by + Columns to partition by. + compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'} + Choose "zstd" for good compression performance. + Choose "lz4" for fast compression/decompression. + Choose "snappy" for more backwards compatibility guarantees + when you deal with older parquet readers. + compression_level + The level of compression to use. Higher compression means smaller files on + disk. + + - "gzip" : min-level: 0, max-level: 10. + - "brotli" : min-level: 0, max-level: 11. + - "zstd" : min-level: 1, max-level: 22. 
+ + statistics + Write statistics to the parquet headers. This is the default behavior. + + Possible values: + + - `True`: enable default set of statistics (default) + - `False`: disable all statistics + - "full": calculate and write all available + statistics. + - `{ "statistic-key": True / False, ... }`. + Available keys: + + - "min": column minimum value (default: `True`) + - "max": column maximum value (default: `True`) + - "distinct_count": number of unique column values (default: `False`) + - "null_count": number of null values in column (default: `True`) + row_group_size + Size of the row groups in number of rows. Defaults to 512^2 rows. + data_page_size + Size of the data page in bytes. Defaults to 1024^2 bytes. + """ + path = normalize_filepath(path, check_not_directory=False) + partition_by = [partition_by] if isinstance(partition_by, str) else partition_by + + if isinstance(statistics, bool) and statistics: + statistics = { + "min": True, + "max": True, + "distinct_count": False, + "null_count": True, + } + elif isinstance(statistics, bool) and not statistics: + statistics = {} + elif statistics == "full": + statistics = { + "min": True, + "max": True, + "distinct_count": True, + "null_count": True, + } + + self._df.write_parquet_partitioned( + path, + partition_by, + compression, + compression_level, + statistics, + row_group_size, + data_page_size, + ) + def write_database( self, table_name: str, diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 096e5e40f74e5..55f12f5223f3b 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -2154,7 +2154,7 @@ def sink_parquet( compression_level: int | None = None, statistics: bool | str | dict[str, bool] = True, row_group_size: int | None = None, - data_pagesize_limit: int | None = None, + data_page_size: int | None = None, maintain_order: bool = True, type_coercion: bool = True, predicate_pushdown: bool = True, @@ -2209,7 +2209,7 @@ def sink_parquet( If None (default), the chunks of the `DataFrame` are used. Writing in smaller chunks may reduce memory pressure and improve writing speeds. - data_pagesize_limit + data_page_size Size limit of individual data pages.
If not set defaults to 1024 * 1024 bytes maintain_order @@ -2269,7 +2269,7 @@ def sink_parquet( compression_level=compression_level, statistics=statistics, row_group_size=row_group_size, - data_pagesize_limit=data_pagesize_limit, + data_page_size=data_page_size, maintain_order=maintain_order, ) diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index 4617afb7bf26c..2e176f416c4ee 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -438,6 +438,42 @@ impl PyDataFrame { Ok(()) } + #[cfg(feature = "parquet")] + #[pyo3(signature = (py_f, partition_by, compression, compression_level, statistics, row_group_size, data_page_size))] + pub fn write_parquet_partitioned( + &mut self, + py: Python, + py_f: PyObject, + partition_by: Vec<String>, + compression: &str, + compression_level: Option<i32>, + statistics: Wrap<StatisticsOptions>, + row_group_size: Option<usize>, + data_page_size: Option<usize>, + ) -> PyResult<()> { + use std::path::Path; + + use polars_io::partition::write_partitioned_dataset; + + let Ok(path) = py_f.extract::<String>(py) else { + return Err(PyPolarsErr::from(polars_err!(ComputeError: "expected path-like")).into()); + }; + let path = Path::new(&*path); + let compression = parse_parquet_compression(compression, compression_level)?; + + let write_options = ParquetWriteOptions { + compression, + statistics: statistics.0, + row_group_size, + data_page_size, + maintain_order: true, + }; + + write_partitioned_dataset(&self.df, path, partition_by.as_slice(), &write_options) + .map_err(PyPolarsErr::from)?; + Ok(()) + } + #[cfg(feature = "json")] pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> { let file = BufWriter::new(get_file_like(py_f, true)?); diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 7345d54f03999..814e83febb0b4 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -614,7 +614,7 @@ impl PyLazyFrame { } #[cfg(all(feature = "streaming", feature = "parquet"))] - #[pyo3(signature = (path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order))] + #[pyo3(signature = (path, compression, compression_level, statistics, row_group_size, data_page_size, maintain_order))] fn sink_parquet( &self, py: Python, @@ -623,7 +623,7 @@ impl PyLazyFrame { compression_level: Option<i32>, statistics: Wrap<StatisticsOptions>, row_group_size: Option<usize>, - data_pagesize_limit: Option<usize>, + data_page_size: Option<usize>, maintain_order: bool, ) -> PyResult<()> { let compression = parse_parquet_compression(compression, compression_level)?; @@ -632,7 +632,7 @@ impl PyLazyFrame { compression, statistics: statistics.0, row_group_size, - data_pagesize_limit, + data_page_size, maintain_order, }; diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 98ff82408eb68..0787c7cc77bfc 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -1,11 +1,9 @@ -import os import sys import urllib.parse import warnings from collections import OrderedDict from datetime import datetime from functools import partial -from multiprocessing import get_context from pathlib import Path from typing import Any, Callable @@ -88,54 +86,16 @@ def test_hive_partitioned_predicate_pushdown( ) -def init_env_spawned_single_threaded_async() -> None: - os.environ["SPAWNED_PROCESS"] = "1" - os.environ["POLARS_MAX_THREADS"] = "1" - os.environ["POLARS_PREFETCH_SIZE"] = "1" - - @pytest.mark.xdist_group("streaming") @pytest.mark.write_disk() -def
test_hive_partitioned_predicate_pushdown_single_threaded_async( +def test_hive_partitioned_predicate_pushdown_single_threaded_async_17155( io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any, ) -> None: - # We need to run this in a separate process to avoid leakage of - # `POLARS_MAX_THREADS`. You can test this locally (on a - # system with > 1 threads) by removing the process-spawning logic and - # directly calling `init_env_spawned_single_threaded_async`, and then - # running: - # ``` - # python -m pytest py-polars/tests/unit/io/ -m '' -k \ - # test_hive_partitioned_predicate_pushdown - # ``` - # And observe that the below assertion of `thread_pool_size` will fail. - if "SPAWNED_PROCESS" not in os.environ: - with get_context("spawn").Pool( - 1, initializer=init_env_spawned_single_threaded_async - ) as p: - pytest_path = Path(__file__).relative_to(Path.cwd()) - pytest_path: str = f"{pytest_path}::test_hive_partitioned_predicate_pushdown_single_threaded_async" # type: ignore[no-redef] - - assert ( - p.map( - pytest.main, # type: ignore[arg-type] - [ - [ - pytest_path, - "-m", - "", - ] - ], - )[0] - == 0 - ) - - return - - assert pl.thread_pool_size() == 1 + monkeypatch.setenv("POLARS_FORCE_ASYNC", "1") + monkeypatch.setenv("POLARS_PREFETCH_SIZE", "1") impl_test_hive_partitioned_predicate_pushdown( io_files_path, @@ -580,7 +540,7 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: @pytest.mark.write_disk() -def test_hive_partition_dates(tmp_path: Path, monkeypatch: Any) -> None: +def test_hive_partition_dates(tmp_path: Path) -> None: df = pl.DataFrame( { "date1": [ @@ -646,3 +606,68 @@ def test_hive_partition_dates(tmp_path: Path, monkeypatch: Any) -> None: lf.collect(), df.with_columns(pl.col("date1", "date2").cast(pl.String)), ) + + +@pytest.mark.parametrize( + "df", + [ + pl.select( + pl.Series("a", [1, 2, 3, 4], dtype=pl.Int8), + pl.Series("b", [1, 2, 3, 4], dtype=pl.Int8), + pl.Series("x", [1, 2, 3, 4]), + ), + pl.select( + pl.Series( + "a", + [1.2981275, 2.385974035, 3.1231892749185718397510, 4.129387128949156], + dtype=pl.Float64, + ), + pl.Series("b", ["a", "b", " / c = : ", "d"]), + pl.Series("x", [1, 2, 3, 4]), + ), + ], +) +@pytest.mark.write_disk() +def test_hive_write(tmp_path: Path, df: pl.DataFrame) -> None: + root = tmp_path + df.write_parquet_partitioned(root, ["a", "b"]) + + lf = pl.scan_parquet(root) + assert_frame_equal(lf.collect(), df) + + lf = pl.scan_parquet(root, hive_schema={"a": pl.String, "b": pl.String}) + assert_frame_equal(lf.collect(), df.with_columns(pl.col("a", "b").cast(pl.String))) + + +@pytest.mark.write_disk() +def test_hive_write_dates(tmp_path: Path) -> None: + df = pl.DataFrame( + { + "date1": [ + datetime(2024, 1, 1), + datetime(2024, 2, 1), + datetime(2024, 3, 1), + None, + ], + "date2": [ + datetime(2023, 1, 1), + datetime(2023, 2, 1), + None, + datetime(2023, 3, 1, 1, 1, 1, 1), + ], + "x": [1, 2, 3, 4], + }, + schema={"date1": pl.Date, "date2": pl.Datetime, "x": pl.Int32}, + ) + + root = tmp_path + df.write_parquet_partitioned(root, ["date1", "date2"]) + + lf = pl.scan_parquet(root) + assert_frame_equal(lf.collect(), df) + + lf = pl.scan_parquet(root, try_parse_hive_dates=False) + assert_frame_equal( + lf.collect(), + df.with_columns(pl.col("date1", "date2").cast(pl.String)), + )
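
For reference, a minimal usage sketch of the `DataFrame.write_parquet_partitioned` API added by this patch. It mirrors the new `test_hive_write` test above; the `out_dir` path and the sample data are illustrative assumptions, not part of the patch.

    import polars as pl

    df = pl.DataFrame(
        {
            "a": [1, 1, 2, 2],
            "b": ["x", "y", "y", " / z"],
            "v": [1.0, 2.0, 3.0, 4.0],
        }
    )

    # One file per partition is written under the base directory, e.g.
    # out_dir/a=1/b=x/data.parquet. Partition values are percent-encoded in
    # the directory names, and nulls become __HIVE_DEFAULT_PARTITION__.
    df.write_parquet_partitioned("out_dir", ["a", "b"], compression="zstd")

    # The partition columns are also written into each data.parquet file, so
    # scanning the base directory recovers the original schema.
    lf = pl.scan_parquet("out_dir")
    print(lf.sort("v").collect())

As exercised by `test_hive_write_dates`, date and datetime partition keys round-trip as well, and `try_parse_hive_dates=False` keeps them as strings on read.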