Skip to content

Commit

Permalink
refine the interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Apr 11, 2024
1 parent e824477 commit b0b63a3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
39 changes: 17 additions & 22 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,27 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {

async fn build(self, config: Self::C) -> Result<Self::R> {
Ok(DataFileWriter {
inner_writer: self.inner.clone().build().await?,
builder: self.inner,
inner_writer: Some(self.inner.clone().build().await?),
partition_value: config.partition_value,
})
}
}

/// A writer that writes data within a single spec/partition.
pub struct DataFileWriter<B: FileWriterBuilder> {
builder: B,
inner_writer: B::R,
inner_writer: Option<B::R>,
partition_value: Struct,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
self.inner_writer.write(&batch).await
self.inner_writer.as_mut().unwrap().write(&batch).await
}

async fn flush(&mut self) -> Result<Vec<DataFile>> {
let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?);
let res = writer
async fn close(&mut self) -> Result<Vec<DataFile>> {
let writer = self.inner_writer.take().unwrap();
Ok(writer
.close()
.await?
.into_iter()
Expand All @@ -90,22 +88,21 @@ impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
})
.collect_vec();
Ok(res)
.collect_vec())
}
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
fn current_file_path(&self) -> String {
self.inner_writer.current_file_path()
self.inner_writer.as_ref().unwrap().current_file_path()
}

fn current_row_num(&self) -> usize {
self.inner_writer.current_row_num()
self.inner_writer.as_ref().unwrap().current_row_num()
}

fn current_written_size(&self) -> usize {
self.inner_writer.current_written_size()
self.inner_writer.as_ref().unwrap().current_written_size()
}
}

Expand Down Expand Up @@ -307,16 +304,14 @@ mod test {
.build(DataFileWriterConfig::new(None))
.await?;

for _ in 0..3 {
// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.flush().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();
// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.close().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

// check
check_parquet_data_file(&file_io, &data_file, &to_write).await;
}
// check
check_parquet_data_file(&file_io, &data_file, &to_write).await;

Ok(())
}
Expand Down
16 changes: 13 additions & 3 deletions crates/iceberg/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
/// Write data to iceberg table.
async fn write(&mut self, input: I) -> Result<()>;
/// Flush the writer and return the write result.
async fn flush(&mut self) -> Result<O>;
/// Close the writer and return the written data files.
/// If close fails, the data written so far may be lost. The user may need to recreate the writer and write the data again.
/// # NOTE
/// After close, whether it succeeded or failed, the writer must never be used again; otherwise the writer will panic.
async fn close(&mut self) -> Result<O>;
}

/// The current file status of an Iceberg writer. It is implemented for writers that write a single
Expand All @@ -90,6 +93,7 @@ pub trait CurrentFileStatus {
#[cfg(test)]
mod tests {
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use arrow_select::concat::concat_batches;
use bytes::Bytes;
use futures::AsyncReadExt;
Expand All @@ -102,7 +106,13 @@ mod tests {

use super::IcebergWriter;

fn _guarantee_object_safe(_: &dyn IcebergWriter) {}
// This function guarantees that the trait is object safe, i.e. usable as a trait object.
async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
    // Exercises both trait methods through a boxed trait object; the results
    // are deliberately discarded.
    let empty_batch = RecordBatch::new_empty(Schema::empty().into());
    let _ = w.write(empty_batch).await;
    let _ = w.close().await;
}

// This function check:
// The data of the written parquet file is correct.
Expand Down

0 comments on commit b0b63a3

Please sign in to comment.