ISSUE-4668: Enable Lz4Raw & rm parquet_format_async_temp #4726

Merged
12 commits merged on Apr 8, 2022
30 changes: 13 additions & 17 deletions Cargo.lock

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -66,4 +66,4 @@ object = { opt-level = 3 }
rustc-demangle = { opt-level = 3 }

[patch.crates-io]
parquet2 = { version = "0.10", optional = true, git = "https://github.com/datafuse-extras/parquet2", rev = "b3efb4e" }
parquet2 = { version = "0.10", optional = true, git = "https://github.com/datafuse-extras/parquet2", rev = "daae989" }
2 changes: 0 additions & 2 deletions common/arrow/Cargo.toml
@@ -38,6 +38,4 @@ arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service",
futures = "0.3.21"
parquet2 = { version = "0.10.3", default_features = false }

parquet-format-async-temp = "=0.2.0"

[dev-dependencies]
2 changes: 1 addition & 1 deletion common/arrow/src/parquet_write.rs
@@ -22,7 +22,7 @@ use arrow::io::parquet::write::to_parquet_schema;
use arrow::io::parquet::write::RowGroupIterator;
use parquet2::write::FileWriter;
use parquet2::write::WriteOptions;
use parquet_format_async_temp::FileMetaData;
use parquet2::FileMetaData;

// a simple wrapper for code reuse
pub fn write_parquet_file<W: Write, A, I>(
2 changes: 1 addition & 1 deletion common/streams/tests/it/sources/source_parquet.rs
@@ -39,7 +39,7 @@ async fn test_source_parquet() -> Result<()> {
use common_arrow::arrow::io::parquet::write::*;
let options = WriteOptions {
write_statistics: true,
compression: Compression::Lz4, // let's begin with lz4
compression: Compression::Lz4Raw,
version: Version::V2,
};

1 change: 0 additions & 1 deletion query/Cargo.toml
@@ -88,7 +88,6 @@ num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.10.0"
opendal = "0.5.1"
parquet-format-async-temp = "=0.2.0"
paste = "1.0.7"
petgraph = "0.6.0"
poem = { version = "=1.3.16", features = ["rustls", "multipart", "compression"] }
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_user_stage_create.rs
@@ -55,7 +55,7 @@ impl Interpreter for CreateUserStageInterpreter {
if user_stage.stage_type == StageType::Internal {
let prefix = format!("stage/{}/", user_stage.stage_name);
let op = self.ctx.get_storage_operator()?;
op.object(&prefix).create().await?;
op.object(&prefix).create().await?
}

let _create_stage = user_mgr
5 changes: 5 additions & 0 deletions query/src/storages/fuse/fuse_part.rs
@@ -21,6 +21,8 @@ use common_exception::Result;
use common_planners::PartInfo;
use common_planners::PartInfoPtr;

use crate::storages::fuse::meta::Compression;

#[derive(serde::Serialize, serde::Deserialize, PartialEq)]
pub struct ColumnMeta {
pub offset: u64,
@@ -46,6 +48,7 @@ pub struct FusePartInfo {
pub format_version: u64,
pub nums_rows: usize,
pub columns_meta: HashMap<usize, ColumnMeta>,
pub compression: Compression,
}

#[typetag::serde(name = "fuse")]
@@ -68,12 +71,14 @@ impl FusePartInfo {
format_version: u64,
rows_count: u64,
columns_meta: HashMap<usize, ColumnMeta>,
compression: Compression,
) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(FusePartInfo {
location,
format_version,
columns_meta,
nums_rows: rows_count as usize,
compression,
}))
}

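Illustrative sketch (not part of this diff): the codec recorded in a block's meta now travels with each partition, so the read path can choose the decompressor per block instead of hard-coding Lz4. A hypothetical call site, assuming `location` is the block path as a `String` and leaving `columns_meta` empty for brevity:

use std::collections::HashMap;

use common_planners::PartInfo;

use crate::storages::fuse::fuse_part::FusePartInfo;
use crate::storages::fuse::meta::Compression;

fn example_part() -> std::sync::Arc<Box<dyn PartInfo>> {
    FusePartInfo::create(
        "1/2/block.parquet".to_string(), // hypothetical block location
        1,                               // format_version
        1024,                            // rows_count
        HashMap::new(),                  // columns_meta, empty for brevity
        Compression::Lz4Raw,             // taken from the block's BlockMeta.compression
    )
}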
15 changes: 13 additions & 2 deletions query/src/storages/fuse/io/read/block_reader.rs
@@ -20,7 +20,7 @@ use common_arrow::arrow::io::parquet::read::column_iter_to_arrays;
use common_arrow::arrow::io::parquet::read::ArrayIter;
use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
use common_arrow::arrow::io::parquet::write::to_parquet_schema;
use common_arrow::parquet::compression::Compression;
use common_arrow::parquet::compression::Compression as ParquetCompression;
use common_arrow::parquet::metadata::ColumnDescriptor;
use common_arrow::parquet::metadata::SchemaDescriptor;
use common_arrow::parquet::read::BasicDecompressor;
@@ -41,6 +41,7 @@ use opendal::Operator;

use crate::storages::fuse::fuse_part::ColumnMeta;
use crate::storages::fuse::fuse_part::FusePartInfo;
use crate::storages::fuse::meta::Compression;

#[derive(Clone)]
pub struct BlockReader {
@@ -76,11 +77,12 @@ impl BlockReader {
rows: usize,
descriptor: &ColumnDescriptor,
field: Field,
compression: &Compression,
) -> Result<ArrayIter<'static>> {
let pages = PageIterator::new(
std::io::Cursor::new(chunk),
meta.num_values as i64,
Compression::Lz4,
Self::to_parquet_compression(compression),
descriptor.clone(),
Arc::new(|_, _| true),
vec![],
@@ -137,6 +139,7 @@ impl BlockReader {
rows,
column_descriptor,
field,
&part.compression,
)?);
}

@@ -165,6 +168,7 @@ impl BlockReader {
num_rows,
column_descriptor,
field,
&part.compression,
)?);
}

@@ -224,4 +228,11 @@ impl BlockReader {
Some(Ok(chunk)) => DataBlock::from_chunk(&self.projected_schema, &chunk),
}
}

fn to_parquet_compression(meta_compression: &Compression) -> ParquetCompression {
match meta_compression {
Compression::Lz4 => ParquetCompression::Lz4,
Compression::Lz4Raw => ParquetCompression::Lz4Raw,
}
}
}
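A possible unit test for this mapping (sketch only, not included in the PR) could sit in a child module of this file, where the private helper is visible:

#[cfg(test)]
mod tests {
    use super::*;

    // Legacy blocks keep decompressing with Lz4; blocks written from now on use Lz4Raw.
    #[test]
    fn maps_meta_compression_to_parquet_codec() {
        assert!(matches!(
            BlockReader::to_parquet_compression(&Compression::Lz4),
            ParquetCompression::Lz4
        ));
        assert!(matches!(
            BlockReader::to_parquet_compression(&Compression::Lz4Raw),
            ParquetCompression::Lz4Raw
        ));
    }
}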
2 changes: 1 addition & 1 deletion query/src/storages/fuse/io/write/block_stream_writer.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_arrow::parquet::FileMetaData;
use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
@@ -25,7 +26,6 @@ use futures::stream::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use opendal::Operator;
use parquet_format_async_temp::FileMetaData;

use super::block_writer;
use crate::storages::fuse::io::TableMetaLocationGenerator;
4 changes: 2 additions & 2 deletions query/src/storages/fuse/io/write/block_writer.rs
@@ -19,11 +19,11 @@ use common_arrow::arrow::datatypes::Schema as ArrowSchema;
use common_arrow::arrow::io::parquet::write::WriteOptions;
use common_arrow::arrow::io::parquet::write::*;
use common_arrow::parquet::encoding::Encoding;
use common_arrow::parquet::FileMetaData;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use opendal::Operator;
use parquet_format_async_temp::FileMetaData;

pub async fn write_block(
arrow_schema: &ArrowSchema,
@@ -33,7 +33,7 @@ pub async fn write_block(
) -> Result<(u64, FileMetaData)> {
let options = WriteOptions {
write_statistics: false,
compression: Compression::Lz4, // let's begin with lz4
compression: Compression::Lz4Raw,
version: Version::V2,
};
let batch = Chunk::try_from(block)?;
16 changes: 15 additions & 1 deletion query/src/storages/fuse/meta/common.rs
@@ -14,6 +14,8 @@

use std::collections::HashMap;

use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;

use crate::storages::index::ColumnStatistics;
@@ -23,7 +25,7 @@ pub type FormatVersion = u64;
pub type SnapshotId = Uuid;
pub type Location = (String, FormatVersion);

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct Statistics {
pub row_count: u64,
pub block_count: u64,
@@ -40,3 +42,15 @@
{
const VERSION: u64 = V;
}

#[derive(Serialize, Deserialize, PartialEq, Copy, Clone, Debug)]
pub enum Compression {
Lz4,
Lz4Raw,
}

impl Compression {
pub fn legacy() -> Self {
Compression::Lz4
}
}
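For illustration (a sketch, assuming serde_json is available for this metadata, as the serde derives suggest), the unit variants round-trip as plain strings, and the derived PartialEq/Copy let the value be compared and carried cheaply:

#[test]
fn compression_round_trips_through_serde_json() {
    // Sketch only: not part of this change.
    let encoded = serde_json::to_string(&Compression::Lz4Raw).unwrap();
    assert_eq!(encoded, "\"Lz4Raw\"");
    let decoded: Compression = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, Compression::Lz4Raw);
}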
1 change: 1 addition & 0 deletions query/src/storages/fuse/meta/mod.rs
@@ -22,6 +22,7 @@ mod v1;
mod versions;

pub use common::ColumnId;
pub use common::Compression;
pub use common::Location;
pub use common::SnapshotId;
pub use common::Statistics;
16 changes: 14 additions & 2 deletions query/src/storages/fuse/meta/v1/segment.rs
@@ -16,8 +16,11 @@
use std::collections::HashMap;

use common_datablocks::DataBlock;
use serde::Deserialize;
use serde::Serialize;

use crate::storages::fuse::meta::common::ColumnId;
use crate::storages::fuse::meta::common::Compression;
use crate::storages::fuse::meta::common::FormatVersion;
use crate::storages::fuse::meta::common::Location;
use crate::storages::fuse::meta::common::Statistics;
@@ -26,7 +29,7 @@ use crate::storages::fuse::meta::v0::ColumnMeta;
use crate::storages::index::ColumnStatistics;

/// A segment comprises one or more blocks
#[derive(serde::Serialize, serde::Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct SegmentInfo {
/// format version
format_version: FormatVersion,
@@ -38,14 +41,22 @@ pub struct SegmentInfo {

/// Meta information of a block
/// Part of and kept inside the [SegmentInfo]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BlockMeta {
pub row_count: u64,
pub block_size: u64,
pub file_size: u64,
pub col_stats: HashMap<ColumnId, ColumnStatistics>,
pub col_metas: HashMap<ColumnId, ColumnMeta>,
pub location: Location,

/// Compression algo used to compress the columns of blocks
///
/// If not specified, the legacy algo `Lz4` will be used.
/// `Lz4` is merely for backward compatibility, it will NO longer be
/// used in the write path.
#[serde(default = "Compression::legacy")]
pub compression: Compression,
}

impl SegmentInfo {
@@ -83,6 +94,7 @@ impl From<v0::BlockMeta> for BlockMeta {
col_stats: s.col_stats,
col_metas: s.col_metas,
location: (s.location.path, DataBlock::VERSION),
compression: Compression::Lz4,
}
}
}
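The backward-compatibility behaviour documented above can be sketched in isolation (hypothetical MetaLike struct mirroring the #[serde(default = "Compression::legacy")] attribute; not part of this diff): a record serialized before this change, with no `compression` key, still deserializes and falls back to Lz4.

#[derive(serde::Deserialize)]
struct MetaLike {
    // Same attribute as BlockMeta::compression above.
    #[serde(default = "Compression::legacy")]
    compression: Compression,
}

#[test]
fn missing_compression_field_falls_back_to_lz4() {
    let old: MetaLike = serde_json::from_str("{}").unwrap();
    assert_eq!(old.compression, Compression::Lz4);
}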
2 changes: 0 additions & 2 deletions query/src/storages/fuse/operations/mod.rs
@@ -16,11 +16,9 @@ mod append;
mod commit;
mod operation_log;
mod optimize;
mod part_info;
mod read;
mod read_partitions;
mod truncate;

pub use operation_log::AppendOperationLogEntry;
pub use operation_log::TableOperationLog;
pub use part_info::PartInfo;
1 change: 0 additions & 1 deletion query/src/storages/fuse/operations/optimize.rs
@@ -72,7 +72,6 @@ impl FuseTable {
// segments which no longer need to be kept
let seg_delta = prevs.difference(&current_segments).collect::<Vec<_>>();

// TODO rm those deference **
// blocks to be removed
let prev_blocks: HashSet<String> = self
.blocks_of(seg_delta.iter().map(|i| **i), ctx.clone())