Merge branch 'main' into pre_vacuum2
SkyFan2002 authored Sep 5, 2024
2 parents b95dfce + 111d3f8 commit 7abc9c1
Showing 76 changed files with 1,717 additions and 1,275 deletions.
72 changes: 54 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion scripts/ci/ci-run-sqllogic-tests.sh
@@ -20,6 +20,6 @@ echo "Run suites using argument: $RUN_DIR"

echo "Starting databend-sqllogic tests"
if [ -z "$RUN_DIR" ]; then
target/${BUILD_PROFILE}/databend-sqllogictests --run_dir temp_table --enable_sandbox --parallel 8
target/${BUILD_PROFILE}/databend-sqllogictests --run_dir temp_table --enable_sandbox --parallel 8
fi
target/${BUILD_PROFILE}/databend-sqllogictests --handlers ${TEST_HANDLERS} ${RUN_DIR} --skip_dir management,explain_native,ee,temp_table --enable_sandbox --parallel 8
5 changes: 0 additions & 5 deletions src/common/arrow/src/lib.rs
@@ -24,14 +24,9 @@

pub mod arrow;
pub mod native;
mod parquet_read;
mod parquet_write;
pub mod schema_projection;

pub use arrow_format;
pub use parquet2 as parquet;
pub use parquet_read::read_columns_async;
pub use parquet_read::read_columns_many_async;
pub use parquet_write::write_parquet_file;

pub type ArrayRef = Box<dyn crate::arrow::array::Array>;
1 change: 1 addition & 0 deletions src/common/arrow/src/native/mod.rs
@@ -19,6 +19,7 @@ mod util;

mod compression;
pub use compression::CommonCompression;
pub type SchemaDescriptor = parquet2::metadata::SchemaDescriptor;
pub mod read;
pub mod stat;
pub mod write;
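
For reference, a minimal sketch of what the re-exported alias enables (assumed call site, not part of this commit; `schema` is an arrow Schema in scope):

use crate::arrow::io::parquet::write::to_parquet_schema;
use crate::native::SchemaDescriptor;

// Build a parquet2 schema descriptor from an arrow Schema,
// mirroring what NativeColumnsReader::new does in the read module below.
let desc: SchemaDescriptor = to_parquet_schema(&schema)?;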
62 changes: 62 additions & 0 deletions src/common/arrow/src/native/read/mod.rs
@@ -15,10 +15,20 @@
mod array;
pub mod batch_read;
pub mod deserialize;
use batch_read::batch_read_array;
pub use deserialize::column_iter_to_arrays;
pub use deserialize::ArrayIter;

use crate::arrow::array::Array;
use crate::arrow::datatypes::Field;
use crate::arrow::error::Result;
pub(crate) mod read_basic;
use std::io::BufReader;

use super::PageMeta;
use super::SchemaDescriptor;
use crate::arrow::datatypes::Schema;
use crate::arrow::io::parquet::write::to_parquet_schema;
pub mod reader;

pub trait NativeReadBuf: std::io::BufRead {
@@ -59,3 +69,55 @@ impl<B: NativeReadBuf + ?Sized> NativeReadBuf for Box<B> {
pub trait PageIterator {
fn swap_buffer(&mut self, buffer: &mut Vec<u8>);
}

#[derive(Clone)]
pub struct NativeColumnsReader {
schema: Schema,
schema_desc: SchemaDescriptor,
}

impl NativeColumnsReader {
pub fn new(schema: Schema) -> Result<Self> {
let schema_desc = to_parquet_schema(&schema)?;
Ok(Self {
schema,
schema_desc,
})
}

/// An iterator adapter that maps [`PageIterator`]s into an iterator of [`Array`]s.
pub fn column_iter_to_arrays<'a, I>(
&self,
readers: Vec<I>,
leaf_indexes: &[usize],
field: Field,
is_nested: bool,
) -> Result<ArrayIter<'a>>
where
I: Iterator<Item = Result<(u64, Vec<u8>)>> + PageIterator + Send + Sync + 'a,
{
let leaves = leaf_indexes
.iter()
.map(|i| self.schema_desc.columns()[*i].clone())
.collect();

column_iter_to_arrays(readers, leaves, field, is_nested)
}

/// Read all pages of column at once.
pub fn batch_read_array<R: NativeReadBuf>(
&self,
readers: Vec<R>,
leaf_indexes: &[usize],
field: Field,
is_nested: bool,
page_metas: Vec<Vec<PageMeta>>,
) -> Result<Box<dyn Array>> {
let leaves = leaf_indexes
.iter()
.map(|i| self.schema_desc.columns()[*i].clone())
.collect();

batch_read_array(readers, leaves, field, is_nested, page_metas)
}
}
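
A usage sketch of the new reader (hypothetical call site; `schema`, `field`, `leaf_indexes`, and `page_iters` are assumed to be built elsewhere and are not from this commit):

// Construct once per file; new() builds the parquet schema descriptor internally.
let reader = NativeColumnsReader::new(schema.clone())?;

// Stream decode: turn per-leaf page iterators into an iterator of arrays.
let arrays = reader.column_iter_to_arrays(page_iters, &leaf_indexes, field.clone(), false)?;
for array in arrays {
    let array = array?; // each item is assumed to yield a Box<dyn Array>
    // ... consume the decoded array
}

// Or decode all pages of the column in one call:
// let array = reader.batch_read_array(buf_readers, &leaf_indexes, field, false, page_metas)?;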
2 changes: 1 addition & 1 deletion src/common/arrow/src/native/stat.rs
@@ -209,7 +209,7 @@ mod test {
array.validity().is_some(),
);
let schema = Schema::from(vec![field.clone()]);
let mut writer = NativeWriter::new(&mut bytes, schema, options);
let mut writer = NativeWriter::new(&mut bytes, schema, options).unwrap();

writer.start().unwrap();
writer.write(&Chunk::new(vec![array])).unwrap();
9 changes: 2 additions & 7 deletions src/common/arrow/src/native/write/common.rs
@@ -24,7 +24,6 @@ use crate::arrow::io::parquet::write::slice_parquet_array;
use crate::arrow::io::parquet::write::to_leaves;
use crate::arrow::io::parquet::write::to_nested;
use crate::arrow::io::parquet::write::to_parquet_leaves;
use crate::arrow::io::parquet::write::SchemaDescriptor;
use crate::native::compression::CommonCompression;
use crate::native::compression::Compression;
use crate::native::ColumnMeta;
@@ -45,11 +44,7 @@ pub struct WriteOptions {

impl<W: Write> NativeWriter<W> {
/// Encode and write a [`Chunk`] to the file
pub fn encode_chunk(
&mut self,
schema_descriptor: SchemaDescriptor,
chunk: &Chunk<Box<dyn Array>>,
) -> Result<()> {
pub fn encode_chunk(&mut self, chunk: &Chunk<Box<dyn Array>>) -> Result<()> {
let page_size = self
.options
.max_page_size
@@ -59,7 +54,7 @@ impl<W: Write> NativeWriter<W> {
for (array, type_) in chunk
.arrays()
.iter()
.zip(schema_descriptor.fields().to_vec())
.zip(self.schema_descriptor.fields().to_vec())
{
let array = array.as_ref();
let nested = to_nested(array, &type_)?;
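
For callers, the net effect of the new signature (sketch; `writer` and `chunk` are assumed to be in scope):

// Before: the caller had to build and pass the descriptor itself.
// writer.encode_chunk(to_parquet_schema(&schema)?, &chunk)?;

// After: the descriptor built once in NativeWriter::new is reused.
writer.encode_chunk(&chunk)?;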
16 changes: 9 additions & 7 deletions src/common/arrow/src/native/write/writer.rs
@@ -26,6 +26,7 @@ use crate::arrow::io::ipc::write::default_ipc_fields;
use crate::arrow::io::ipc::write::schema_to_bytes;
use crate::arrow::io::parquet::write::to_parquet_schema;
use crate::native::ColumnMeta;
use crate::native::SchemaDescriptor;

#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum State {
@@ -43,6 +44,7 @@ pub struct NativeWriter<W: Write> {
pub(crate) options: WriteOptions,
/// A reference to the schema, used in validating record batches
pub(crate) schema: Schema,
pub(crate) schema_descriptor: SchemaDescriptor,

/// Record blocks that will be written as part of the strawboat footer
pub metas: Vec<ColumnMeta>,
@@ -55,26 +57,28 @@ impl<W: Write> NativeWriter<W> {
impl<W: Write> NativeWriter<W> {
/// Creates a new [`NativeWriter`] and writes the header to `writer`
pub fn try_new(writer: W, schema: &Schema, options: WriteOptions) -> Result<Self> {
let mut slf = Self::new(writer, schema.clone(), options);
let mut slf = Self::new(writer, schema.clone(), options)?;
slf.start()?;

Ok(slf)
}

/// Creates a new [`NativeWriter`].
pub fn new(writer: W, schema: Schema, options: WriteOptions) -> Self {
pub fn new(writer: W, schema: Schema, options: WriteOptions) -> Result<Self> {
let num_cols = schema.fields.len();
Self {
let schema_descriptor = to_parquet_schema(&schema)?;
Ok(Self {
writer: OffsetWriter {
w: writer,
offset: 0,
},
options,
schema,
schema_descriptor,
metas: Vec::with_capacity(num_cols),
scratch: Vec::with_capacity(0),
state: State::None,
}
})
}

/// Consumes itself into the inner writer
@@ -113,9 +117,7 @@ impl<W: Write> NativeWriter<W> {
));
}
assert_eq!(chunk.arrays().len(), self.schema.fields.len());

let schema_descriptor = to_parquet_schema(&self.schema)?;
self.encode_chunk(schema_descriptor, chunk)?;
self.encode_chunk(chunk)?;

self.state = State::Written;
Ok(())
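
Putting the writer changes together, a minimal construction sketch under the now-fallible `new` (assumed call site; `schema`, `options`, and `chunk` are in scope, and `finish()` is assumed as the counterpart to `start()`, which this excerpt does not show):

let mut bytes = Vec::new();
// new() now returns Result because it builds the parquet schema descriptor up front.
let mut writer = NativeWriter::new(&mut bytes, schema.clone(), options)?;
writer.start()?;
writer.write(&chunk)?;
writer.finish()?; // assumed lifecycle method, not part of this excerpt

// Alternatively, try_new writes the header immediately:
// let mut writer = NativeWriter::try_new(&mut bytes, &schema, options)?;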