From 2c487d0eba33569086887d434d971129a77db4eb Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Fri, 18 Aug 2023 12:37:12 +0100
Subject: [PATCH] Parquet doc tweaks (#4680)

* Parquet doc tweaks

* Update parquet/src/arrow/mod.rs
---
 parquet/src/arrow/arrow_reader/mod.rs |  2 ++
 parquet/src/arrow/async_reader/mod.rs | 51 ++++++++++++++++++++++-----
 parquet/src/arrow/mod.rs              | 41 +++++++++------------
 3 files changed, 62 insertions(+), 32 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index f7cecabb01d8..5f95a8664b4b 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -304,6 +304,8 @@ pub struct SyncReader<T: ChunkReader>(T);
 /// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
 ///
 /// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
+///
+/// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
 
 impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
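The cross-reference added above points at [`ArrowReaderBuilder`], which hosts the member functions shared by the sync and async builders. As a rough sketch of how those members compose on the synchronous path (not part of this patch; the `data.parquet` path and the `with_batch_size` value are illustrative):

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "data.parquet" is a placeholder for an existing Parquet file
    let file = File::open("data.parquet")?;

    // try_new reads the footer metadata; ArrowReaderBuilder member
    // functions such as with_batch_size then configure decoding
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(1024) // maximum rows per RecordBatch
        .build()?;

    // ParquetRecordBatchReader is an Iterator of Result<RecordBatch, _>
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```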
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 54793c47fea1..7d30580ece93 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -22,13 +22,13 @@
 //! # #[tokio::main(flavor="current_thread")]
 //! # async fn main() {
 //! #
-//! use arrow_array::RecordBatch;
-//! use arrow::util::pretty::pretty_format_batches;
-//! use futures::TryStreamExt;
-//! use tokio::fs::File;
-//!
-//! use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//!
+//! # use arrow_array::RecordBatch;
+//! # use arrow::util::pretty::pretty_format_batches;
+//! # use futures::TryStreamExt;
+//! # use tokio::fs::File;
+//! #
+//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+//! #
 //! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
 //! #     let formatted = pretty_format_batches(batches).unwrap().to_string();
 //! #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
@@ -38,7 +38,7 @@
 //! #         expected_lines, actual_lines
 //! #     );
 //! # }
-//!
+//! #
 //! let testdata = arrow::util::test_util::parquet_test_data();
 //! let path = format!("{}/alltypes_plain.parquet", testdata);
 //! let file = File::open(path).await.unwrap();
@@ -241,6 +241,8 @@ pub struct AsyncReader<T>(T);
 /// In particular, this handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
 /// they wish to be read by the resulting stream
+///
+/// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
 
 impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
@@ -263,6 +265,39 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     ///
     /// This allows loading metadata once and using it to create multiple builders with
     /// potentially different settings
+    ///
+    /// ```
+    /// # use std::fs::metadata;
+    /// # use std::sync::Arc;
+    /// # use bytes::Bytes;
+    /// # use arrow_array::{Int32Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+    /// # use tempfile::tempfile;
+    /// # use futures::StreamExt;
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() {
+    /// #
+    /// let mut file = tempfile().unwrap();
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+    /// # writer.write(&batch).unwrap();
+    /// # writer.close().unwrap();
+    /// #
+    /// let mut file = tokio::fs::File::from_std(file);
+    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
+    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
+    ///     file.try_clone().await.unwrap(),
+    ///     meta.clone()
+    /// ).build().unwrap();
+    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
+    ///
+    /// // Should be able to read from both in parallel
+    /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap());
+    /// # }
+    /// ```
     pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
         Self::new_builder(AsyncReader(input), metadata)
     }
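The "columns, row groups, etc..." selection the doc comment above describes happens through the [`ArrowReaderBuilder`] member functions before `build()`. A minimal sketch of that flow, assuming an existing `data.parquet` file (the path, column index, and row-group choice are illustrative, not from this patch):

```rust
use futures::TryStreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use tokio::fs::File;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "data.parquet" is a placeholder for an existing Parquet file
    let file = File::open("data.parquet").await?;

    // new() fetches the parquet metadata asynchronously
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Project only the first root column, using the parquet schema
    // recovered from the metadata
    let mask = ProjectionMask::roots(builder.parquet_schema(), [0]);

    let stream = builder
        .with_projection(mask)
        .with_row_groups(vec![0]) // only decode the first row group
        .build()?;

    // The stream yields Result<RecordBatch, ParquetError>
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```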
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 8cca79b40e93..0174db6b517f 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -25,12 +25,13 @@
 //!# Example of writing Arrow record batch to Parquet file
 //!
 //!```rust
-//! use arrow_array::{Int32Array, ArrayRef};
-//! use arrow_array::RecordBatch;
-//! use parquet::arrow::arrow_writer::ArrowWriter;
-//! use parquet::file::properties::WriterProperties;
-//! use std::fs::File;
-//! use std::sync::Arc;
+//! # use arrow_array::{Int32Array, ArrayRef};
+//! # use arrow_array::RecordBatch;
+//! # use parquet::arrow::arrow_writer::ArrowWriter;
+//! # use parquet::file::properties::WriterProperties;
+//! # use tempfile::tempfile;
+//! # use std::sync::Arc;
+//! # use parquet::basic::Compression;
 //! let ids = Int32Array::from(vec![1, 2, 3, 4]);
 //! let vals = Int32Array::from(vec![5, 6, 7, 8]);
 //! let batch = RecordBatch::try_from_iter(vec![
@@ -38,9 +39,14 @@
 //!   ("val", Arc::new(vals) as ArrayRef),
 //! ]).unwrap();
 //!
-//! let file = File::create("data.parquet").unwrap();
+//! let file = tempfile().unwrap();
 //!
-//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
+//! // WriterProperties can be used to set Parquet file options
+//! let props = WriterProperties::builder()
+//!     .set_compression(Compression::SNAPPY)
+//!     .build();
+//!
+//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
 //!
 //! writer.write(&batch).expect("Writing batch");
 //!
@@ -48,24 +54,11 @@
 //! writer.close().unwrap();
 //! ```
 //!
-//! `WriterProperties` can be used to set Parquet file options
-//! ```rust
-//! use parquet::file::properties::WriterProperties;
-//! use parquet::basic::{ Compression, Encoding };
-//! use parquet::file::properties::WriterVersion;
-//!
-//! // File compression
-//! let props = WriterProperties::builder()
-//!     .set_compression(Compression::SNAPPY)
-//!     .build();
-//! ```
-//!
 //! # Example of reading parquet file into arrow record batch
 //!
 //! ```rust
-//! use std::fs::File;
-//! use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-//!
+//! # use std::fs::File;
+//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 //! # use std::sync::Arc;
 //! # use arrow_array::Int32Array;
 //! # use arrow::datatypes::{DataType, Field, Schema};
@@ -88,7 +81,7 @@
 //! #     writer.write(&batch).expect("Writing batch");
 //! # }
 //! # writer.close().unwrap();
-//!
+//! #
 //! let file = File::open("data.parquet").unwrap();
 //!
 //! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
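The patch folds the standalone `WriterProperties` snippet into the write example, keeping only the compression option. The builder exposes several more options than compression; a small sketch using the same crates as the example above (the chosen values are illustrative, not recommendations from this patch):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::{WriterProperties, WriterVersion};
use tempfile::tempfile;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let batch = RecordBatch::try_from_iter(vec![(
        "id",
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
    )])?;

    // A few of the knobs WriterProperties exposes beyond compression
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_max_row_group_size(1024 * 1024) // rows per row group
        .set_dictionary_enabled(true)
        .build();

    let file = tempfile()?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?; // writes the footer
    Ok(())
}
```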