feat: more efficient parquet writer and more statistics (#1397)
# Description

* Removed the per-batch data copies in a tight loop, which were extremely
costly when writing files larger than 100 MB (see the illustration after the
`rust/src/operations/writer.rs` diff below).
* Rewrote statistics handling to collect null counts from the parquet file
metadata, just like min and max values (a sketch follows this list).
* Added support for more types in statistics.
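
A minimal sketch of the metadata-based approach, assuming the `parquet::format`
thrift types returned by `ArrowWriter::close`; `null_counts_from_metadata` is a
hypothetical helper for illustration, not the actual function in `stats.rs`:

```rust
use std::collections::HashMap;

use parquet::format::FileMetaData;

/// Hypothetical helper: sum per-column null counts across all row groups by
/// reading the footer statistics, the same way min/max values are gathered.
fn null_counts_from_metadata(metadata: &FileMetaData) -> HashMap<String, i64> {
    let mut counts: HashMap<String, i64> = HashMap::new();
    for row_group in &metadata.row_groups {
        for column in &row_group.columns {
            // Column metadata and its statistics are optional in the parquet spec.
            if let Some(col_meta) = &column.meta_data {
                if let Some(stats) = &col_meta.statistics {
                    if let Some(n) = stats.null_count {
                        let name = col_meta.path_in_schema.join(".");
                        *counts.entry(name).or_insert(0) += n;
                    }
                }
            }
        }
    }
    counts
}
```

This mirrors how the `create_add` calls in the diff now rely solely on the
`FileMetaData` returned when the arrow writer is closed, which is why
`apply_null_counts` and the `NullCounts` maps are removed from every writer.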

# Related Issue(s)

- closes #1394
- closes #1209 
- closes #1208


# Documentation

<!---
Share links to useful documentation
--->
wjones127 authored May 28, 2023
1 parent e70504a commit b8bc0f7
Showing 6 changed files with 509 additions and 370 deletions.
41 changes: 11 additions & 30 deletions rust/src/operations/writer.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
use crate::action::Add;
use crate::storage::ObjectStoreRef;
use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::{apply_null_counts, create_add, NullCounts};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
ShareableBuffer,
@@ -16,7 +16,6 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use log::warn;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
@@ -269,7 +268,6 @@ pub(crate) struct PartitionWriter {
buffer: ShareableBuffer,
arrow_writer: ArrowWriter<ShareableBuffer>,
part_counter: usize,
null_counts: NullCounts,
files_written: Vec<Add>,
}

@@ -293,7 +291,6 @@ impl PartitionWriter {
buffer,
arrow_writer,
part_counter: 0,
null_counts: NullCounts::new(),
files_written: Vec::new(),
})
}
@@ -307,11 +304,8 @@ impl PartitionWriter {
self.config.prefix.child(file_name)
}

fn replace_arrow_buffer(
&mut self,
seed: impl AsRef<[u8]>,
) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
let new_buffer = ShareableBuffer::from_bytes(seed.as_ref());
fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
let new_buffer = ShareableBuffer::default();
let arrow_writer = ArrowWriter::try_new(
new_buffer.clone(),
self.config.file_schema.clone(),
@@ -324,40 +318,27 @@
}

fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
// copy current cursor bytes so we can recover from failures
// TODO is copying this something we should be doing?
let buffer_bytes = self.buffer.to_vec();
match self.arrow_writer.write(batch) {
Ok(_) => {
apply_null_counts(&batch.clone().into(), &mut self.null_counts, 0);
Ok(())
}
Err(err) => {
// if a write fails we need to reset the state of the PartitionWriter
warn!("error writing to arrow buffer, resetting writer state.");
self.replace_arrow_buffer(buffer_bytes)?;
Err(err.into())
}
}
Ok(self.arrow_writer.write(batch)?)
}

async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
// replace counter / buffers and close the current writer
let (writer, buffer) = self.replace_arrow_buffer(vec![])?;
let null_counts = std::mem::take(&mut self.null_counts);
let (writer, buffer) = self.reset_writer()?;
let metadata = writer.close()?;
let buffer = match buffer.into_inner() {
Some(buffer) => Bytes::from(buffer),
None => return Ok(()), // Nothing to write
};

// collect metadata
let path = self.next_data_path();
let obj_bytes = Bytes::from(buffer.to_vec());
let file_size = obj_bytes.len() as i64;
let file_size = buffer.len() as i64;

// write file to object store
self.object_store.put(&path, obj_bytes).await?;
self.object_store.put(&path, buffer).await?;
self.files_written.push(
create_add(
&self.config.partition_values,
null_counts,
path.to_string(),
file_size,
&metadata,
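
For context on the `write_batch` change above, a self-contained illustration
(plain `Vec<u8>` buffers and made-up function names, not delta-rs types) of why
snapshotting the buffer on every write was expensive: each snapshot copies
everything written so far, so the total copy cost grows quadratically with
file size.

```rust
use std::io::Write;

// Old pattern: snapshot the whole buffer before each write so the writer can
// be rebuilt on failure. Copies O(total bytes written so far) on every batch.
fn write_with_snapshots(batches: &[Vec<u8>]) -> std::io::Result<Vec<u8>> {
    let mut buffer: Vec<u8> = Vec::new();
    for batch in batches {
        let _snapshot = buffer.clone(); // the hot-loop copy that was removed
        buffer.write_all(batch)?;
    }
    Ok(buffer)
}

// New pattern: write directly and let any error propagate to the caller,
// which discards the writer instead of trying to roll it back.
fn write_directly(batches: &[Vec<u8>]) -> std::io::Result<Vec<u8>> {
    let mut buffer: Vec<u8> = Vec::new();
    for batch in batches {
        buffer.write_all(batch)?;
    }
    Ok(buffer)
}

fn main() -> std::io::Result<()> {
    let batches = vec![vec![0u8; 1 << 16]; 100]; // 100 batches of 64 KiB
    assert_eq!(write_with_snapshots(&batches)?, write_directly(&batches)?);
    Ok(())
}
```
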
12 changes: 2 additions & 10 deletions rust/src/writer/json.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use super::stats::{apply_null_counts, create_add, NullCounts};
use super::stats::create_add;
use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions, stringified_partition_value,
@@ -42,7 +42,6 @@ pub(crate) struct DataArrowWriter {
buffer: ShareableBuffer,
arrow_writer: ArrowWriter<ShareableBuffer>,
partition_values: HashMap<String, Option<String>>,
null_counts: NullCounts,
buffered_record_batch_count: usize,
}

@@ -120,7 +119,6 @@ impl DataArrowWriter {
match result {
Ok(_) => {
self.buffered_record_batch_count += 1;
apply_null_counts(&record_batch.into(), &mut self.null_counts, 0);
Ok(())
}
// If a write fails we need to reset the state of the DeltaArrowWriter
@@ -152,7 +150,6 @@ impl DataArrowWriter {
)?;

let partition_values = HashMap::new();
let null_counts = NullCounts::new();
let buffered_record_batch_count = 0;

Ok(Self {
@@ -161,7 +158,6 @@
buffer,
arrow_writer,
partition_values,
null_counts,
buffered_record_batch_count,
})
}
@@ -363,19 +359,15 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
let writers = std::mem::take(&mut self.arrow_writers);
let mut actions = Vec::new();

for (_, mut writer) in writers {
for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;

// Replace self null_counts with an empty map. Use the other for stats.
let null_counts = std::mem::take(&mut writer.null_counts);

actions.push(create_add(
&writer.partition_values,
null_counts,
path.to_string(),
file_size,
&metadata,
16 changes: 11 additions & 5 deletions rust/src/writer/mod.rs
@@ -4,10 +4,10 @@
use crate::action::{Action, Add, ColumnCountStat};
use crate::{DeltaTable, DeltaTableError};

use arrow::{datatypes::SchemaRef, datatypes::*, error::ArrowError};
use arrow::{datatypes::SchemaRef, error::ArrowError};
use async_trait::async_trait;
use object_store::Error as ObjectStoreError;
use parquet::{basic::LogicalType, errors::ParquetError};
use parquet::errors::ParquetError;
use serde_json::Value;

pub use json::JsonWriter;
@@ -55,9 +55,15 @@ pub(crate) enum DeltaWriterError {
},

/// Serialization of delta log statistics failed.
#[error("Serialization of delta log statistics failed: {source}")]
StatsSerializationFailed {
/// error raised during stats serialization.
#[error("Failed to write statistics value {debug_value} with logical type {logical_type:?}")]
StatsParsingFailed {
debug_value: String,
logical_type: Option<parquet::basic::LogicalType>,
},

/// JSON serialization failed
#[error("Failed to serialize data to JSON: {source}")]
JSONSerializationFailed {
#[from]
source: serde_json::Error,
},
49 changes: 21 additions & 28 deletions rust/src/writer/record_batch.rs
@@ -26,31 +26,32 @@
//! }))
//! }
//! ```
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
use arrow_ord::{partition::lexicographical_partition_ranges, sort::SortColumn};
use std::{collections::HashMap, sync::Arc};

use super::{
stats::create_add,
utils::{
arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
stringified_partition_value, PartitionPath,
},
DeltaWriter, DeltaWriterError,
};
use crate::builder::DeltaTableBuilder;
use crate::writer::utils::ShareableBuffer;
use crate::DeltaTableError;
use crate::{action::Add, storage::DeltaObjectStore, DeltaTable, DeltaTableMetaData, Schema};
use arrow::array::{Array, UInt32Array};
use arrow::compute::{lexicographical_partition_ranges, take, SortColumn};
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::ArrayRef;
use arrow_row::{RowConverter, SortField};
use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_select::take::take;
use bytes::Bytes;
use object_store::ObjectStore;
use parquet::{arrow::ArrowWriter, errors::ParquetError};
use parquet::{basic::Compression, file::properties::WriterProperties};

use super::stats::{create_add, NullCounts};
use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
stringified_partition_value, PartitionPath,
};
use super::{DeltaTableError, DeltaWriter, DeltaWriterError};
use crate::builder::DeltaTableBuilder;
use crate::writer::{stats::apply_null_counts, utils::ShareableBuffer};
use crate::{action::Add, storage::DeltaObjectStore, DeltaTable, DeltaTableMetaData, Schema};

/// Writes messages to a delta lake table.
pub struct RecordBatchWriter {
storage: Arc<DeltaObjectStore>,
@@ -225,19 +226,15 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
let writers = std::mem::take(&mut self.arrow_writers);
let mut actions = Vec::new();

for (_, mut writer) in writers {
for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;

// Replace self null_counts with an empty map. Use the other for stats.
let null_counts = std::mem::take(&mut writer.null_counts);

actions.push(create_add(
&writer.partition_values,
null_counts,
path.to_string(),
file_size,
&metadata,
@@ -262,7 +259,6 @@ struct PartitionWriter {
pub(super) buffer: ShareableBuffer,
pub(super) arrow_writer: ArrowWriter<ShareableBuffer>,
pub(super) partition_values: HashMap<String, Option<String>>,
pub(super) null_counts: NullCounts,
pub(super) buffered_record_batch_count: usize,
}

@@ -279,7 +275,6 @@ impl PartitionWriter {
Some(writer_properties.clone()),
)?;

let null_counts = NullCounts::new();
let buffered_record_batch_count = 0;

Ok(Self {
@@ -288,7 +283,6 @@
buffer,
arrow_writer,
partition_values,
null_counts,
buffered_record_batch_count,
})
}
@@ -310,7 +304,6 @@
match self.arrow_writer.write(record_batch) {
Ok(_) => {
self.buffered_record_batch_count += 1;
apply_null_counts(&record_batch.clone().into(), &mut self.null_counts, 0);
Ok(())
}
// If a write fails we need to reset the state of the PartitionWriter