Skip to content

Commit

Permalink
refine the interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Apr 11, 2024
1 parent e824477 commit e1cbfe1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
25 changes: 11 additions & 14 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,27 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {

async fn build(self, config: Self::C) -> Result<Self::R> {
Ok(DataFileWriter {
inner_writer: self.inner.clone().build().await?,
builder: self.inner,
inner_writer: Some(self.inner.clone().build().await?),
partition_value: config.partition_value,
})
}
}

/// A writer that writes data within a single spec/partition.
pub struct DataFileWriter<B: FileWriterBuilder> {
builder: B,
inner_writer: B::R,
inner_writer: Option<B::R>,
partition_value: Struct,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
self.inner_writer.write(&batch).await
self.inner_writer.as_mut().unwrap().write(&batch).await
}

async fn flush(&mut self) -> Result<Vec<DataFile>> {
let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?);
let res = writer
async fn close(&mut self) -> Result<Vec<DataFile>> {
let writer = self.inner_writer.take().unwrap();
Ok(writer
.close()
.await?
.into_iter()
Expand All @@ -90,22 +88,21 @@ impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
})
.collect_vec();
Ok(res)
.collect_vec())
}
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
fn current_file_path(&self) -> String {
self.inner_writer.current_file_path()
self.inner_writer.as_ref().unwrap().current_file_path()
}

fn current_row_num(&self) -> usize {
self.inner_writer.current_row_num()
self.inner_writer.as_ref().unwrap().current_row_num()
}

fn current_written_size(&self) -> usize {
self.inner_writer.current_written_size()
self.inner_writer.as_ref().unwrap().current_written_size()
}
}

Expand Down Expand Up @@ -310,7 +307,7 @@ mod test {
for _ in 0..3 {
// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.flush().await?;
let res = data_file_writer.close().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

Expand Down
7 changes: 5 additions & 2 deletions crates/iceberg/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
/// Write data to iceberg table.
async fn write(&mut self, input: I) -> Result<()>;
/// Flush the writer and return the write result.
async fn flush(&mut self) -> Result<O>;
/// Close the writer and return the written data files.
/// If close fails, the data written before may be lost. The user may need to recreate the writer and rewrite the data.
/// # NOTE
/// After close, whether it succeeds or fails, the writer must never be used again; otherwise the writer will panic.
async fn close(&mut self) -> Result<O>;
}

/// The current file status of an iceberg writer. It is implemented for writers that write a single
Expand Down

0 comments on commit e1cbfe1

Please sign in to comment.