diff --git a/Cargo.lock b/Cargo.lock index 6c76eaf94e..b8814c4dc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,7 @@ dependencies = [ "object_store 1.0.0", "parquet", "parquet_ext", + "pin-project-lite", "prometheus 0.12.0", "prost", "rand 0.7.3", @@ -4222,6 +4223,7 @@ dependencies = [ "parquet", "parquet-format", "thrift 0.13.0", + "tokio", ] [[package]] diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index a47f494d92..5e929e0b9f 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -40,6 +40,7 @@ use crate::{ instance::{ flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore, }, + sst::factory::SstWriteOptions, table::data::TableDataRef, TableOptions, }; @@ -287,6 +288,7 @@ impl SchedulerImpl { space_store: Arc, runtime: Arc, config: SchedulerConfig, + write_sst_max_buffer_size: usize, ) -> Self { let (tx, rx) = mpsc::channel(config.schedule_channel_len); let running = Arc::new(AtomicBool::new(true)); @@ -300,6 +302,7 @@ impl SchedulerImpl { picker_manager: PickerManager::default(), max_ongoing_tasks: config.max_ongoing_tasks, max_unflushed_duration: config.max_unflushed_duration.0, + write_sst_max_buffer_size, limit: Arc::new(OngoingTaskLimit { ongoing_tasks: AtomicUsize::new(0), request_buf: RwLock::new(RequestQueue::default()), @@ -367,6 +370,7 @@ struct ScheduleWorker { max_unflushed_duration: Duration, picker_manager: PickerManager, max_ongoing_tasks: usize, + write_sst_max_buffer_size: usize, limit: Arc, running: Arc, memory_limit: MemoryLimit, @@ -462,13 +466,27 @@ impl ScheduleWorker { let sender = self.sender.clone(); let request_id = RequestId::next_id(); + let storage_format_hint = table_data.table_options().storage_format_hint; + let sst_write_options = SstWriteOptions { + storage_format_hint, + num_rows_per_row_group: table_data.table_options().num_rows_per_row_group, + compression: table_data.table_options().compression, + max_buffer_size: self.write_sst_max_buffer_size, + }; + // Do actual costly compact job in background. self.runtime.spawn(async move { // Release the token after compaction finished. let _token = token; let res = space_store - .compact_table(runtime, &table_data, request_id, &compaction_task) + .compact_table( + runtime, + &table_data, + request_id, + &compaction_task, + &sst_write_options, + ) .await; if let Err(e) = &res { diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index fe1c1f3889..a164991fcc 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -544,6 +544,7 @@ impl Instance { storage_format_hint: table_data.table_options().storage_format_hint, num_rows_per_row_group: table_data.table_options().num_rows_per_row_group, compression: table_data.table_options().compression, + max_buffer_size: self.write_sst_max_buffer_size, }; for time_range in &time_ranges { @@ -673,6 +674,7 @@ impl Instance { storage_format_hint, num_rows_per_row_group: table_data.table_options().num_rows_per_row_group, compression: table_data.table_options().compression, + max_buffer_size: self.write_sst_max_buffer_size, }; let mut writer = self .space_store @@ -728,6 +730,7 @@ impl SpaceStore { table_data: &TableData, request_id: RequestId, task: &CompactionTask, + sst_write_options: &SstWriteOptions, ) -> Result<()> { debug!( "Begin compact table, table_name:{}, id:{}, task:{:?}", @@ -765,6 +768,7 @@ impl SpaceStore { table_data, request_id, input, + sst_write_options, &mut edit_meta, ) .await?; @@ -795,6 +799,7 @@ impl SpaceStore { table_data: &TableData, request_id: RequestId, input: &CompactionInputFiles, + sst_write_options: &SstWriteOptions, edit_meta: &mut VersionEditMeta, ) -> Result<()> { debug!( @@ -903,18 +908,12 @@ impl SpaceStore { let file_id = table_data.alloc_file_id(); let sst_file_path = table_data.set_sst_file_path(file_id); - let storage_format_hint = table_data.table_options().storage_format_hint; - let sst_write_options = SstWriteOptions { - storage_format_hint, - num_rows_per_row_group: table_options.num_rows_per_row_group, - compression: table_options.compression, - }; let mut sst_writer = self .sst_factory - .create_writer(&sst_write_options, &sst_file_path, self.store_picker()) + .create_writer(sst_write_options, &sst_file_path, self.store_picker()) .await .context(CreateSstWriter { - storage_format_hint, + storage_format_hint: sst_write_options.storage_format_hint, })?; let sst_info = sst_writer diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 7575ecd41f..d82550b0a2 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -170,6 +170,8 @@ pub struct Instance { pub(crate) space_write_buffer_size: usize, /// Replay wal batch size pub(crate) replay_batch_size: usize, + /// Write sst max buffer size + pub(crate) write_sst_max_buffer_size: usize, /// Options for scanning sst pub(crate) iter_options: IterOptions, pub(crate) remote_engine: Option, diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index e66fbfd77b..2f8a03a8bf 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -69,6 +69,7 @@ impl Instance { space_store.clone(), bg_runtime.clone(), scheduler_config, + ctx.config.write_sst_max_buffer_size.as_bytes() as usize, )); let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone()); @@ -92,6 +93,7 @@ impl Instance { db_write_buffer_size: ctx.config.db_write_buffer_size, space_write_buffer_size: ctx.config.space_write_buffer_size, replay_batch_size: ctx.config.replay_batch_size, + write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize, iter_options, remote_engine: remote_engine_ref, }); diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 1d956e4b0d..2d41cde485 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -22,7 +22,7 @@ pub mod table_options; #[cfg(any(test, feature = "test"))] pub mod tests; -use common_util::config::ReadableDuration; +use common_util::config::{ReadableDuration, ReadableSize}; use manifest::details::Options as ManifestOptions; use message_queue::kafka::config::Config as KafkaConfig; use serde::{Deserialize, Serialize}; @@ -75,6 +75,8 @@ pub struct Config { pub scan_batch_size: usize, /// Sst background reading parallelism pub sst_background_read_parallelism: usize, + /// Max buffer size for writing sst + pub write_sst_max_buffer_size: ReadableSize, /// Wal storage config /// @@ -108,6 +110,7 @@ impl Default for Config { db_write_buffer_size: 0, scan_batch_size: 500, sst_background_read_parallelism: 8, + write_sst_max_buffer_size: ReadableSize::mb(10), wal: WalStorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), } diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index a58abc8e5f..7872e4cf69 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -116,6 +116,7 @@ pub struct SstWriteOptions { pub storage_format_hint: StorageFormatHint, pub num_rows_per_row_group: usize, pub compression: Compression, + pub max_buffer_size: usize, } #[derive(Debug, Default)] diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index ea773334a2..296c4e2476 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -7,12 +7,10 @@ use std::{ use lru::LruCache; use parquet::file::metadata::FileMetaData; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use crate::sst::{ - meta_data::{ - DecodeCustomMetaData, EmptyCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result, - }, + meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result}, parquet::encoding, }; @@ -41,11 +39,14 @@ impl MetaData { let kv_metas = file_meta_data .key_value_metadata() .context(KvMetaDataNotFound)?; - ensure!(!kv_metas.is_empty(), EmptyCustomMetaData); + let kv_meta = kv_metas + .iter() + .find(|kv| kv.key == encoding::META_KEY) + .context(KvMetaDataNotFound)?; let custom = { let mut sst_meta = - encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?; + encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?; if ignore_sst_filter { sst_meta.parquet_filter = None; } diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index c199faa603..236dc51d42 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::{convert::TryFrom, sync::Arc}; +use std::{convert::TryFrom, mem, sync::Arc}; use arrow::{ array::{make_array, Array, ArrayData, ArrayRef}, @@ -9,11 +9,12 @@ use arrow::{ record_batch::RecordBatch as ArrowRecordBatch, util::bit_util, }; +use async_trait::async_trait; use ceresdbproto::sst as sst_pb; use common_types::{ bytes::{BytesMut, SafeBufMut}, datum::DatumKind, - schema::{ArrowSchema, ArrowSchemaRef, DataType, Field}, + schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema}, }; use common_util::{ define_result, @@ -21,12 +22,13 @@ use common_util::{ }; use log::trace; use parquet::{ - arrow::ArrowWriter, basic::Compression, file::{metadata::KeyValue, properties::WriterProperties}, }; +use parquet_ext::async_arrow_writer::AsyncArrowWriter; use prost::Message; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; +use tokio::io::AsyncWrite; use crate::sst::parquet::{ hybrid::{self, IndexedType}, @@ -215,40 +217,48 @@ pub fn decode_sst_meta_data(kv: &KeyValue) -> Result { /// RecordEncoder is used for encoding ArrowBatch. /// /// TODO: allow pre-allocate buffer +#[async_trait] trait RecordEncoder { /// Encode vector of arrow batch, return encoded row number - fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result; + async fn encode(&mut self, record_batches: Vec) -> Result; + + fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()>; /// Return encoded bytes /// Note: trait method cannot receive `self`, so take a &mut self here to /// indicate this encoder is already consumed - fn close(&mut self) -> Result>; + async fn close(&mut self) -> Result<()>; } -struct ColumnarRecordEncoder { +struct ColumnarRecordEncoder { // wrap in Option so ownership can be taken out behind `&mut self` - arrow_writer: Option>>, + arrow_writer: Option>, arrow_schema: ArrowSchemaRef, } -impl ColumnarRecordEncoder { +impl ColumnarRecordEncoder { fn try_new( + sink: W, + schema: &Schema, num_rows_per_row_group: usize, + max_buffer_size: usize, compression: Compression, - meta_data: ParquetMetaData, ) -> Result { - let arrow_schema = meta_data.schema.to_arrow_schema_ref(); + let arrow_schema = schema.to_arrow_schema_ref(); let write_props = WriterProperties::builder() - .set_key_value_metadata(Some(vec![encode_sst_meta_data(meta_data)?])) .set_max_row_group_size(num_rows_per_row_group) .set_compression(compression) .build(); - let arrow_writer = - ArrowWriter::try_new(Vec::new(), arrow_schema.clone(), Some(write_props)) - .box_err() - .context(EncodeRecordBatch)?; + let arrow_writer = AsyncArrowWriter::try_new( + sink, + arrow_schema.clone(), + max_buffer_size, + Some(write_props), + ) + .box_err() + .context(EncodeRecordBatch)?; Ok(Self { arrow_writer: Some(arrow_writer), @@ -257,8 +267,9 @@ impl ColumnarRecordEncoder { } } -impl RecordEncoder for ColumnarRecordEncoder { - fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { +#[async_trait] +impl RecordEncoder for ColumnarRecordEncoder { + async fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { assert!(self.arrow_writer.is_some()); let record_batch = compute::concat_batches(&self.arrow_schema, &arrow_record_batch_vec) @@ -269,62 +280,78 @@ impl RecordEncoder for ColumnarRecordEncoder { .as_mut() .unwrap() .write(&record_batch) + .await .box_err() .context(EncodeRecordBatch)?; Ok(record_batch.num_rows()) } - fn close(&mut self) -> Result> { + fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()> { + let key_value = encode_sst_meta_data(meta_data)?; + self.arrow_writer + .as_mut() + .unwrap() + .append_key_value_metadata(key_value); + + Ok(()) + } + + async fn close(&mut self) -> Result<()> { assert!(self.arrow_writer.is_some()); let arrow_writer = self.arrow_writer.take().unwrap(); - let bytes = arrow_writer - .into_inner() + arrow_writer + .close() + .await .box_err() .context(EncodeRecordBatch)?; - Ok(bytes) + Ok(()) } } -struct HybridRecordEncoder { +struct HybridRecordEncoder { // wrap in Option so ownership can be taken out behind `&mut self` - arrow_writer: Option>>, + arrow_writer: Option>, arrow_schema: ArrowSchemaRef, tsid_type: IndexedType, non_collapsible_col_types: Vec, // columns that can be collapsed into list collapsible_col_types: Vec, + collapsible_col_idx: Vec, } -impl HybridRecordEncoder { +impl HybridRecordEncoder { fn try_new( + sink: W, + schema: &Schema, num_rows_per_row_group: usize, + max_buffer_size: usize, compression: Compression, - mut meta_data: ParquetMetaData, ) -> Result { // TODO: What we really want here is a unique ID, tsid is one case // Maybe support other cases later. - let tsid_idx = meta_data.schema.index_of_tsid().context(TsidRequired)?; + let tsid_idx = schema.index_of_tsid().context(TsidRequired)?; let tsid_type = IndexedType { idx: tsid_idx, - data_type: meta_data.schema.column(tsid_idx).data_type, + data_type: schema.column(tsid_idx).data_type, }; let mut non_collapsible_col_types = Vec::new(); let mut collapsible_col_types = Vec::new(); - for (idx, col) in meta_data.schema.columns().iter().enumerate() { + let mut collapsible_col_idx = Vec::new(); + for (idx, col) in schema.columns().iter().enumerate() { if idx == tsid_idx { continue; } - if meta_data.schema.is_collapsible_column(idx) { + if schema.is_collapsible_column(idx) { collapsible_col_types.push(IndexedType { idx, - data_type: meta_data.schema.column(idx).data_type, + data_type: schema.column(idx).data_type, }); - meta_data.collapsible_cols_idx.push(idx as u32); + collapsible_col_idx.push(idx as u32); } else { // TODO: support non-string key columns ensure!( @@ -340,30 +367,35 @@ impl HybridRecordEncoder { } } - let arrow_schema = hybrid::build_hybrid_arrow_schema(&meta_data.schema); + let arrow_schema = hybrid::build_hybrid_arrow_schema(schema); let write_props = WriterProperties::builder() - .set_key_value_metadata(Some(vec![encode_sst_meta_data(meta_data)?])) .set_max_row_group_size(num_rows_per_row_group) .set_compression(compression) .build(); - let arrow_writer = - ArrowWriter::try_new(Vec::new(), arrow_schema.clone(), Some(write_props)) - .box_err() - .context(EncodeRecordBatch)?; + let arrow_writer = AsyncArrowWriter::try_new( + sink, + arrow_schema.clone(), + max_buffer_size, + Some(write_props), + ) + .box_err() + .context(EncodeRecordBatch)?; Ok(Self { arrow_writer: Some(arrow_writer), arrow_schema, tsid_type, non_collapsible_col_types, collapsible_col_types, + collapsible_col_idx, }) } } -impl RecordEncoder for HybridRecordEncoder { - fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { +#[async_trait] +impl RecordEncoder for HybridRecordEncoder { + async fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { assert!(self.arrow_writer.is_some()); let record_batch = hybrid::convert_to_hybrid_record( @@ -380,21 +412,35 @@ impl RecordEncoder for HybridRecordEncoder { .as_mut() .unwrap() .write(&record_batch) + .await .box_err() .context(EncodeRecordBatch)?; Ok(record_batch.num_rows()) } - fn close(&mut self) -> Result> { + fn set_meta_data(&mut self, mut meta_data: ParquetMetaData) -> Result<()> { + meta_data.collapsible_cols_idx = mem::take(&mut self.collapsible_col_idx); + let key_value = encode_sst_meta_data(meta_data)?; + self.arrow_writer + .as_mut() + .unwrap() + .append_key_value_metadata(key_value); + + Ok(()) + } + + async fn close(&mut self) -> Result<()> { assert!(self.arrow_writer.is_some()); let arrow_writer = self.arrow_writer.take().unwrap(); - let bytes = arrow_writer - .into_inner() + arrow_writer + .close() + .await .box_err() .context(EncodeRecordBatch)?; - Ok(bytes) + + Ok(()) } } @@ -403,23 +449,29 @@ pub struct ParquetEncoder { } impl ParquetEncoder { - pub fn try_new( + pub fn try_new( + sink: W, + schema: &Schema, hybrid_encoding: bool, num_rows_per_row_group: usize, + max_buffer_size: usize, compression: Compression, - meta_data: ParquetMetaData, ) -> Result { let record_encoder: Box = if hybrid_encoding { Box::new(HybridRecordEncoder::try_new( + sink, + schema, num_rows_per_row_group, + max_buffer_size, compression, - meta_data, )?) } else { Box::new(ColumnarRecordEncoder::try_new( + sink, + schema, num_rows_per_row_group, + max_buffer_size, compression, - meta_data, )?) }; @@ -428,19 +480,23 @@ impl ParquetEncoder { /// Encode the record batch with [ArrowWriter] and the encoded contents is /// written to the buffer. - pub fn encode_record_batch( + pub async fn encode_record_batches( &mut self, - arrow_record_batch_vec: Vec, + arrow_record_batches: Vec, ) -> Result { - if arrow_record_batch_vec.is_empty() { + if arrow_record_batches.is_empty() { return Ok(0); } - self.record_encoder.encode(arrow_record_batch_vec) + self.record_encoder.encode(arrow_record_batches).await } - pub fn close(mut self) -> Result> { - self.record_encoder.close() + pub fn set_meta_data(&mut self, meta_data: ParquetMetaData) -> Result<()> { + self.record_encoder.set_meta_data(meta_data) + } + + pub async fn close(mut self) -> Result<()> { + self.record_encoder.close().await } } @@ -699,6 +755,8 @@ impl ParquetDecoder { #[cfg(test)] mod tests { + use std::{pin::Pin, sync::Mutex, task::Poll}; + use arrow::array::{Int32Array, StringArray, TimestampMillisecondArray, UInt64Array}; use common_types::{ bytes::Bytes, @@ -707,6 +765,7 @@ mod tests { time::{TimeRange, Timestamp}, }; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use pin_project_lite::pin_project; use super::*; @@ -852,11 +911,60 @@ mod tests { } } - #[test] - fn hybrid_record_encode_and_decode() { + pin_project! { + struct CopiedBuffer { + #[pin] + buffer: Vec, + copied_buffer: Arc>>, + } + } + + impl CopiedBuffer { + fn new(buffer: Vec) -> Self { + Self { + buffer, + copied_buffer: Arc::new(Mutex::new(Vec::new())), + } + } + + fn copied_buffer(&self) -> Arc>> { + self.copied_buffer.clone() + } + } + + impl AsyncWrite for CopiedBuffer { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.copied_buffer.lock().unwrap().extend_from_slice(buf); + let buffer = self.project().buffer; + buffer.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let buffer = self.project().buffer; + buffer.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let buffer = self.project().buffer; + buffer.poll_shutdown(cx) + } + } + + #[tokio::test] + async fn hybrid_record_encode_and_decode() { let schema = build_schema(); - let mut meta_data = ParquetMetaData { + let meta_data = ParquetMetaData { min_key: Bytes::from_static(b"100"), max_key: Bytes::from_static(b"200"), time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)), @@ -865,8 +973,19 @@ mod tests { parquet_filter: Default::default(), collapsible_cols_idx: Vec::new(), }; - let mut encoder = - HybridRecordEncoder::try_new(100, Compression::ZSTD, meta_data.clone()).unwrap(); + let copied_buffer = CopiedBuffer::new(Vec::new()); + let copied_encoded_buffer = copied_buffer.copied_buffer(); + let mut encoder = HybridRecordEncoder::try_new( + copied_buffer, + &meta_data.schema, + 100, + 0, + Compression::ZSTD, + ) + .unwrap(); + encoder + .set_meta_data(meta_data.clone()) + .expect("Failed to set meta data"); let columns = vec![ Arc::new(UInt64Array::from(vec![1, 1, 2])) as ArrayRef, @@ -911,20 +1030,24 @@ mod tests { ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap(); let row_nums = encoder .encode(vec![input_record_batch, input_record_batch2]) + .await .unwrap(); assert_eq!(2, row_nums); // read encoded records back, and then compare with input records - let encoded_bytes = encoder.close().unwrap(); + encoder.close().await.unwrap(); + + let encoded_bytes = copied_encoded_buffer.lock().unwrap().clone(); let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(encoded_bytes)) .unwrap() .build() .unwrap(); let hybrid_record_batch = reader.next().unwrap().unwrap(); - collect_collapsible_cols_idx(&meta_data.schema, &mut meta_data.collapsible_cols_idx); + let mut collapsible_cols_idx = Vec::new(); + collect_collapsible_cols_idx(&meta_data.schema, &mut collapsible_cols_idx); let decoder = HybridRecordDecoder { - collapsible_cols_idx: meta_data.collapsible_cols_idx.clone(), + collapsible_cols_idx, }; let decoded_record_batch = decoder.decode(hybrid_record_batch).unwrap(); diff --git a/analytic_engine/src/sst/parquet/meta_data.rs b/analytic_engine/src/sst/parquet/meta_data.rs index 65ea5204a6..581bba7af4 100644 --- a/analytic_engine/src/sst/parquet/meta_data.rs +++ b/analytic_engine/src/sst/parquet/meta_data.rs @@ -198,8 +198,8 @@ pub struct ParquetFilter { } impl ParquetFilter { - pub fn new(row_group_filters: Vec) -> Self { - Self { row_group_filters } + pub fn push_row_group_filter(&mut self, row_group_filter: RowGroupFilter) { + self.row_group_filters.push(row_group_filter); } pub fn len(&self) -> usize { diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 6b7af75886..ce2359be00 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -2,19 +2,15 @@ //! Sst writer implementation based on parquet. -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; - use async_trait::async_trait; use common_types::{record_batch::RecordBatchWithKey, request_id::RequestId}; use common_util::error::BoxError; use datafusion::parquet::basic::Compression; use futures::StreamExt; -use log::debug; +use log::{debug, error}; use object_store::{ObjectStoreRef, Path}; use snafu::ResultExt; +use tokio::io::AsyncWrite; use crate::{ sst::{ @@ -41,6 +37,7 @@ pub struct ParquetSstWriter<'a> { store: &'a ObjectStoreRef, /// Max row group size. num_rows_per_row_group: usize, + max_buffer_size: usize, compression: Compression, } @@ -58,43 +55,28 @@ impl<'a> ParquetSstWriter<'a> { store, num_rows_per_row_group: options.num_rows_per_row_group, compression: options.compression.into(), + max_buffer_size: options.max_buffer_size, } } } -/// RecordBytesReader provides AsyncRead implementation for the encoded records -/// by parquet. -struct RecordBytesReader { +/// The writer will reorganize the record batches into row groups, and then +/// encode them to parquet file. +struct RecordBatchGroupWriter { request_id: RequestId, hybrid_encoding: bool, + input: RecordBatchStream, meta_data: MetaData, - record_stream: RecordBatchStream, num_rows_per_row_group: usize, + max_buffer_size: usize, compression: Compression, - total_row_num: Arc, - // Record batch partitioned by exactly given `num_rows_per_row_group` - // There may be more than one `RecordBatchWithKey` inside each partition - partitioned_record_batch: Vec>, + /// The filter for the parquet file, and it will be updated during + /// generating the parquet file. + parquet_filter: ParquetFilter, } -impl RecordBytesReader { - // Partition record batch stream into batch vector with exactly given - // `num_rows_per_row_group` - async fn partition_record_batch(&mut self) -> Result<()> { - let mut prev_record_batch: Option = None; - - loop { - let row_group = self.fetch_next_row_group(&mut prev_record_batch).await?; - if row_group.is_empty() { - break; - } - self.partitioned_record_batch.push(row_group); - } - - Ok(()) - } - - /// Fetch an integral row group from the `self.record_stream`. +impl RecordBatchGroupWriter { + /// Fetch an integral row group from the `self.input`. /// /// Except the last one, every row group is ensured to contains exactly /// `self.num_rows_per_row_group`. As for the last one, it will cover all @@ -129,7 +111,7 @@ impl RecordBytesReader { } // Previous record batch has been exhausted, and let's fetch next record batch. - match self.record_stream.next().await { + match self.input.next().await { Some(v) => { let v = v.context(PollRecordBatch)?; debug_assert!( @@ -149,75 +131,126 @@ impl RecordBytesReader { Ok(curr_row_group) } - fn build_parquet_filter(&self) -> Result { + /// Build the parquet filter for the given `row_group`, and then update it + /// into `self.parquet_filter`. + fn update_parquet_filter(&mut self, row_group_batch: &[RecordBatchWithKey]) -> Result<()> { // TODO: support filter in hybrid storage format [#435](https://github.com/CeresDB/ceresdb/issues/435) if self.hybrid_encoding { - return Ok(ParquetFilter::default()); + return Ok(()); } - let filters = self - .partitioned_record_batch - .iter() - .map(|row_group_batch| { - let mut builder = - RowGroupFilterBuilder::with_num_columns(row_group_batch[0].num_columns()); - - for partial_batch in row_group_batch { - for (col_idx, column) in partial_batch.columns().iter().enumerate() { - for row in 0..column.num_rows() { - let datum = column.datum(row); - let bytes = datum.to_bytes(); - builder.add_key(col_idx, &bytes); - } + + let row_group_filter = { + let mut builder = + RowGroupFilterBuilder::with_num_columns(row_group_batch[0].num_columns()); + + for partial_batch in row_group_batch { + for (col_idx, column) in partial_batch.columns().iter().enumerate() { + for row in 0..column.num_rows() { + let datum = column.datum(row); + let bytes = datum.to_bytes(); + builder.add_key(col_idx, &bytes); } } + } - builder.build().box_err().context(BuildParquetFilter) - }) - .collect::>>()?; + builder.build().box_err().context(BuildParquetFilter)? + }; - Ok(ParquetFilter::new(filters)) + self.parquet_filter.push_row_group_filter(row_group_filter); + Ok(()) } - async fn read_all(mut self) -> Result> { - self.partition_record_batch().await?; - - let parquet_meta_data = { - let sst_filter = self.build_parquet_filter()?; - let mut parquet_meta_data = ParquetMetaData::from(self.meta_data); - parquet_meta_data.parquet_filter = Some(sst_filter); - parquet_meta_data - }; + async fn write_all(mut self, sink: W) -> Result { + let mut prev_record_batch: Option = None; + let mut arrow_row_group = Vec::new(); + let mut total_num_rows = 0; let mut parquet_encoder = ParquetEncoder::try_new( + sink, + &self.meta_data.schema, self.hybrid_encoding, self.num_rows_per_row_group, + self.max_buffer_size, self.compression, - parquet_meta_data, ) .box_err() .context(EncodeRecordBatch)?; - // process record batch stream - let mut arrow_record_batch_vec = Vec::new(); - for record_batches in self.partitioned_record_batch { - for batch in record_batches { - arrow_record_batch_vec.push(batch.into_record_batch().into_arrow_record_batch()); + loop { + let row_group = self.fetch_next_row_group(&mut prev_record_batch).await?; + if row_group.is_empty() { + break; } - let buf_len = arrow_record_batch_vec.len(); - let row_num = parquet_encoder - .encode_record_batch(arrow_record_batch_vec) + self.update_parquet_filter(&row_group)?; + + let num_batches = row_group.len(); + for record_batch in row_group { + arrow_row_group.push(record_batch.into_record_batch().into_arrow_record_batch()); + } + let num_rows = parquet_encoder + .encode_record_batches(arrow_row_group) + .await .box_err() .context(EncodeRecordBatch)?; - self.total_row_num.fetch_add(row_num, Ordering::Relaxed); - arrow_record_batch_vec = Vec::with_capacity(buf_len); + + // TODO: it will be better to use `arrow_row_group.clear()` to reuse the + // allocated memory. + arrow_row_group = Vec::with_capacity(num_batches); + total_num_rows += num_rows; } - let bytes = parquet_encoder + let parquet_meta_data = { + let mut parquet_meta_data = ParquetMetaData::from(self.meta_data); + parquet_meta_data.parquet_filter = Some(self.parquet_filter); + parquet_meta_data + }; + parquet_encoder + .set_meta_data(parquet_meta_data) + .box_err() + .context(EncodeRecordBatch)?; + + parquet_encoder .close() + .await .box_err() .context(EncodeRecordBatch)?; - Ok(bytes) + + Ok(total_num_rows) + } +} + +struct ObjectStoreMultiUploadAborter<'a> { + location: &'a Path, + session_id: String, + object_store: &'a ObjectStoreRef, +} + +impl<'a> ObjectStoreMultiUploadAborter<'a> { + async fn initialize_upload( + object_store: &'a ObjectStoreRef, + location: &'a Path, + ) -> Result<( + ObjectStoreMultiUploadAborter<'a>, + Box, + )> { + let (session_id, upload_writer) = object_store + .put_multipart(location) + .await + .context(Storage)?; + let aborter = Self { + location, + session_id, + object_store, + }; + Ok((aborter, upload_writer)) + } + + async fn abort(self) -> Result<()> { + self.object_store + .abort_multipart(self.location, &self.session_id) + .await + .context(Storage) } } @@ -227,32 +260,42 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { &mut self, request_id: RequestId, meta: &MetaData, - record_stream: RecordBatchStream, + input: RecordBatchStream, ) -> writer::Result { debug!( "Build parquet file, request_id:{}, meta:{:?}, num_rows_per_row_group:{}", request_id, meta, self.num_rows_per_row_group ); - let total_row_num = Arc::new(AtomicUsize::new(0)); - let reader = RecordBytesReader { + let group_writer = RecordBatchGroupWriter { hybrid_encoding: self.hybrid_encoding, request_id, - record_stream, + input, num_rows_per_row_group: self.num_rows_per_row_group, + max_buffer_size: self.max_buffer_size, compression: self.compression, - total_row_num: total_row_num.clone(), meta_data: meta.clone(), - partitioned_record_batch: Default::default(), + parquet_filter: ParquetFilter::default(), }; - let bytes = reader.read_all().await?; - self.store - .put(self.path, bytes.into()) - .await - .context(Storage)?; - let file_head = self.store.head(self.path).await.context(Storage)?; + let (aborter, sink) = + ObjectStoreMultiUploadAborter::initialize_upload(self.store, self.path).await?; + let total_num_rows = match group_writer.write_all(sink).await { + Ok(v) => v, + Err(e) => { + if let Err(e) = aborter.abort().await { + // The uploading file will be leaked if failed to abort. A repair command will + // be provided to clean up the leaked files. + error!( + "Failed to abort multi-upload for sst:{}, err:{}", + self.path, e + ); + } + return Err(e); + } + }; + let file_head = self.store.head(self.path).await.context(Storage)?; let storage_format = if self.hybrid_encoding { StorageFormat::Hybrid } else { @@ -260,7 +303,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { }; Ok(SstInfo { file_size: file_head.size, - row_num: total_row_num.load(Ordering::Relaxed), + row_num: total_num_rows, storage_format, }) } @@ -269,7 +312,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { #[cfg(test)] mod tests { - use std::task::Poll; + use std::{sync::Arc, task::Poll}; use common_types::{ bytes::Bytes, @@ -320,6 +363,7 @@ mod tests { storage_format_hint: StorageFormatHint::Auto, num_rows_per_row_group, compression: table_options::Compression::Uncompressed, + max_buffer_size: 0, }; let dir = tempdir().unwrap(); @@ -425,7 +469,7 @@ mod tests { } #[tokio::test] - async fn test_partition_record_batch() { + async fn test_fetch_row_group() { // rows per group: 10 let testcases = vec![ // input, expected @@ -443,25 +487,25 @@ mod tests { ]; for (num_rows_per_group, input, expected) in testcases { - test_partition_record_batch_inner(num_rows_per_group, input, expected).await; + check_num_rows_of_row_group(num_rows_per_group, input, expected).await; } } - async fn test_partition_record_batch_inner( + async fn check_num_rows_of_row_group( num_rows_per_row_group: usize, - input_row_nums: Vec, - expected_row_nums: Vec, + input_num_rows: Vec, + expected_num_rows: Vec, ) { init_log_for_test(); let schema = build_schema(); let mut poll_cnt = 0; let schema_clone = schema.clone(); let record_batch_stream = Box::new(stream::poll_fn(move |_ctx| -> Poll> { - if poll_cnt == input_row_nums.len() { + if poll_cnt == input_num_rows.len() { return Poll::Ready(None); } - let rows = (0..input_row_nums[poll_cnt]) + let rows = (0..input_num_rows[poll_cnt]) .map(|_| build_row(b"a", 100, 10.0, "v4", 1000, 1_000_000)) .collect::>(); @@ -471,10 +515,10 @@ mod tests { Poll::Ready(Some(Ok(batch))) })); - let mut reader = RecordBytesReader { + let mut group_writer = RecordBatchGroupWriter { request_id: RequestId::next_id(), hybrid_encoding: false, - record_stream: record_batch_stream, + input: record_batch_stream, num_rows_per_row_group, compression: Compression::UNCOMPRESSED, meta_data: MetaData { @@ -484,18 +528,19 @@ mod tests { max_sequence: 1, schema, }, - total_row_num: Arc::new(AtomicUsize::new(0)), - partitioned_record_batch: Vec::new(), + max_buffer_size: 0, + parquet_filter: ParquetFilter::default(), }; - reader.partition_record_batch().await.unwrap(); + let mut prev_record_batch = None; + for expect_num_row in expected_num_rows { + let batch = group_writer + .fetch_next_row_group(&mut prev_record_batch) + .await + .unwrap(); - for (i, expected_row_num) in expected_row_nums.into_iter().enumerate() { - let actual: usize = reader.partitioned_record_batch[i] - .iter() - .map(|b| b.num_rows()) - .sum(); - assert_eq!(expected_row_num, actual); + let actual_num_row: usize = batch.iter().map(|b| b.num_rows()).sum(); + assert_eq!(expect_num_row, actual_num_row); } } } diff --git a/analytic_engine/src/sst/writer.rs b/analytic_engine/src/sst/writer.rs index 87a78e30b2..5f436bea5e 100644 --- a/analytic_engine/src/sst/writer.rs +++ b/analytic_engine/src/sst/writer.rs @@ -22,9 +22,14 @@ pub mod error { #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to perform storage operation, err:{}", source))] + #[snafu(display( + "Failed to perform storage operation, err:{}.\nBacktrace:\n{}", + source, + backtrace + ))] Storage { source: object_store::ObjectStoreError, + backtrace: Backtrace, }, #[snafu(display("Failed to encode meta data, err:{}", source))] diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 13d85459d8..13a86bab38 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -51,6 +51,7 @@ async fn create_sst_from_stream(config: SstConfig, record_batch_stream: RecordBa storage_format_hint: StorageFormatHint::Auto, num_rows_per_row_group: config.num_rows_per_row_group, compression: config.compression, + max_buffer_size: 1024 * 1024 * 10, }; info!( diff --git a/components/parquet_ext/src/async_arrow_writer.rs b/components/parquet_ext/src/async_arrow_writer.rs new file mode 100644 index 0000000000..4a2a616470 --- /dev/null +++ b/components/parquet_ext/src/async_arrow_writer.rs @@ -0,0 +1,122 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Async arrow writer for parquet file. + +use std::{ + io::Write, + sync::{Arc, Mutex}, +}; + +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use parquet::{ + arrow::ArrowWriter, + errors::{ParquetError, Result}, + file::properties::WriterProperties, + format::{FileMetaData, KeyValue}, +}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +#[derive(Clone, Default)] +pub struct SharedBuffer { + /// The buffer to store the data to be written. + /// + /// The lock is used to obtain internal mutability, so no worry about the + /// lock contention. + buffer: Arc>>, +} + +impl Write for SharedBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut buffer = self.buffer.lock().unwrap(); + Write::write(&mut *buffer, buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + let mut buffer = self.buffer.lock().unwrap(); + Write::flush(&mut *buffer) + } +} + +/// Async arrow writer for parquet file. +/// +/// A shared buffer is provided to the sync writer [ArrowWriter] and it will +/// accept the data from the sync writer and flush the received data to the +/// async writer when the buffer size exceeds the threshold. +pub struct AsyncArrowWriter { + sync_writer: ArrowWriter, + async_writer: W, + shared_buffer: SharedBuffer, + max_buffer_size: usize, +} + +impl AsyncArrowWriter { + pub fn try_new( + writer: W, + arrow_schema: SchemaRef, + max_buffer_size: usize, + props: Option, + ) -> Result { + let shared_buffer = SharedBuffer::default(); + let sync_writer = ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?; + + Ok(Self { + sync_writer, + async_writer: writer, + shared_buffer, + max_buffer_size, + }) + } + + pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> { + self.sync_writer.write(batch)?; + Self::flush( + &self.shared_buffer, + &mut self.async_writer, + self.max_buffer_size, + ) + .await + } + + pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) { + self.sync_writer.append_key_value_metadata(kv_metadata); + } + + pub async fn close(mut self) -> Result { + let metadata = self.sync_writer.close()?; + + // flush the remaining data. + Self::flush(&self.shared_buffer, &mut self.async_writer, 0).await?; + self.async_writer + .shutdown() + .await + .map_err(|e| ParquetError::External(Box::new(e)))?; + + Ok(metadata) + } + + async fn flush( + shared_buffer: &SharedBuffer, + async_writer: &mut W, + threshold: usize, + ) -> Result<()> { + let mut buffer = { + let mut buffer = shared_buffer.buffer.lock().unwrap(); + + if buffer.is_empty() || buffer.len() < threshold { + // no need to flush + return Ok(()); + } + std::mem::take(&mut *buffer) + }; + + async_writer + .write(&buffer) + .await + .map_err(|e| ParquetError::External(Box::new(e)))?; + + // reuse the buffer. + buffer.clear(); + *shared_buffer.buffer.lock().unwrap() = buffer; + Ok(()) + } +} diff --git a/components/parquet_ext/src/lib.rs b/components/parquet_ext/src/lib.rs index 2cb355ea5c..2d61dd4d14 100644 --- a/components/parquet_ext/src/lib.rs +++ b/components/parquet_ext/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +pub mod async_arrow_writer; pub mod meta_data; pub mod prune; pub mod reverse_reader; diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index e1009e48b4..730116d029 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -104,6 +104,7 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { num_rows_per_row_group: args.batch_size, compression: Compression::parse_from(&args.compression) .with_context(|| format!("invalid compression:{}", args.compression))?, + max_buffer_size: 10 * 1024 * 1024, }; let output = Path::from(args.output); let mut writer = factory