From e34767dd1b137670cc031c2e9c8912171a377522 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Mon, 1 Apr 2024 09:37:26 +0200
Subject: [PATCH] fix: add snappy compression on checkpoint files (#2365)

# Description
Noticed that we didn't set any compression; this should bring down the file
size a bit for users that have large checkpoints.
---
 crates/core/src/protocol/checkpoints.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs
index e18729bcb8..7e93e99780 100644
--- a/crates/core/src/protocol/checkpoints.rs
+++ b/crates/core/src/protocol/checkpoints.rs
@@ -11,7 +11,9 @@ use futures::{StreamExt, TryStreamExt};
 use lazy_static::lazy_static;
 use object_store::{Error, ObjectStore};
 use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
 use parquet::errors::ParquetError;
+use parquet::file::properties::WriterProperties;
 use regex::Regex;
 use serde_json::Value;
 use tracing::{debug, error};
@@ -333,7 +335,15 @@ fn parquet_bytes_from_state(
     debug!("Writing to checkpoint parquet buffer...");
     // Write the Checkpoint parquet file.
     let mut bytes = vec![];
-    let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;
+    let mut writer = ArrowWriter::try_new(
+        &mut bytes,
+        arrow_schema.clone(),
+        Some(
+            WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build(),
+        ),
+    )?;
     let mut decoder = ReaderBuilder::new(arrow_schema)
         .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
         .build_decoder()?;