From 4407bc157f58c09be7ac1b461531484b8e311f96 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sat, 16 Mar 2024 14:02:24 +0800 Subject: [PATCH 1/3] init iceberg writer --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + .../writer/base_writer/data_file_writer.rs | 309 ++++++++++++++++++ crates/iceberg/src/writer/base_writer/mod.rs | 20 ++ .../src/writer/file_writer/parquet_writer.rs | 108 +----- crates/iceberg/src/writer/mod.rs | 154 ++++++++- 6 files changed, 489 insertions(+), 104 deletions(-) create mode 100644 crates/iceberg/src/writer/base_writer/data_file_writer.rs create mode 100644 crates/iceberg/src/writer/base_writer/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 7da16e00d..11a1bd3df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ array-init = "2" arrow-arith = { version = "51" } arrow-array = { version = "51" } arrow-schema = { version = "51" } +arrow-select = { version = "51" } async-stream = "0.3.5" async-trait = "0.1" bimap = "0.6" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 5aea856fe..46f167b7a 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -35,6 +35,7 @@ array-init = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } bimap = { workspace = true } diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs new file mode 100644 index 000000000..491b122d4 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provide `DataFileWriter`. + +use crate::spec::{DataContentType, DataFileBuilder}; +use crate::writer::file_writer::FileWriter; +use crate::writer::CurrentFileStatus; +use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder}; +use crate::Result; +use arrow_array::RecordBatch; +use itertools::Itertools; + +/// Builder for `DataFileWriter`. +#[derive(Clone)] +pub struct DataFileWriterBuilder { + inner: B, +} + +impl DataFileWriterBuilder { + /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B) -> Self { + Self { inner } + } +} + +#[allow(async_fn_in_trait)] +impl IcebergWriterBuilder for DataFileWriterBuilder { + type R = DataFileWriter; + + async fn build(self) -> Result { + Ok(DataFileWriter { + inner_writer: self.inner.clone().build().await?, + builder: self.inner, + }) + } +} + +/// A writer write data is within one spec/partition. 
+pub struct DataFileWriter { + builder: B, + inner_writer: B::R, +} + +#[async_trait::async_trait] +impl IcebergWriter for DataFileWriter { + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + self.inner_writer.write(&batch).await + } + + async fn flush(&mut self) -> Result> { + let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?); + let res = writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(DataContentType::Data); + res + }) + .collect_vec(); + Ok(res) + } +} + +impl CurrentFileStatus for DataFileWriter { + fn current_file_path(&self) -> String { + self.inner_writer.current_file_path() + } + + fn current_row_num(&self) -> usize { + self.inner_writer.current_row_num() + } + + fn current_written_size(&self) -> usize { + self.inner_writer.current_written_size() + } +} + +#[cfg(test)] +mod test { + use std::{collections::HashMap, sync::Arc}; + + use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray}; + use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties}; + use tempfile::TempDir; + + use crate::{ + io::FileIOBuilder, + spec::{DataFileFormat, Struct}, + writer::{ + base_writer::data_file_writer::DataFileWriterBuilder, + file_writer::{ + location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, + ParquetWriterBuilder, + }, + tests::check_parquet_data_file, + IcebergWriter, IcebergWriterBuilder, + }, + }; + + #[tokio::test] + async fn test_data_file_writer() -> Result<(), anyhow::Error> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // prepare data + // Int, Struct(Int), String, List(Int), Struct(Struct(Int)) + let schema = { + let fields = vec![ + arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + arrow_schema::Field::new( + "col1", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + arrow_schema::Field::new( + "col3", + arrow_schema::DataType::List(Arc::new( + arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + arrow_schema::Field::new( + "col4", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + 
PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + let col1 = Arc::new(StructArray::new( + vec![ + arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )])), + ] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + )); + let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![ + "test"; + 1024 + ])) as ArrayRef; + let col3 = Arc::new({ + let list_parts = arrow_array::ListArray::from_iter_primitive::(vec![ + Some( + vec![Some(1),] + ); + 1024 + ]) + .into_parts(); + arrow_array::ListArray::new( + Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))), + list_parts.1, + list_parts.2, + list_parts.3, + ) + }) as ArrayRef; + let col4 = Arc::new(StructArray::new( + vec![arrow_schema::Field::new( + "sub_col", + arrow_schema::DataType::Struct( + vec![arrow_schema::Field::new( + "sub_sub_col", + arrow_schema::DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] + .into(), + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )]))] + .into(), + vec![Arc::new(StructArray::new( + vec![ + arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "-1".to_string(), + )])), + ] + .into(), + vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))], + None, + ))], + None, + )); + let to_write = + RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap(); + + // prepare writer + let pb = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + to_write.schema(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut data_file_writer = DataFileWriterBuilder::new(pb).build().await?; + + for _ in 0..3 { + // write + data_file_writer.write(to_write.clone()).await?; + let res = data_file_writer.flush().await?; + assert_eq!(res.len(), 1); + let data_file = res + .into_iter() + .next() + .unwrap() + .partition(Struct::empty()) + .build() + .unwrap(); + + // check + check_parquet_data_file(&file_io, &data_file, &to_write).await; + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs new file mode 100644 index 000000000..37da2ab81 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Base writer module contains the basic writer provide by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`. + +pub mod data_file_writer; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 3ec1a1b14..95efaa69c 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -256,9 +256,7 @@ mod tests { use arrow_array::Int64Array; use arrow_array::RecordBatch; use arrow_array::StructArray; - use bytes::Bytes; - use futures::AsyncReadExt; - use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use arrow_select::concat::concat_batches; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use tempfile::TempDir; @@ -267,6 +265,7 @@ mod tests { use crate::spec::Struct; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::tests::check_parquet_data_file; #[derive(Clone)] struct TestLocationGen; @@ -318,53 +317,9 @@ mod tests { .build() .unwrap(); - // read the written file - let mut input_file = file_io - .new_input(data_file.file_path.clone()) - .unwrap() - .reader() - .await - .unwrap(); - let mut res = vec![]; - let file_size = input_file.read_to_end(&mut res).await.unwrap(); - let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); - let metadata = reader_builder.metadata().clone(); - - // check data - let mut reader = reader_builder.build().unwrap(); - let res = reader.next().unwrap().unwrap(); - assert_eq!(to_write, res); - let res = reader.next().unwrap().unwrap(); - assert_eq!(to_write_null, res); - - // check metadata - assert_eq!(metadata.num_row_groups(), 1); - assert_eq!(metadata.row_group(0).num_columns(), 1); - assert_eq!(data_file.file_format, DataFileFormat::Parquet); - assert_eq!( - data_file.record_count, - metadata - .row_groups() - .iter() - .map(|group| group.num_rows()) - .sum::() as u64 - ); - assert_eq!(data_file.file_size_in_bytes, file_size as u64); - assert_eq!(data_file.column_sizes.len(), 1); - assert_eq!( - *data_file.column_sizes.get(&0).unwrap(), - metadata.row_group(0).column(0).compressed_size() as u64 - ); - assert_eq!(data_file.value_counts.len(), 1); - assert_eq!(*data_file.value_counts.get(&0).unwrap(), 2048); - assert_eq!(data_file.null_value_counts.len(), 1); - assert_eq!(*data_file.null_value_counts.get(&0).unwrap(), 1024); - assert_eq!(data_file.key_metadata.len(), 0); - assert_eq!(data_file.split_offsets.len(), 1); - assert_eq!( - *data_file.split_offsets.first().unwrap(), - metadata.row_group(0).file_offset().unwrap() - ); + // check the written file + let expect_batch = concat_batches(&schema, vec![&to_write, &to_write_null]).unwrap(); + check_parquet_data_file(&file_io, &data_file, &expect_batch).await; Ok(()) } @@ -556,57 +511,8 @@ mod tests { .build() .unwrap(); - // read the written file - let mut input_file = file_io - .new_input(data_file.file_path.clone()) - .unwrap() - 
.reader() - .await - .unwrap(); - let mut res = vec![]; - let file_size = input_file.read_to_end(&mut res).await.unwrap(); - let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); - let metadata = reader_builder.metadata().clone(); - - // check data - let mut reader = reader_builder.build().unwrap(); - let res = reader.next().unwrap().unwrap(); - assert_eq!(to_write, res); - - // check metadata - assert_eq!(metadata.num_row_groups(), 1); - assert_eq!(metadata.row_group(0).num_columns(), 5); - assert_eq!(data_file.file_format, DataFileFormat::Parquet); - assert_eq!( - data_file.record_count, - metadata - .row_groups() - .iter() - .map(|group| group.num_rows()) - .sum::() as u64 - ); - assert_eq!(data_file.file_size_in_bytes, file_size as u64); - assert_eq!(data_file.column_sizes.len(), 5); - assert_eq!( - *data_file.column_sizes.get(&0).unwrap(), - metadata.row_group(0).column(0).compressed_size() as u64 - ); - assert_eq!(data_file.value_counts.len(), 5); - data_file - .value_counts - .iter() - .for_each(|(_, v)| assert_eq!(*v, 1024)); - assert_eq!(data_file.null_value_counts.len(), 5); - data_file - .null_value_counts - .iter() - .for_each(|(_, v)| assert_eq!(*v, 0)); - assert_eq!(data_file.key_metadata.len(), 0); - assert_eq!(data_file.split_offsets.len(), 1); - assert_eq!( - *data_file.split_offsets.first().unwrap(), - metadata.row_group(0).file_offset().unwrap() - ); + // check the written file + check_parquet_data_file(&file_io, &data_file, &to_write).await; Ok(()) } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index ac79d7bd4..f28b57c7a 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -15,14 +15,65 @@ // specific language governing permissions and limitations // under the License. -//! The iceberg writer module. - -use crate::spec::DataFileBuilder; +//! Iceberg writer module. +//! +//! The writer API is designed to be extensible and flexible. Each writer is decoupled and can be create and config independently. User can: +//! 1.Customize the writer using the writer trait. +//! 2.Combine different writer to build a writer which have complex write logic. +//! +//! There are two kinds of writer: +//! 1. FileWriter: Focus on writing record batch to different physical file format.(Such as parquet. orc) +//! 2. IcebergWriter: Focus on the logical format of iceberg table. It will write the data using the FileWriter finally. +//! +//! # Simple example for data file writer: +//! ```ignore +//! // Create a parquet file writer builder. The parameter can get from table. +//! let file_writer_builder = ParquetWriterBuilder::new( +//! 0, +//! WriterProperties::builder().build(), +//! schema, +//! file_io.clone(), +//! loccation_gen, +//! file_name_gen, +//! ) +//! // Create a data file writer using parquet file writer builder. +//! let data_file_builder = DataFileBuilder::new(file_writer_builder); +//! // Build the data file writer. +//! let data_file_writer = data_file_builder.build().await.unwrap(); +//! +//! data_file_writer.write(&record_batch).await.unwrap(); +//! let data_files = data_file_writer.flush().await.unwrap(); +//! ``` +pub mod base_writer; pub mod file_writer; +use crate::{spec::DataFileBuilder, Result}; +use arrow_array::RecordBatch; + +type DefaultInput = RecordBatch; type DefaultOutput = Vec; +/// The builder for iceberg writer. +#[allow(async_fn_in_trait)] +pub trait IcebergWriterBuilder: + Send + Clone + 'static +{ + /// The associated writer type. 
+ type R: IcebergWriter; + /// Build the iceberg writer. + async fn build(self) -> Result; +} + +/// The iceberg writer used to write data to iceberg table. +#[async_trait::async_trait] +pub trait IcebergWriter: Send + 'static { + /// Write data to iceberg table. + async fn write(&mut self, input: I) -> Result<()>; + /// Flush the writer and return the write result. + async fn flush(&mut self) -> Result; +} + /// The current file status of iceberg writer. It implement for the writer which write a single /// file. pub trait CurrentFileStatus { @@ -33,3 +84,100 @@ pub trait CurrentFileStatus { /// Get the current file written size. fn current_written_size(&self) -> usize; } + +#[cfg(test)] +mod tests { + use arrow_array::RecordBatch; + use arrow_select::concat::concat_batches; + use bytes::Bytes; + use futures::AsyncReadExt; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + use crate::{ + io::FileIO, + spec::{DataFile, DataFileFormat}, + }; + + use super::IcebergWriter; + + fn _guarantee_object_safe(_: &dyn IcebergWriter) {} + + // This function check: + // The data of the written parquet file is correct. + // The metadata of the data file is consistent with the written parquet file. + pub(crate) async fn check_parquet_data_file( + file_io: &FileIO, + data_file: &DataFile, + batch: &RecordBatch, + ) { + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + + // read the written file + let mut input_file = file_io + .new_input(data_file.file_path.clone()) + .unwrap() + .reader() + .await + .unwrap(); + let mut res = vec![]; + let file_size = input_file.read_to_end(&mut res).await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap(); + let metadata = reader_builder.metadata().clone(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(*batch, res); + + // check metadata + let expect_column_num = batch.num_columns(); + + assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + + assert_eq!(data_file.file_size_in_bytes, file_size as u64); + + assert_eq!(data_file.column_sizes.len(), expect_column_num); + data_file.column_sizes.iter().for_each(|(&k, &v)| { + let expect = metadata + .row_groups() + .iter() + .map(|group| group.column(k as usize).compressed_size()) + .sum::() as u64; + assert_eq!(v, expect); + }); + + assert_eq!(data_file.value_counts.len(), expect_column_num); + data_file.value_counts.iter().for_each(|(_, &v)| { + let expect = metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64; + assert_eq!(v, expect); + }); + + assert_eq!(data_file.null_value_counts.len(), expect_column_num); + data_file.null_value_counts.iter().for_each(|(&k, &v)| { + let expect = batch.column(k as usize).null_count() as u64; + assert_eq!(v, expect); + }); + + assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups()); + data_file + .split_offsets + .iter() + .enumerate() + .for_each(|(i, &v)| { + let expect = metadata.row_groups()[i].file_offset().unwrap(); + assert_eq!(v, expect); + }); + } +} From e8244770c2a35c5f7b7ea0c243c281655ed993fd Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 22 Mar 2024 19:45:01 +0800 Subject: [PATCH 2/3] refine --- .../writer/base_writer/data_file_writer.rs | 62 ++++++++++++------- crates/iceberg/src/writer/file_writer/mod.rs | 6 
+- .../src/writer/file_writer/parquet_writer.rs | 18 +++--- crates/iceberg/src/writer/mod.rs | 10 +-- 4 files changed, 57 insertions(+), 39 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index 491b122d4..9090baff7 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -17,7 +17,7 @@ //! This module provide `DataFileWriter`. -use crate::spec::{DataContentType, DataFileBuilder}; +use crate::spec::{DataContentType, DataFile, Struct}; use crate::writer::file_writer::FileWriter; use crate::writer::CurrentFileStatus; use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder}; @@ -38,14 +38,30 @@ impl DataFileWriterBuilder { } } -#[allow(async_fn_in_trait)] +/// Config for `DataFileWriter`. +pub struct DataFileWriterConfig { + partition_value: Struct, +} + +impl DataFileWriterConfig { + /// Create a new `DataFileWriterConfig` with partition value. + pub fn new(partition_value: Option) -> Self { + Self { + partition_value: partition_value.unwrap_or(Struct::empty()), + } + } +} + +#[async_trait::async_trait] impl IcebergWriterBuilder for DataFileWriterBuilder { type R = DataFileWriter; + type C = DataFileWriterConfig; - async fn build(self) -> Result { + async fn build(self, config: Self::C) -> Result { Ok(DataFileWriter { inner_writer: self.inner.clone().build().await?, builder: self.inner, + partition_value: config.partition_value, }) } } @@ -54,6 +70,7 @@ impl IcebergWriterBuilder for DataFileWriterBuilder { pub struct DataFileWriter { builder: B, inner_writer: B::R, + partition_value: Struct, } #[async_trait::async_trait] @@ -62,7 +79,7 @@ impl IcebergWriter for DataFileWriter { self.inner_writer.write(&batch).await } - async fn flush(&mut self) -> Result> { + async fn flush(&mut self) -> Result> { let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?); let res = writer .close() @@ -70,7 +87,8 @@ impl IcebergWriter for DataFileWriter { .into_iter() .map(|mut res| { res.content(DataContentType::Data); - res + res.partition(self.partition_value.clone()); + res.build().expect("Guaranteed to be valid") }) .collect_vec(); Ok(res) @@ -101,9 +119,9 @@ mod test { use crate::{ io::FileIOBuilder, - spec::{DataFileFormat, Struct}, + spec::DataFileFormat, writer::{ - base_writer::data_file_writer::DataFileWriterBuilder, + base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}, file_writer::{ location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, ParquetWriterBuilder, @@ -141,7 +159,7 @@ mod test { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "5".to_string(), )]))] .into(), ), @@ -160,7 +178,7 @@ mod test { arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "6".to_string(), )])), )), true, @@ -182,7 +200,7 @@ mod test { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "7".to_string(), )]))] .into(), ), @@ -190,7 +208,7 @@ mod test { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "8".to_string(), )]))] .into(), ), @@ -209,7 +227,7 @@ mod test { arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) .with_metadata(HashMap::from([( 
PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "5".to_string(), )])), ] .into(), @@ -231,7 +249,7 @@ mod test { arrow_array::ListArray::new( Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "6".to_string(), )]))), list_parts.1, list_parts.2, @@ -249,7 +267,7 @@ mod test { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "7".to_string(), )]))] .into(), ), @@ -257,7 +275,7 @@ mod test { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "8".to_string(), )]))] .into(), vec![Arc::new(StructArray::new( @@ -265,7 +283,7 @@ mod test { arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "7".to_string(), )])), ] .into(), @@ -285,20 +303,16 @@ mod test { location_gen, file_name_gen, ); - let mut data_file_writer = DataFileWriterBuilder::new(pb).build().await?; + let mut data_file_writer = DataFileWriterBuilder::new(pb) + .build(DataFileWriterConfig::new(None)) + .await?; for _ in 0..3 { // write data_file_writer.write(to_write.clone()).await?; let res = data_file_writer.flush().await?; assert_eq!(res.len(), 1); - let data_file = res - .into_iter() - .next() - .unwrap() - .partition(Struct::empty()) - .build() - .unwrap(); + let data_file = res.into_iter().next().unwrap(); // check check_parquet_data_file(&file_io, &data_file, &to_write).await; diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index f2848f4d4..0340df681 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -17,8 +17,8 @@ //! This module contains the writer for data file format supported by iceberg: parquet, orc. -use super::{CurrentFileStatus, DefaultOutput}; -use crate::Result; +use super::CurrentFileStatus; +use crate::{spec::DataFileBuilder, Result}; use arrow_array::RecordBatch; use futures::Future; @@ -28,6 +28,8 @@ mod track_writer; pub mod location_generator; +type DefaultOutput = Vec; + /// File writer builder trait. pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. 
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 95efaa69c..b743d8435 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -352,7 +352,7 @@ mod tests { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "5".to_string(), )]))] .into(), ), @@ -371,7 +371,7 @@ mod tests { arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "6".to_string(), )])), )), true, @@ -393,7 +393,7 @@ mod tests { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "7".to_string(), )]))] .into(), ), @@ -401,7 +401,7 @@ mod tests { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "8".to_string(), )]))] .into(), ), @@ -420,7 +420,7 @@ mod tests { arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "5".to_string(), )])), ] .into(), @@ -442,7 +442,7 @@ mod tests { arrow_array::ListArray::new( Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "6".to_string(), )]))), list_parts.1, list_parts.2, @@ -460,7 +460,7 @@ mod tests { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "7".to_string(), )]))] .into(), ), @@ -468,7 +468,7 @@ mod tests { ) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "8".to_string(), )]))] .into(), vec![Arc::new(StructArray::new( @@ -476,7 +476,7 @@ mod tests { arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true) .with_metadata(HashMap::from([( PARQUET_FIELD_ID_META_KEY.to_string(), - "-1".to_string(), + "7".to_string(), )])), ] .into(), diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index f28b57c7a..499e914e7 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -48,21 +48,23 @@ pub mod base_writer; pub mod file_writer; -use crate::{spec::DataFileBuilder, Result}; +use crate::{spec::DataFile, Result}; use arrow_array::RecordBatch; type DefaultInput = RecordBatch; -type DefaultOutput = Vec; +type DefaultOutput = Vec; /// The builder for iceberg writer. -#[allow(async_fn_in_trait)] +#[async_trait::async_trait] pub trait IcebergWriterBuilder: Send + Clone + 'static { /// The associated writer type. type R: IcebergWriter; + /// The associated writer config type used to build the writer. + type C; /// Build the iceberg writer. - async fn build(self) -> Result; + async fn build(self, config: Self::C) -> Result; } /// The iceberg writer used to write data to iceberg table. 
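
With PATCH 2/3 the partition value travels through `DataFileWriterConfig` rather than through the builder itself. Below is a minimal caller-side sketch (not part of the patch) of the config-driven `build` step. It assumes an already configured `FileWriterBuilder` (for example the `ParquetWriterBuilder` set up as in the tests); the helper name `write_one_partition` is invented for illustration, and it uses the `flush` name as it stands after this patch (renamed to `close` in PATCH 3/3).

```rust
use arrow_array::RecordBatch;
use iceberg::spec::{DataFile, Struct};
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
use iceberg::writer::file_writer::FileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::Result;

/// Write a set of record batches into a single spec/partition and collect the produced data files.
async fn write_one_partition<B: FileWriterBuilder>(
    file_writer_builder: B,
    partition: Option<Struct>,
    batches: Vec<RecordBatch>,
) -> Result<Vec<DataFile>> {
    // The partition value is carried by the config, not by the builder.
    let config = DataFileWriterConfig::new(partition);
    let mut writer = DataFileWriterBuilder::new(file_writer_builder)
        .build(config)
        .await?;
    for batch in batches {
        writer.write(batch).await?;
    }
    // After this patch the finished data files are returned by `flush`;
    // PATCH 3/3 renames this method to `close`.
    writer.flush().await
}
```
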
From b0b63a3569683e9aa392de323dcc38ccaeb87331 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 11 Apr 2024 13:40:25 +0800 Subject: [PATCH 3/3] refine the interface --- .../writer/base_writer/data_file_writer.rs | 39 ++++++++----------- crates/iceberg/src/writer/mod.rs | 16 ++++++-- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index 9090baff7..442c9f164 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -59,8 +59,7 @@ impl IcebergWriterBuilder for DataFileWriterBuilder { async fn build(self, config: Self::C) -> Result { Ok(DataFileWriter { - inner_writer: self.inner.clone().build().await?, - builder: self.inner, + inner_writer: Some(self.inner.clone().build().await?), partition_value: config.partition_value, }) } @@ -68,20 +67,19 @@ impl IcebergWriterBuilder for DataFileWriterBuilder { /// A writer write data is within one spec/partition. pub struct DataFileWriter { - builder: B, - inner_writer: B::R, + inner_writer: Option, partition_value: Struct, } #[async_trait::async_trait] impl IcebergWriter for DataFileWriter { async fn write(&mut self, batch: RecordBatch) -> Result<()> { - self.inner_writer.write(&batch).await + self.inner_writer.as_mut().unwrap().write(&batch).await } - async fn flush(&mut self) -> Result> { - let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?); - let res = writer + async fn close(&mut self) -> Result> { + let writer = self.inner_writer.take().unwrap(); + Ok(writer .close() .await? .into_iter() @@ -90,22 +88,21 @@ impl IcebergWriter for DataFileWriter { res.partition(self.partition_value.clone()); res.build().expect("Guaranteed to be valid") }) - .collect_vec(); - Ok(res) + .collect_vec()) } } impl CurrentFileStatus for DataFileWriter { fn current_file_path(&self) -> String { - self.inner_writer.current_file_path() + self.inner_writer.as_ref().unwrap().current_file_path() } fn current_row_num(&self) -> usize { - self.inner_writer.current_row_num() + self.inner_writer.as_ref().unwrap().current_row_num() } fn current_written_size(&self) -> usize { - self.inner_writer.current_written_size() + self.inner_writer.as_ref().unwrap().current_written_size() } } @@ -307,16 +304,14 @@ mod test { .build(DataFileWriterConfig::new(None)) .await?; - for _ in 0..3 { - // write - data_file_writer.write(to_write.clone()).await?; - let res = data_file_writer.flush().await?; - assert_eq!(res.len(), 1); - let data_file = res.into_iter().next().unwrap(); + // write + data_file_writer.write(to_write.clone()).await?; + let res = data_file_writer.close().await?; + assert_eq!(res.len(), 1); + let data_file = res.into_iter().next().unwrap(); - // check - check_parquet_data_file(&file_io, &data_file, &to_write).await; - } + // check + check_parquet_data_file(&file_io, &data_file, &to_write).await; Ok(()) } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 499e914e7..7618d2ec3 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -72,8 +72,11 @@ pub trait IcebergWriterBuilder: pub trait IcebergWriter: Send + 'static { /// Write data to iceberg table. async fn write(&mut self, input: I) -> Result<()>; - /// Flush the writer and return the write result. - async fn flush(&mut self) -> Result; + /// Close the writer and return the written data files. 
+    /// If close fails, the data written before may be lost. The user may need to recreate the writer and rewrite the data again.
+    /// # NOTE
+    /// After close, whether it succeeds or fails, the writer should never be used again; otherwise it will panic.
+    async fn close(&mut self) -> Result<O>;
 }
 
 /// The current file status of iceberg writer. It implement for the writer which write a single
@@ -90,6 +93,7 @@ pub trait CurrentFileStatus {
 #[cfg(test)]
 mod tests {
     use arrow_array::RecordBatch;
+    use arrow_schema::Schema;
     use arrow_select::concat::concat_batches;
     use bytes::Bytes;
     use futures::AsyncReadExt;
@@ -102,7 +106,13 @@ mod tests {
 
     use super::IcebergWriter;
 
-    fn _guarantee_object_safe(_: &dyn IcebergWriter) {}
+    // This function is used to guarantee the trait can be used as an object-safe trait.
+    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
+        let _ = w
+            .write(RecordBatch::new_empty(Schema::empty().into()))
+            .await;
+        let _ = w.close().await;
+    }
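
The traits above are meant to compose: the module docs promise that writers can be wrapped to build more complex write logic. As a closing illustration (not part of the patch series), here is a minimal sketch of such a wrapper. The names `RowCountWriterBuilder` and `RowCountWriter` are invented; the sketch assumes only the `IcebergWriterBuilder`/`IcebergWriter` traits defined above (with their default `RecordBatch` input and `Vec<DataFile>` output) and the `async-trait` crate those traits already use.

```rust
use arrow_array::RecordBatch;
use iceberg::spec::DataFile;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::Result;

/// Builder for a writer that counts written rows before delegating to an inner iceberg writer.
#[derive(Clone)]
struct RowCountWriterBuilder<B: IcebergWriterBuilder> {
    inner: B,
}

#[async_trait::async_trait]
impl<B: IcebergWriterBuilder> IcebergWriterBuilder for RowCountWriterBuilder<B>
where
    B::C: Send,
{
    type R = RowCountWriter<B::R>;
    // Reuse the inner builder's config type, e.g. `DataFileWriterConfig`.
    type C = B::C;

    async fn build(self, config: Self::C) -> Result<Self::R> {
        Ok(RowCountWriter {
            rows: 0,
            inner: self.inner.build(config).await?,
        })
    }
}

/// A writer that tracks how many rows have been written and forwards everything else.
struct RowCountWriter<W: IcebergWriter> {
    rows: u64,
    inner: W,
}

impl<W: IcebergWriter> RowCountWriter<W> {
    /// Number of rows handed to `write` so far.
    fn written_rows(&self) -> u64 {
        self.rows
    }
}

#[async_trait::async_trait]
impl<W: IcebergWriter> IcebergWriter for RowCountWriter<W> {
    async fn write(&mut self, input: RecordBatch) -> Result<()> {
        self.rows += input.num_rows() as u64;
        self.inner.write(input).await
    }

    async fn close(&mut self) -> Result<Vec<DataFile>> {
        // Delegate to the inner writer; like any `IcebergWriter`, this wrapper
        // must not be used again after `close`.
        self.inner.close().await
    }
}
```

Wrapping, for example, a `DataFileWriterBuilder` this way adds row accounting without touching the base writer, which is the kind of composition the builder/config split is designed to allow.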