Skip to content

Commit

Permalink
fix: skip empty parquet (#1236)
Browse files Browse the repository at this point in the history
* fix: return None if the parquet file does not contain any rows

* fix: skip empty parquet file

* chore: add doc

* rebase develop

* fix: use flatten instead of filter_map with identity
  • Loading branch information
v0y4g3r authored Mar 26, 2023
1 parent 77f9383 commit 6f81717
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 43 deletions.
36 changes: 19 additions & 17 deletions src/storage/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ impl<S: LogStore> CompactionTaskImpl<S> {
});
}

let outputs = futures::future::join_all(futs)
.await
let outputs = futures::future::try_join_all(futs)
.await?
.into_iter()
.collect::<Result<_>>()?;
.flatten()
.collect();
let inputs = compacted_inputs.into_iter().collect();
Ok((outputs, inputs))
}
Expand Down Expand Up @@ -162,7 +163,7 @@ impl CompactionOutput {
region_id: RegionId,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
) -> Result<FileMeta> {
) -> Result<Option<FileMeta>> {
let reader = build_sst_reader(
schema,
sst_layer.clone(),
Expand All @@ -175,20 +176,21 @@ impl CompactionOutput {
let output_file_id = FileId::random();
let opts = WriteOptions {};

let SstInfo {
time_range,
file_size,
} = sst_layer
Ok(sst_layer
.write_sst(output_file_id, Source::Reader(reader), &opts)
.await?;

Ok(FileMeta {
region_id,
file_id: output_file_id,
time_range,
level: self.output_level,
file_size,
})
.await?
.map(
|SstInfo {
time_range,
file_size,
}| FileMeta {
region_id,
file_id: output_file_id,
time_range,
level: self.output_level,
file_size,
},
))
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/compaction/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();
let handle = FileHandle::new(
FileMeta {
Expand Down Expand Up @@ -415,6 +416,7 @@ mod tests {
)
.write_sst(&opts)
.await
.unwrap()
.unwrap();
assert_eq!(
Some((
Expand All @@ -431,6 +433,7 @@ mod tests {
)
.write_sst(&opts)
.await
.unwrap()
.unwrap();
assert_eq!(
Some((
Expand All @@ -447,6 +450,7 @@ mod tests {
)
.write_sst(&opts)
.await
.unwrap()
.unwrap();

assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ mod tests {
let sst_info = layer
.write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {})
.await
.unwrap()
.unwrap();

(
Expand Down
34 changes: 17 additions & 17 deletions src/storage/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,28 +202,28 @@ impl<S: LogStore> FlushJob<S> {
let sst_layer = self.sst_layer.clone();

futures.push(async move {
let SstInfo {
time_range,
file_size,
} = sst_layer
Ok(sst_layer
.write_sst(file_id, Source::Iter(iter), &WriteOptions::default())
.await?;

Ok(FileMeta {
region_id,
file_id,
time_range,
level: 0,
file_size,
})
.await?
.map(
|SstInfo {
time_range,
file_size,
}| FileMeta {
region_id,
file_id,
time_range,
level: 0,
file_size,
},
))
});
}

let metas = futures_util::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?
let metas = futures_util::future::try_join_all(futures)
.await?
.into_iter()
.flatten()
.collect();

logging::info!("Successfully flush memtables to files: {:?}", metas);
Expand Down
9 changes: 5 additions & 4 deletions src/storage/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl Drop for FileHandleInner {
);
}
Err(e) => {
error!(e; "Failed to schedule SST purge task, region: {}, name: {}",
error!(e; "Failed to schedule SST purge task, region: {}, name: {}",
self.meta.region_id, self.meta.file_id.as_parquet());
}
}
Expand Down Expand Up @@ -402,13 +402,14 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
/// Returns the sst file path.
fn sst_file_path(&self, file_name: &str) -> String;

/// Writes SST file with given `file_id`.
/// Writes SST file with given `file_id` and returns the SST info.
/// If source does not contain any data, `write_sst` will return `Ok(None)`.
async fn write_sst(
&self,
file_id: FileId,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo>;
) -> Result<Option<SstInfo>>;

/// Read SST file with given `file_handle` and schema.
async fn read_sst(
Expand Down Expand Up @@ -478,7 +479,7 @@ impl AccessLayer for FsAccessLayer {
file_id: FileId,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo> {
) -> Result<Option<SstInfo>> {
// Now we only supports parquet format. We may allow caller to specific SST format in
// WriteOptions in the future.
let file_path = self.sst_file_path(&file_id.as_parquet());
Expand Down
49 changes: 45 additions & 4 deletions src/storage/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,17 @@ impl<'a> ParquetWriter<'a> {
}
}

pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<SstInfo> {
pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<Option<SstInfo>> {
self.write_rows(None).await
}

/// Iterates memtable and writes rows to Parquet file.
/// A chunk of records yielded from each iteration with a size given
/// in config will be written to a single row group.
async fn write_rows(mut self, extra_meta: Option<HashMap<String, String>>) -> Result<SstInfo> {
async fn write_rows(
mut self,
extra_meta: Option<HashMap<String, String>>,
) -> Result<Option<SstInfo>> {
let projected_schema = self.source.projected_schema();
let store_schema = projected_schema.schema_to_read();
let schema = store_schema.arrow_schema().clone();
Expand All @@ -106,6 +109,7 @@ impl<'a> ParquetWriter<'a> {
let mut arrow_writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;

let mut batches_written = 0;
while let Some(batch) = self.source.next_batch().await? {
let arrow_batch = RecordBatch::try_new(
schema.clone(),
Expand All @@ -119,8 +123,13 @@ impl<'a> ParquetWriter<'a> {
arrow_writer
.write(&arrow_batch)
.context(WriteParquetSnafu)?;
batches_written += 1;
}

if batches_written == 0 {
// if the source does not contain any batch, we skip writing an empty parquet file.
return Ok(None);
}
let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;

let time_range = decode_timestamp_range(&file_meta, store_schema)
Expand All @@ -137,10 +146,10 @@ impl<'a> ParquetWriter<'a> {
path: object.path(),
})?
.content_length();
Ok(SstInfo {
Ok(Some(SstInfo {
time_range,
file_size,
})
}))
}
}

Expand Down Expand Up @@ -210,6 +219,13 @@ fn decode_timestamp_range_inner(
end = end.max(max);
}

assert!(
start <= end,
"Illegal timestamp range decoded from SST file {:?}, start: {}, end: {}",
file_meta,
start,
end
);
Ok(Some((
Timestamp::new(start, unit),
Timestamp::new(end, unit),
Expand Down Expand Up @@ -682,6 +698,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();

assert_eq!(
Expand Down Expand Up @@ -779,6 +796,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();

assert_eq!(
Expand Down Expand Up @@ -898,6 +916,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();

assert_eq!(
Expand Down Expand Up @@ -971,6 +990,28 @@ mod tests {
)
}

#[tokio::test]
async fn test_write_empty_file() {
common_telemetry::init_default_ut_logging();
let schema = memtable_tests::schema_for_test();
let memtable = DefaultMemtableBuilder::default().build(schema.clone());

let dir = create_temp_dir("read-parquet-by-range");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());

let sst_info_opt = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap();

assert!(sst_info_opt.is_none());
}

#[test]
fn test_time_unit_lossy() {
// converting a range with unit second to millisecond will not cause rounding error
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/test_util/access_layer_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl AccessLayer for MockAccessLayer {
_file_id: FileId,
_source: Source,
_opts: &WriteOptions,
) -> crate::error::Result<SstInfo> {
) -> crate::error::Result<Option<SstInfo>> {
unimplemented!()
}

Expand Down

0 comments on commit 6f81717

Please sign in to comment.