Parquet doc tweaks (#4680)
* Parquet doc tweaks

* Update parquet/src/arrow/mod.rs
tustvold authored Aug 18, 2023
1 parent 1afc7c3 commit 2c487d0
Showing 3 changed files with 62 additions and 32 deletions.
2 changes: 2 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -304,6 +304,8 @@ pub struct SyncReader<T: ChunkReader>(T);
 /// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
 ///
 /// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
+///
+/// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
 
 impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
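For context, the new doc line above points readers at [`ArrowReaderBuilder`] for the rest of the builder API. A minimal sketch of how this synchronous builder is typically driven (not part of the commit; the `data.parquet` path is a placeholder):

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() {
    // Open any Parquet file; "data.parquet" is a placeholder path
    let file = File::open("data.parquet").unwrap();

    // The builder reads the file metadata up front, so projections,
    // row-group selection, etc. can be configured before decoding
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    println!("row groups: {}", builder.metadata().num_row_groups());

    // ParquetRecordBatchReader is an Iterator of Result<RecordBatch, _>
    let reader = builder.build().unwrap();
    for batch in reader {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}
```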
51 changes: 43 additions & 8 deletions parquet/src/arrow/async_reader/mod.rs
@@ -22,13 +22,13 @@
 //! # #[tokio::main(flavor="current_thread")]
 //! # async fn main() {
 //! #
-//! use arrow_array::RecordBatch;
-//! use arrow::util::pretty::pretty_format_batches;
-//! use futures::TryStreamExt;
-//! use tokio::fs::File;
-//!
-//! use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//!
+//! # use arrow_array::RecordBatch;
+//! # use arrow::util::pretty::pretty_format_batches;
+//! # use futures::TryStreamExt;
+//! # use tokio::fs::File;
+//! #
+//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+//! #
 //! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
 //! #     let formatted = pretty_format_batches(batches).unwrap().to_string();
 //! #     let actual_lines: Vec<_> = formatted.trim().lines().collect();
@@ -38,7 +38,7 @@
 //! #         expected_lines, actual_lines
 //! #     );
 //! # }
-//!
+//! #
 //! let testdata = arrow::util::test_util::parquet_test_data();
 //! let path = format!("{}/alltypes_plain.parquet", testdata);
 //! let file = File::open(path).await.unwrap();
@@ -241,6 +241,8 @@ pub struct AsyncReader<T>(T);
 /// In particular, this handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
 /// they wish to be read by the resulting stream
+///
+/// See [`ArrowReaderBuilder`] for additional member functions
 pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
 
 impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
@@ -263,6 +265,39 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     ///
     /// This allows loading metadata once and using it to create multiple builders with
     /// potentially different settings
+    ///
+    /// ```
+    /// # use std::fs::metadata;
+    /// # use std::sync::Arc;
+    /// # use bytes::Bytes;
+    /// # use arrow_array::{Int32Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+    /// # use tempfile::tempfile;
+    /// # use futures::StreamExt;
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() {
+    /// #
+    /// let mut file = tempfile().unwrap();
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+    /// # writer.write(&batch).unwrap();
+    /// # writer.close().unwrap();
+    /// #
+    /// let mut file = tokio::fs::File::from_std(file);
+    /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
+    /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
+    ///     file.try_clone().await.unwrap(),
+    ///     meta.clone()
+    /// ).build().unwrap();
+    /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
+    ///
+    /// // Should be able to read from both in parallel
+    /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap());
+    /// # }
+    /// ```
     pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
         Self::new_builder(AsyncReader(input), metadata)
     }
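The doc comment above notes that the loaded metadata lets consumers select specific columns and row groups before the stream is built. A minimal sketch of that flow (not part of the commit; assumes a tokio runtime and a placeholder `data.parquet` file):

```rust
use futures::TryStreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use tokio::fs::File;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // "data.parquet" is a placeholder path
    let file = File::open("data.parquet").await.unwrap();

    // Reads the footer metadata asynchronously before building the stream
    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();

    // Use the metadata to read only the first leaf column of row group 0
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let stream = builder
        .with_projection(mask)
        .with_row_groups(vec![0])
        .build()
        .unwrap();

    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    println!("read {} batches", batches.len());
}
```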
41 changes: 17 additions & 24 deletions parquet/src/arrow/mod.rs
@@ -25,47 +25,40 @@
 //!# Example of writing Arrow record batch to Parquet file
 //!
 //!```rust
-//! use arrow_array::{Int32Array, ArrayRef};
-//! use arrow_array::RecordBatch;
-//! use parquet::arrow::arrow_writer::ArrowWriter;
-//! use parquet::file::properties::WriterProperties;
-//! use std::fs::File;
-//! use std::sync::Arc;
+//! # use arrow_array::{Int32Array, ArrayRef};
+//! # use arrow_array::RecordBatch;
+//! # use parquet::arrow::arrow_writer::ArrowWriter;
+//! # use parquet::file::properties::WriterProperties;
+//! # use tempfile::tempfile;
+//! # use std::sync::Arc;
+//! # use parquet::basic::Compression;
 //! let ids = Int32Array::from(vec![1, 2, 3, 4]);
 //! let vals = Int32Array::from(vec![5, 6, 7, 8]);
 //! let batch = RecordBatch::try_from_iter(vec![
 //!   ("id", Arc::new(ids) as ArrayRef),
 //!   ("val", Arc::new(vals) as ArrayRef),
 //! ]).unwrap();
 //!
-//! let file = File::create("data.parquet").unwrap();
+//! let file = tempfile().unwrap();
 //!
-//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
+//! // WriterProperties can be used to set Parquet file options
+//! let props = WriterProperties::builder()
+//!     .set_compression(Compression::SNAPPY)
+//!     .build();
+//!
+//! let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
 //!
 //! writer.write(&batch).expect("Writing batch");
 //!
 //! // writer must be closed to write footer
 //! writer.close().unwrap();
 //! ```
 //!
-//! `WriterProperties` can be used to set Parquet file options
-//! ```rust
-//! use parquet::file::properties::WriterProperties;
-//! use parquet::basic::{ Compression, Encoding };
-//! use parquet::file::properties::WriterVersion;
-//!
-//! // File compression
-//! let props = WriterProperties::builder()
-//!     .set_compression(Compression::SNAPPY)
-//!     .build();
-//! ```
-//!
 //! # Example of reading parquet file into arrow record batch
 //!
 //! ```rust
-//! use std::fs::File;
-//! use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-//!
+//! # use std::fs::File;
+//! # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 //! # use std::sync::Arc;
 //! # use arrow_array::Int32Array;
 //! # use arrow::datatypes::{DataType, Field, Schema};
@@ -88,7 +81,7 @@
 //! #     writer.write(&batch).expect("Writing batch");
 //! # }
 //! # writer.close().unwrap();
-//!
+//! #
 //! let file = File::open("data.parquet").unwrap();
 //!
 //! let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
