Merge pull request #1440 from jiacai2050/feat-meta-size
feat: persist sst meta size
jiacai2050 authored Jan 31, 2024
2 parents 601d821 + 200433b commit 6f9d426
Showing 5 changed files with 92 additions and 36 deletions.
10 changes: 8 additions & 2 deletions src/analytic_engine/src/sst/meta_data/cache.rs
@@ -23,11 +23,11 @@ use std::{
use lru::LruCache;
use object_store::{ObjectStoreRef, Path};
use parquet::{file::metadata::FileMetaData, format::KeyValue};
use snafu::{ensure, OptionExt};
use snafu::{ensure, OptionExt, ResultExt};

use crate::sst::{
meta_data::{
metadata_reader::parse_metadata, KvMetaDataNotFound, KvMetaVersionEmpty,
metadata_reader::parse_metadata, InvalidSize, KvMetaDataNotFound, KvMetaVersionEmpty,
ParquetMetaDataRef, Result,
},
metrics::{META_DATA_CACHE_HIT_COUNTER, META_DATA_CACHE_MISS_COUNTER},
@@ -66,6 +66,7 @@ impl MetaData {
ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);

let mut meta_path = None;
let mut meta_size = None;
let mut other_kv_metas: Vec<KeyValue> = Vec::with_capacity(kv_metas.len() - 1);
let mut custom_kv_meta = None;
let mut meta_version = encoding::META_VERSION_V1; // default is v1
@@ -77,6 +78,10 @@ impl MetaData {
meta_path = kv_meta.value.as_ref().map(|path| Path::from(path.as_str()))
} else if kv_meta.key == encoding::META_VERSION_KEY {
meta_version = kv_meta.value.as_ref().context(KvMetaVersionEmpty)?;
} else if kv_meta.key == encoding::META_SIZE_KEY {
let size = kv_meta.value.as_ref().context(KvMetaVersionEmpty)?;
let size = size.parse::<usize>().context(InvalidSize { size })?;
meta_size = Some(size);
} else {
other_kv_metas.push(kv_meta.clone());
}
@@ -87,6 +92,7 @@ impl MetaData {
custom_kv_meta,
ignore_sst_filter,
meta_path.clone(),
meta_size,
store,
)
.await?;
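
The hunk above teaches the metadata cache to recognize the new `meta_size` key alongside `meta_path` and `meta_version`. A minimal, standalone sketch of that parsing logic, with a local `KeyValue` mirroring `parquet::format::KeyValue` and plain string errors in place of the crate's snafu variants (`KvMetaVersionEmpty`, `InvalidSize`):

```rust
// Standalone sketch of the `meta_size` parsing added above.
struct KeyValue {
    key: String,
    value: Option<String>,
}

const META_SIZE_KEY: &str = "meta_size"; // same constant as in encoding.rs

fn parse_meta_size(kv_metas: &[KeyValue]) -> Result<Option<usize>, String> {
    match kv_metas.iter().find(|kv| kv.key == META_SIZE_KEY) {
        // Older SSTs carry no size entry; readers fall back to a full `get`.
        None => Ok(None),
        Some(kv) => {
            let raw = kv
                .value
                .as_ref()
                .ok_or_else(|| "meta_size value is empty".to_string())?;
            let size = raw
                .parse::<usize>()
                .map_err(|e| format!("invalid meta_size {raw:?}: {e}"))?;
            Ok(Some(size))
        }
    }
}

fn main() {
    let kvs = vec![KeyValue {
        key: META_SIZE_KEY.to_string(),
        value: Some("1024".to_string()),
    }];
    assert_eq!(parse_meta_size(&kvs).unwrap(), Some(1024));
}
```
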
50 changes: 35 additions & 15 deletions src/analytic_engine/src/sst/meta_data/metadata_reader.rs
@@ -62,12 +62,17 @@ impl CustomMetadataReader for MetaV1Reader<'_> {

pub struct MetaV2Reader {
meta_path: Option<Path>,
meta_size: Option<usize>,
store: ObjectStoreRef,
}

impl MetaV2Reader {
fn new(meta_path: Option<Path>, store: ObjectStoreRef) -> Self {
Self { meta_path, store }
fn new(meta_path: Option<Path>, meta_size: Option<usize>, store: ObjectStoreRef) -> Self {
Self {
meta_path,
meta_size,
store,
}
}
}

@@ -77,18 +82,32 @@ impl CustomMetadataReader for MetaV2Reader {
match &self.meta_path {
None => KvMetaPathEmpty {}.fail(),
Some(meta_path) => {
let metadata = self
.store
.get(meta_path)
.await
.with_context(|| FetchFromStore {
file_path: meta_path.to_string(),
})?
.bytes()
.await
.with_context(|| FetchAndDecodeSstMeta {
file_path: meta_path.to_string(),
})?;
// TODO: The disk cache only works for `get_range` now, so here
// we prefer to use `get_range` to fetch metadata when possible.
// A better way is to fix https://github.com/apache/incubator-horaedb/issues/1473.
let metadata = match self.meta_size {
Some(size) => {
let all_range = 0..size;
self.store
.get_range(meta_path, all_range)
.await
.with_context(|| FetchFromStore {
file_path: meta_path.to_string(),
})?
}
None => self
.store
.get(meta_path)
.await
.with_context(|| FetchFromStore {
file_path: meta_path.to_string(),
})?
.bytes()
.await
.with_context(|| FetchAndDecodeSstMeta {
file_path: meta_path.to_string(),
})?,
};

decode_sst_meta_data_from_bytes(metadata.as_bytes()).context(DecodeCustomMetaData)
}
@@ -101,6 +120,7 @@ pub async fn parse_metadata(
custom_kv_meta: Option<&KeyValue>,
ignore_sst_filter: bool,
meta_path: Option<Path>,
meta_size: Option<usize>,
store: ObjectStoreRef,
) -> Result<ParquetMetaDataRef> {
// Must ensure custom metadata is only stored in one place
@@ -111,7 +131,7 @@

let reader: Box<dyn CustomMetadataReader + Send + Sync + '_> = match meta_version {
META_VERSION_V1 => Box::new(MetaV1Reader::new(custom_kv_meta)),
META_VERSION_CURRENT => Box::new(MetaV2Reader::new(meta_path, store)),
META_VERSION_CURRENT => Box::new(MetaV2Reader::new(meta_path, meta_size, store)),
_ => {
return UnknownMetaVersion {
version: meta_version,
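
The fallback logic above is the heart of the change: when `meta_size` was persisted, the reader fetches the metadata object with a single ranged read (which the disk cache can serve, per the TODO) instead of a full `get`. A hedged sketch of both paths against an in-memory store, assuming an object_store release (~0.9, current around this commit) where `put` takes `Bytes` and `get_range` takes a `Range<usize>`:

```rust
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    let path = Path::from("sst/1.metadata"); // illustrative path
    let encoded = Bytes::from_static(b"...encoded sst meta...");
    let meta_size = encoded.len();
    store.put(&path, encoded).await?;

    // Fast path: `meta_size` came from the data file's footer, so one
    // ranged request fetches exactly the metadata bytes.
    let ranged = store.get_range(&path, 0..meta_size).await?;

    // Fallback for v2 SSTs written before this change: full object fetch.
    let full = store.get(&path).await?.bytes().await?;
    assert_eq!(ranged, full);
    Ok(())
}
```
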
9 changes: 9 additions & 0 deletions src/analytic_engine/src/sst/meta_data/mod.rs
@@ -104,6 +104,15 @@ pub enum Error {
source: Utf8Error,
backtrace: Backtrace,
},

#[snafu(display(
"Parse meta size failed, size:{size}, source:{source}.\nBacktrace:\n{backtrace}"
))]
InvalidSize {
size: String,
source: std::num::ParseIntError,
backtrace: Backtrace,
},
}

define_result!(Error);
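
The new `InvalidSize` variant is produced through snafu's context selector when the persisted size string fails to parse (see the `cache.rs` hunk above). A simplified sketch, assuming snafu 0.6 where the selector shares the variant's name; `Backtrace` is omitted for brevity:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Parse meta size failed, size:{}, source:{}", size, source))]
    InvalidSize {
        size: String,
        source: std::num::ParseIntError,
    },
}

fn parse_size(raw: &str) -> Result<usize, Error> {
    // `size: raw` is converted via `Into<String>` by the generated selector.
    raw.parse::<usize>().context(InvalidSize { size: raw })
}

fn main() {
    // Prints: Parse meta size failed, size:not-a-number, source:invalid digit found in string
    println!("{}", parse_size("not-a-number").unwrap_err());
}
```
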
17 changes: 17 additions & 0 deletions src/analytic_engine/src/sst/parquet/encoding.rs
@@ -165,6 +165,7 @@ pub const META_VERSION_V1: &str = "1";
pub const META_VERSION_CURRENT: &str = "2";
pub const META_KEY: &str = "meta"; // used in v1
pub const META_PATH_KEY: &str = "meta_path"; // used in v2
pub const META_SIZE_KEY: &str = "meta_size"; // used in v2
pub const META_VERSION_KEY: &str = "meta_version";
pub const META_VALUE_HEADER: u8 = 0;

@@ -228,6 +229,7 @@ trait RecordEncoder {
async fn encode(&mut self, record_batches: Vec<ArrowRecordBatch>) -> Result<usize>;

fn set_meta_data_path(&mut self, metadata_path: Option<String>) -> Result<()>;
fn set_meta_data_size(&mut self, size: usize) -> Result<()>;

/// Return encoded bytes
/// Note: trait method cannot receive `self`, so take a &mut self here to
@@ -323,6 +325,17 @@ impl<W: AsyncWrite + Send + Unpin> RecordEncoder for ColumnarRecordEncoder<W> {
Ok(())
}

fn set_meta_data_size(&mut self, size: usize) -> Result<()> {
let size_kv = KeyValue {
key: META_SIZE_KEY.to_string(),
value: Some(size.to_string()),
};
let writer = self.arrow_writer.as_mut().unwrap();
writer.append_key_value_metadata(size_kv);

Ok(())
}

async fn close(&mut self) -> Result<()> {
assert!(self.arrow_writer.is_some());

@@ -369,6 +382,10 @@ impl ParquetEncoder {
self.record_encoder.set_meta_data_path(meta_data_path)
}

pub fn set_meta_data_size(&mut self, size: usize) -> Result<()> {
self.record_encoder.set_meta_data_size(size)
}

pub async fn close(mut self) -> Result<()> {
self.record_encoder.close().await
}
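
`set_meta_data_size` relies on parquet's ability to append key-value pairs to the file footer at any point before the writer is closed, retrievable later via `FileMetaData::key_value_metadata`. A hedged, self-contained sketch of that mechanism with arrow-rs; the schema and file path are illustrative, and the real encoder writes through an async sink rather than a local file:

```rust
use std::{fs::File, sync::Arc};

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::format::KeyValue;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    let file = File::create("example.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    writer.write(&batch)?;

    // Exactly what the encoder does once the metadata file's size is known:
    // the pair lands in the footer when `close` flushes it.
    writer.append_key_value_metadata(KeyValue {
        key: "meta_size".to_string(),
        value: Some(1024.to_string()),
    });
    writer.close()?;
    Ok(())
}
```
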
42 changes: 23 additions & 19 deletions src/analytic_engine/src/sst/parquet/writer.rs
@@ -324,7 +324,7 @@ impl<'a> RecordBatchGroupWriter<'a> {
mut self,
sink: W,
meta_path: &Path,
) -> Result<(usize, ParquetMetaData)> {
) -> Result<(usize, ParquetMetaData, ParquetEncoder)> {
let mut prev_record_batch: Option<FetchedRecordBatch> = None;
let mut arrow_row_group = Vec::new();
let mut total_num_rows = 0;
@@ -401,13 +401,7 @@ impl<'a> RecordBatchGroupWriter<'a> {
.box_err()
.context(EncodeRecordBatch)?;

parquet_encoder
.close()
.await
.box_err()
.context(EncodeRecordBatch)?;

Ok((total_num_rows, parquet_meta_data))
Ok((total_num_rows, parquet_meta_data, parquet_encoder))
}
}

@@ -449,23 +443,22 @@ async fn write_metadata<W>(
mut meta_sink: W,
parquet_metadata: ParquetMetaData,
meta_path: &object_store::Path,
) -> writer::Result<()>
) -> writer::Result<usize>
where
W: AsyncWrite + Send + Unpin,
{
let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
meta_sink
.write_all(buf.as_bytes())
.await
.with_context(|| Io {
file: meta_path.clone(),
})?;
let bytes = buf.as_bytes();
let bytes_size = bytes.len();
meta_sink.write_all(bytes).await.with_context(|| Io {
file: meta_path.clone(),
})?;

meta_sink.shutdown().await.with_context(|| Io {
file: meta_path.clone(),
})?;

Ok(())
Ok(bytes_size)
}

async fn multi_upload_abort(path: &Path, aborter: ObjectStoreMultiUploadAborter<'_>) {
@@ -503,7 +496,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {

let meta_path = Path::from(sst_util::new_metadata_path(self.path.as_ref()));

let (total_num_rows, parquet_metadata) =
let (total_num_rows, parquet_metadata, mut data_encoder) =
match group_writer.write_all(sink, &meta_path).await {
Ok(v) => v,
Err(e) => {
@@ -515,14 +508,25 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {

let (meta_aborter, meta_sink) =
ObjectStoreMultiUploadAborter::initialize_upload(self.store, &meta_path).await?;
match write_metadata(meta_sink, parquet_metadata, &meta_path).await {
let meta_size = match write_metadata(meta_sink, parquet_metadata, &meta_path).await {
Ok(v) => v,
Err(e) => {
multi_upload_abort(self.path, aborter).await;
multi_upload_abort(&meta_path, meta_aborter).await;
return Err(e);
}
}
};

data_encoder
.set_meta_data_size(meta_size)
.box_err()
.context(EncodeRecordBatch)?;

data_encoder
.close()
.await
.box_err()
.context(EncodeRecordBatch)?;

let file_head = self.store.head(self.path).await.context(Storage)?;
Ok(SstInfo {
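
The writer changes reorder the shutdown sequence: `write_all` no longer closes the parquet encoder, because the data file's footer must record the metadata object's size, and that size is only known after `write_metadata` uploads it. A toy model of the new sequence, with all types, names, and values as illustrative stand-ins for the engine's:

```rust
// Footer key-values can be appended at any time before `close`, so the
// group writer now hands back the still-open encoder instead of closing it.
struct Encoder {
    footer_kvs: Vec<(String, String)>,
    closed: bool,
}

impl Encoder {
    fn set_meta_data_size(&mut self, size: usize) {
        // Mirrors the real invariant: appending after close would be a bug.
        assert!(!self.closed, "footer kv must be appended before close");
        self.footer_kvs.push(("meta_size".to_string(), size.to_string()));
    }

    fn close(&mut self) {
        // The footer, including `meta_size`, is only flushed here.
        self.closed = true;
    }
}

fn main() {
    // 1. write_all(..) returns the open encoder alongside rows/metadata.
    let mut encoder = Encoder { footer_kvs: Vec::new(), closed: false };
    // 2. write_metadata(..) uploads the metadata object and reports its size.
    let meta_size = 1024;
    // 3. Only now can the data file's footer record that size.
    encoder.set_meta_data_size(meta_size);
    // 4. Closing flushes the footer; the SST is complete.
    encoder.close();
    assert_eq!(encoder.footer_kvs[0].1, "1024");
}
```
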
