feat: init iceberg writer #275

Merged · 3 commits · Apr 22, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -40,6 +40,7 @@ array-init = "2"
arrow-arith = { version = "51" }
arrow-array = { version = "51" }
arrow-schema = { version = "51" }
arrow-select = { version = "51" }
async-stream = "0.3.5"
async-trait = "0.1"
bimap = "0.6"
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -35,6 +35,7 @@ array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
323 changes: 323 additions & 0 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -0,0 +1,323 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides `DataFileWriter`.

use crate::spec::{DataContentType, DataFile, Struct};
use crate::writer::file_writer::FileWriter;
use crate::writer::CurrentFileStatus;
use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder};
use crate::Result;
use arrow_array::RecordBatch;
use itertools::Itertools;

/// Builder for `DataFileWriter`.
#[derive(Clone)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B) -> Self {
Self { inner }
}
}

/// Config for `DataFileWriter`.
pub struct DataFileWriterConfig {
partition_value: Struct,
}

impl DataFileWriterConfig {
/// Create a new `DataFileWriterConfig` with partition value.
pub fn new(partition_value: Option<Struct>) -> Self {
Self {
partition_value: partition_value.unwrap_or(Struct::empty()),
}
}
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
type R = DataFileWriter<B>;
type C = DataFileWriterConfig;

async fn build(self, config: Self::C) -> Result<Self::R> {
Ok(DataFileWriter {
inner_writer: self.inner.clone().build().await?,
builder: self.inner,
partition_value: config.partition_value,
})
}
}

/// A writer that writes data within one spec/partition.
pub struct DataFileWriter<B: FileWriterBuilder> {
builder: B,
inner_writer: B::R,
partition_value: Struct,
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
self.inner_writer.write(&batch).await
}

async fn flush(&mut self) -> Result<Vec<DataFile>> {
let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?);
Member:

Hi, I'm a bit confused about this. Why do we need this? What will happen if a user calls flush twice on the same file?

Contributor (author):

Because the `FileWriter` only provides the interface `close(self)`, which keeps it simple. To offer the interface `flush(&mut self)`, we create a new `FileWriter` on each flush.

A `FileWriter` may write the data into one or multiple files; there is no restriction on that.

> What will happen if a user calls flush twice on the same file?

It's safe. The semantics of flush are to flush the data written so far into storage and generate the files.
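
A minimal self-contained sketch of the pattern described above (the names here are hypothetical toys, not the real `FileWriter` API): because the inner writer only exposes a consuming close, each flush swaps in a fresh writer and then closes the old one, so calling flush repeatedly just closes whatever was written since the previous flush.

    struct InnerWriter { rows: Vec<i64> }

    impl InnerWriter {
        fn new() -> Self { Self { rows: Vec::new() } }
        fn write(&mut self, row: i64) { self.rows.push(row); }
        // Consuming close, like the close(self) interface: returns the produced "files".
        fn close(self) -> Vec<Vec<i64>> {
            if self.rows.is_empty() { vec![] } else { vec![self.rows] }
        }
    }

    struct ToyDataFileWriter { inner: InnerWriter }

    impl ToyDataFileWriter {
        fn write(&mut self, row: i64) { self.inner.write(row); }
        // Mirrors the std::mem::replace dance in the diff above.
        fn flush(&mut self) -> Vec<Vec<i64>> {
            let old = std::mem::replace(&mut self.inner, InnerWriter::new());
            old.close()
        }
    }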

@Xuanwo (Member, Apr 9, 2024):

I'm thinking about the error handling cases. If the user hits an error during flush, they can't retry it and will lose all existing written data. How about pushing the writer back if the flush fails? The user can then decide whether to abort the whole writing process or retry the flush.

Member:

> I'm thinking about the error handling cases. If the user hits an error during flush, they can't retry it and will lose all existing written data. How about pushing the writer back if the flush fails? The user can then decide whether to abort the whole writing process or retry the flush.

It's fine if you think it's out of the current scope. We can create an issue for this and keep moving forward.

@ZENOTME (Contributor, author, Apr 10, 2024):

> I'm thinking about the error handling cases. If the user hits an error during flush, they can't retry it and will lose all existing written data. How about pushing the writer back if the flush fails? The user can then decide whether to abort the whole writing process or retry the flush.

Thanks! You've reminded me that this approach is not good for error handling.
A flush has two phases:

  1. close the inner writer
  2. create a new writer

Because of `fn close(self)`, we have to swap in a new writer before the close has succeeded. So two error cases may happen:

  1. creating the new writer fails, so we skip closing the original writer
  2. the close fails, but we have already created a new writer

I think there are two solutions (a rough sketch of the second one is below):

  1. use `fn close(&mut self)`
  2. use `Option<Writer>` so that we can take the writer out temporarily
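
A rough sketch of the second option (names are hypothetical, not the real iceberg-rust API): hold the inner writer in an `Option`, take it out for the consuming close, and only build the replacement once the close has succeeded, so a failed close leaves the writer empty rather than half-swapped.

    struct FileHandle { rows: Vec<i64> }

    impl FileHandle {
        fn new() -> Self { Self { rows: Vec::new() } }
        // Consuming close, mirroring fn close(self); may fail.
        fn close(self) -> Result<Vec<Vec<i64>>, String> {
            Ok(vec![self.rows])
        }
    }

    struct OptionFlushWriter { inner: Option<FileHandle> }

    impl OptionFlushWriter {
        fn flush(&mut self) -> Result<Vec<Vec<i64>>, String> {
            // Phase 1: take the current writer out and close it. If this fails,
            // self.inner stays None instead of pointing at a half-swapped writer.
            let files = match self.inner.take() {
                Some(handle) => handle.close()?,
                None => Vec::new(),
            };
            // Phase 2: create the replacement only after the close has succeeded.
            self.inner = Some(FileHandle::new());
            Ok(files)
        }
    }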

@Xuanwo (Member, Apr 10, 2024):

I'd prefer `fn close(&mut self)`, since close might also return an error.

Contributor (author):

> I'm thinking about the error handling cases. If the user hits an error during flush, they can't retry it and will lose all existing written data. How about pushing the writer back if the flush fails? The user can then decide whether to abort the whole writing process or retry the flush.

I find that we can't guarantee that the user can retry the flush. E.g. the close interface for parquet consumes the writer. I think a simpler semantic we can guarantee is: if flush fails, all existing written data is lost, but the writer can be used again. It is not friendly for users, but it's easier to maintain these semantics.
For this semantic, I think `Option<Writer>` and `fn close(self)` may be more appropriate, because it guarantees that a writer will not be used again if the close fails.

Member:

> I think a simpler semantic we can guarantee is: if flush fails, all existing written data is lost, but the writer can be used again. It is not friendly for users, but it's easier to maintain these semantics.

Makes sense. By the way, OpenDAL can retry internally, so this should not be a big issue. Let's keep moving.

let res = writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(DataContentType::Data);
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
})
.collect_vec();
Ok(res)
}
}

impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
fn current_file_path(&self) -> String {
self.inner_writer.current_file_path()
}

fn current_row_num(&self) -> usize {
self.inner_writer.current_row_num()
}

fn current_written_size(&self) -> usize {
self.inner_writer.current_written_size()
}
}

#[cfg(test)]
mod test {
use std::{collections::HashMap, sync::Arc};

use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
use tempfile::TempDir;

use crate::{
io::FileIOBuilder,
spec::DataFileFormat,
writer::{
base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig},
file_writer::{
location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
ParquetWriterBuilder,
},
tests::check_parquet_data_file,
IcebergWriter, IcebergWriterBuilder,
},
};

#[tokio::test]
async fn test_data_file_writer() -> Result<(), anyhow::Error> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

// prepare data
// Int, Struct(Int), String, List(Int), Struct(Struct(Int))
let schema = {
let fields = vec![
arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string(),
)])),
arrow_schema::Field::new(
"col1",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
),
arrow_schema::Field::new(
"col3",
arrow_schema::DataType::List(Arc::new(
arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
)),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new(
"col4",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let col1 = Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
));
let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
"test";
1024
])) as ArrayRef;
let col3 = Arc::new({
let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(
vec![Some(1),]
);
1024
])
.into_parts();
arrow_array::ListArray::new(
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)]))),
list_parts.1,
list_parts.2,
list_parts.3,
)
}) as ArrayRef;
let col4 = Arc::new(StructArray::new(
vec![arrow_schema::Field::new(
"sub_col",
arrow_schema::DataType::Struct(
vec![arrow_schema::Field::new(
"sub_sub_col",
arrow_schema::DataType::Int64,
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)]))]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)]))]
.into(),
vec![Arc::new(StructArray::new(
vec![
arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
]
.into(),
vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
None,
))],
None,
));
let to_write =
RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();

// prepare writer
let pb = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
to_write.schema(),
file_io.clone(),
location_gen,
file_name_gen,
);
let mut data_file_writer = DataFileWriterBuilder::new(pb)
Contributor:

It feels a little bit odd to pass a builder to another builder. Could we do something like this?

DataFileWriter
    .builder()
    // this would call ParquetWriter::builder() to get a `ParquetWriterBuilder`,
    // and then pass ownership of the `DataFileWriter` to the `ParquetWriterBuilder`,
    // returning the `ParquetWriterBuilder`
    .with_writer(ParquetWriter)  
    // these calls happen in the `ParquetWriterBuilder`, 
    // allowing customization of the wrapped concrete writer
    .with_foo()
    .with_bar()
    // this finalizes the `ParquetWriterBuilder`, building a
    // `ParquetWriter`, and returns the `DataFileWriterBuilder`
    // that was passed earlier, after first passing in the `ParquetWriter`
    .build_writer()
    // these calls now happen on the `DataFileWriterBuilder`,
    // allowing further setup of the `DataFileWriter`
    .with_baz()
    .with_quux()
    // finally returns a `DataFileWriter`, or perhaps a
    // `Result<DataFileWriter>` or Future of one of those
    .build() 
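
A self-contained toy sketch of the ownership handoff proposed above (all names are hypothetical, not the real iceberg-rust API): the outer builder hands itself to the inner file-format builder, which returns it once the concrete writer has been built, so the whole thing stays a single fluent chain.

    struct ParquetWriterToy;
    struct DataFileWriterToy { inner: ParquetWriterToy }

    struct DataFileWriterBuilderToy { inner: Option<ParquetWriterToy> }
    struct ParquetWriterBuilderToy { outer: DataFileWriterBuilderToy }

    impl DataFileWriterToy {
        fn builder() -> DataFileWriterBuilderToy { DataFileWriterBuilderToy { inner: None } }
    }

    impl DataFileWriterBuilderToy {
        // Hand ownership of this builder to the inner (file-format) builder.
        fn with_writer(self) -> ParquetWriterBuilderToy { ParquetWriterBuilderToy { outer: self } }
        fn with_baz(self) -> Self { self }
        fn build(self) -> DataFileWriterToy {
            DataFileWriterToy { inner: self.inner.expect("with_writer()/build_writer() must run first") }
        }
    }

    impl ParquetWriterBuilderToy {
        fn with_foo(self) -> Self { self }
        // Build the concrete writer, stash it in the outer builder, and hand the outer builder back.
        fn build_writer(mut self) -> DataFileWriterBuilderToy {
            self.outer.inner = Some(ParquetWriterToy);
            self.outer
        }
    }

    fn example() -> DataFileWriterToy {
        DataFileWriterToy::builder()
            .with_writer()
            .with_foo()
            .build_writer()
            .with_baz()
            .build()
    }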

Contributor (author):

This top-down style looks good to me as a way to avoid passing builders to each other. However, I'm not sure whether this style will incur more complexity in the future.

Actually, the current API can already support this, because it doesn't restrict how the builder is created and used, as in the following example. So I think this is more a question of the implementation style of our builders than of the API design.

#[derive(Clone)]
struct B;

impl B {
    pub fn new() -> Self {
        Self
    }

    pub fn with_config(&mut self, _: ()) -> &mut Self {
        self
    }
}

#[async_trait::async_trait]
impl IcebergWriterBuilder for B {
    type R = BW;
    type C = ();
    async fn build(self, _: Self::C) -> Result<Self::R> {
        Ok(BW)
    }
}

struct BW;

impl BW {
    pub fn builder() -> B {
        B::new()
    }
}

#[async_trait::async_trait]
impl IcebergWriter for BW {
    async fn write(&mut self, _input: DefaultInput) -> Result<()> {
        Ok(())
    }

    async fn flush(&mut self) -> Result<DefaultOutput> {
        Ok(vec![])
    }
}

#[derive(Clone)]
struct A<I> {
    inner: Option<I>,
}

impl<I: IcebergWriterBuilder> A<I> {
    pub fn new() -> Self {
        Self { inner: None }
    }

    pub fn with_builder(&mut self, builder: I) -> &mut I {
        self.inner = Some(builder);
        self.inner.as_mut().unwrap()
    }

    pub fn with_config(&mut self, _: ()) -> &mut Self {
        self
    }
}

struct AW;

impl AW {
    pub fn builder<I:IcebergWriterBuilder>() -> A<I> {
        A::<I>::new()
    }
}

#[async_trait::async_trait]
impl<I: IcebergWriterBuilder> IcebergWriterBuilder for A<I> {
    type R = AW;
    type C = ();
    async fn build(self, _: Self::C) -> Result<Self::R> {
        Ok(AW)
    }
}

#[async_trait::async_trait]
impl IcebergWriter for AW {
    async fn write(&mut self, _input: DefaultInput) -> Result<()> {
        Ok(())
    }

    async fn flush(&mut self) -> Result<DefaultOutput> {
        Ok(vec![])
    }
}

async fn test() {
    let mut a = AW::builder();
    a
        // config first A
        .with_config(())
        .with_builder(AW::builder())
        // config second A
        .with_config(())
        .with_builder(BW::builder())
        // config BW
        .with_config(());
    let writer = a.build(()).await;
}

Contributor (author):

Both ways look good to me. cc @Fokko @Xuanwo @liurenjie1024

Member:

I remember that we have discussed this before...

Contributor (author):

Do you mean this: #135 (comment)? Indeed, they look similar. 🥵

Member:

Maybe we can move on first, since both ways look good?

Contributor (author):

LGTM.

.build(DataFileWriterConfig::new(None))
.await?;

for _ in 0..3 {
// write
data_file_writer.write(to_write.clone()).await?;
let res = data_file_writer.flush().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

// check
check_parquet_data_file(&file_io, &data_file, &to_write).await;
}

Ok(())
}
}
20 changes: 20 additions & 0 deletions crates/iceberg/src/writer/base_writer/mod.rs
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Base writer module contains the basic writers provided by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.

pub mod data_file_writer;
6 changes: 4 additions & 2 deletions crates/iceberg/src/writer/file_writer/mod.rs
@@ -17,8 +17,8 @@

//! This module contains the writers for the data file formats supported by iceberg: parquet, orc.

use super::{CurrentFileStatus, DefaultOutput};
use crate::Result;
use super::CurrentFileStatus;
use crate::{spec::DataFileBuilder, Result};
use arrow_array::RecordBatch;
use futures::Future;

@@ -28,6 +28,8 @@ mod track_writer;

pub mod location_generator;

type DefaultOutput = Vec<DataFileBuilder>;

/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
/// The associated file writer type.