From b0b63a3569683e9aa392de323dcc38ccaeb87331 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 11 Apr 2024 13:40:25 +0800 Subject: [PATCH] refine the interface --- .../writer/base_writer/data_file_writer.rs | 39 ++++++++----------- crates/iceberg/src/writer/mod.rs | 16 ++++++-- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index 9090baff7..442c9f164 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -59,8 +59,7 @@ impl IcebergWriterBuilder for DataFileWriterBuilder { async fn build(self, config: Self::C) -> Result { Ok(DataFileWriter { - inner_writer: self.inner.clone().build().await?, - builder: self.inner, + inner_writer: Some(self.inner.clone().build().await?), partition_value: config.partition_value, }) } @@ -68,20 +67,19 @@ impl IcebergWriterBuilder for DataFileWriterBuilder { /// A writer write data is within one spec/partition. pub struct DataFileWriter { - builder: B, - inner_writer: B::R, + inner_writer: Option, partition_value: Struct, } #[async_trait::async_trait] impl IcebergWriter for DataFileWriter { async fn write(&mut self, batch: RecordBatch) -> Result<()> { - self.inner_writer.write(&batch).await + self.inner_writer.as_mut().unwrap().write(&batch).await } - async fn flush(&mut self) -> Result> { - let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?); - let res = writer + async fn close(&mut self) -> Result> { + let writer = self.inner_writer.take().unwrap(); + Ok(writer .close() .await? .into_iter() @@ -90,22 +88,21 @@ impl IcebergWriter for DataFileWriter { res.partition(self.partition_value.clone()); res.build().expect("Guaranteed to be valid") }) - .collect_vec(); - Ok(res) + .collect_vec()) } } impl CurrentFileStatus for DataFileWriter { fn current_file_path(&self) -> String { - self.inner_writer.current_file_path() + self.inner_writer.as_ref().unwrap().current_file_path() } fn current_row_num(&self) -> usize { - self.inner_writer.current_row_num() + self.inner_writer.as_ref().unwrap().current_row_num() } fn current_written_size(&self) -> usize { - self.inner_writer.current_written_size() + self.inner_writer.as_ref().unwrap().current_written_size() } } @@ -307,16 +304,14 @@ mod test { .build(DataFileWriterConfig::new(None)) .await?; - for _ in 0..3 { - // write - data_file_writer.write(to_write.clone()).await?; - let res = data_file_writer.flush().await?; - assert_eq!(res.len(), 1); - let data_file = res.into_iter().next().unwrap(); + // write + data_file_writer.write(to_write.clone()).await?; + let res = data_file_writer.close().await?; + assert_eq!(res.len(), 1); + let data_file = res.into_iter().next().unwrap(); - // check - check_parquet_data_file(&file_io, &data_file, &to_write).await; - } + // check + check_parquet_data_file(&file_io, &data_file, &to_write).await; Ok(()) } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 499e914e7..7618d2ec3 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -72,8 +72,11 @@ pub trait IcebergWriterBuilder: pub trait IcebergWriter: Send + 'static { /// Write data to iceberg table. async fn write(&mut self, input: I) -> Result<()>; - /// Flush the writer and return the write result. - async fn flush(&mut self) -> Result; + /// Close the writer and return the written data files. + /// If close failed, the data written before maybe be lost. User may need to recreate the writer and rewrite the data again. + /// # NOTE + /// After close, no matter successfully or fail,the writer should never be used again, otherwise the writer will panic. + async fn close(&mut self) -> Result; } /// The current file status of iceberg writer. It implement for the writer which write a single @@ -90,6 +93,7 @@ pub trait CurrentFileStatus { #[cfg(test)] mod tests { use arrow_array::RecordBatch; + use arrow_schema::Schema; use arrow_select::concat::concat_batches; use bytes::Bytes; use futures::AsyncReadExt; @@ -102,7 +106,13 @@ mod tests { use super::IcebergWriter; - fn _guarantee_object_safe(_: &dyn IcebergWriter) {} + // This function is used to guarantee the trait can be used as a object safe trait. + async fn _guarantee_object_safe(mut w: Box) { + let _ = w + .write(RecordBatch::new_empty(Schema::empty().into())) + .await; + let _ = w.close().await; + } // This function check: // The data of the written parquet file is correct.