build: Restore CI by making parquet and arrow version consistent #280
Merged
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,10 +17,8 @@ | |
|
||
use super::TransformFunction; | ||
use crate::{Error, ErrorKind, Result}; | ||
use arrow_arith::{ | ||
arity::binary, | ||
temporal::{month_dyn, year_dyn}, | ||
[Review comment] `month_dyn` and `year_dyn` are deprecated in the latest arrow release. |
||
}; | ||
use arrow_arith::temporal::DatePart; | ||
use arrow_arith::{arity::binary, temporal::date_part}; | ||
use arrow_array::{ | ||
types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, | ||
}; | ||
|
@@ -43,8 +41,8 @@ pub struct Year; | |
|
||
impl TransformFunction for Year { | ||
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> { | ||
let array = | ||
year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; | ||
let array = date_part(&input, DatePart::Year) | ||
.map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; | ||
Ok(Arc::<Int32Array>::new( | ||
array | ||
.as_any() | ||
|
@@ -61,15 +59,15 @@ pub struct Month; | |
|
||
impl TransformFunction for Month { | ||
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> { | ||
let year_array = | ||
year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; | ||
let year_array = date_part(&input, DatePart::Year) | ||
.map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; | ||
let year_array: Int32Array = year_array | ||
.as_any() | ||
.downcast_ref::<Int32Array>() | ||
.unwrap() | ||
.unary(|v| 12 * (v - UNIX_EPOCH_YEAR)); | ||
let month_array = | ||
month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; | ||
let month_array = date_part(&input, DatePart::Month) | ||
.map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?; | ||
Ok(Arc::<Int32Array>::new( | ||
binary( | ||
month_array.as_any().downcast_ref::<Int32Array>().unwrap(), | ||
|
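The `Month` transform above scales a year offset by 12 and then combines it with the month part. The arithmetic can be sketched on plain integers, without arrow. This is a minimal sketch, not the project's implementation: the diff truncates the closure passed to `binary`, so the `month - 1` term is an assumption (calendar months are taken to be 1-based), and `months_since_epoch` is a hypothetical helper name.

```rust
/// Year of the Unix epoch, mirroring `UNIX_EPOCH_YEAR` in the diff.
const UNIX_EPOCH_YEAR: i32 = 1970;

/// Hypothetical helper: months elapsed since 1970-01 for a calendar
/// year and a 1-based month (1 = January, 12 = December).
/// Assumption: the truncated `binary` closure combines the two arrays
/// the same way, i.e. year offset plus (month - 1).
fn months_since_epoch(year: i32, month: i32) -> i32 {
    // Same scaling as `.unary(|v| 12 * (v - UNIX_EPOCH_YEAR))` in the diff.
    let year_offset = 12 * (year - UNIX_EPOCH_YEAR);
    // Shift the 1-based month so that January 1970 maps to 0.
    year_offset + (month - 1)
}
```

Under these assumptions, `months_since_epoch(1970, 1)` is 0, `months_since_epoch(2017, 12)` is 575, and dates before the epoch come out negative (`months_since_epoch(1969, 12)` is -1).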
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
//! The module contains the file writer for parquet file format. | ||
|
||
use std::{ | ||
cmp::max, | ||
collections::HashMap, | ||
sync::{atomic::AtomicI64, Arc}, | ||
}; | ||
|
@@ -43,9 +42,6 @@ use super::{ | |
/// ParquetWriterBuilder is used to builder a [`ParquetWriter`] | ||
#[derive(Clone)] | ||
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> { | ||
/// `buffer_size` determines the initial size of the intermediate buffer. | ||
/// The intermediate buffer will automatically be resized if necessary | ||
init_buffer_size: usize, | ||
props: WriterProperties, | ||
schema: ArrowSchemaRef, | ||
|
||
|
@@ -55,21 +51,16 @@ pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> { | |
} | ||
|
||
impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> { | ||
/// To avoid EntiryTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it. | ||
const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024; | ||
|
||
/// Create a new `ParquetWriterBuilder` | ||
/// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. | ||
pub fn new( | ||
init_buffer_size: usize, | ||
props: WriterProperties, | ||
schema: ArrowSchemaRef, | ||
file_io: FileIO, | ||
location_generator: T, | ||
file_name_generator: F, | ||
) -> Self { | ||
Self { | ||
init_buffer_size, | ||
props, | ||
schema, | ||
file_io, | ||
|
@@ -112,20 +103,14 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr | |
.generate_location(&self.file_name_generator.generate_file_name()), | ||
)?; | ||
let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone()); | ||
let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size); | ||
let writer = AsyncArrowWriter::try_new( | ||
inner_writer, | ||
self.schema.clone(), | ||
init_buffer_size, | ||
Some(self.props), | ||
) | ||
.map_err(|err| { | ||
Error::new( | ||
crate::ErrorKind::Unexpected, | ||
"Failed to build parquet writer.", | ||
) | ||
.with_source(err) | ||
})?; | ||
let writer = AsyncArrowWriter::try_new(inner_writer, self.schema.clone(), Some(self.props)) | ||
||
.map_err(|err| { | ||
Error::new( | ||
crate::ErrorKind::Unexpected, | ||
"Failed to build parquet writer.", | ||
) | ||
.with_source(err) | ||
})?; | ||
|
||
Ok(ParquetWriter { | ||
writer, | ||
|
@@ -311,7 +296,6 @@ mod tests { | |
|
||
// write data | ||
let mut pw = ParquetWriterBuilder::new( | ||
0, | ||
WriterProperties::builder().build(), | ||
to_write.schema(), | ||
file_io.clone(), | ||
|
@@ -551,7 +535,6 @@ mod tests { | |
|
||
// write data | ||
let mut pw = ParquetWriterBuilder::new( | ||
0, | ||
WriterProperties::builder().build(), | ||
to_write.schema(), | ||
file_io.clone(), | ||
|
I'm not sure why the parquet version was previously inconsistent with the arrow version. Mismatched versions can cause errors like:
Is it possible to change it to parquet >= 46 <51? We faced similar problems in icelake. cc @Xuanwo
I still recommend pinning the arrow version, since arrow doesn't guarantee compatibility across major version upgrades.
We can temporarily fix the CI by pinning arrow and parquet. However, since our users rely on various versions of arrow, we cannot force them to always use the latest version. Ultimately, we will determine an MSAV through different methods.
Committing Cargo.lock is also still recommended, to make sure CI works as expected.
Cool, let's pin this first to unblock the CI failure.