Skip to content

Commit

Permalink
Merge pull request #4726 from dantengsky/fix-bump-parquet2
Browse files Browse the repository at this point in the history
ISSUE-4668: Enable `Lz4Raw` & rm `parquet_format_async_temp`
  • Loading branch information
BohuTANG authored Apr 8, 2022
2 parents 27a2351 + c8e2ec3 commit b0c7f01
Show file tree
Hide file tree
Showing 21 changed files with 86 additions and 118 deletions.
30 changes: 13 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ object = { opt-level = 3 }
rustc-demangle = { opt-level = 3 }

[patch.crates-io]
parquet2 = { version = "0.10", optional = true, git = "https://github.com/datafuse-extras/parquet2", rev = "b3efb4e" }
parquet2 = { version = "0.10", optional = true, git = "https://github.com/datafuse-extras/parquet2", rev = "daae989" }
2 changes: 0 additions & 2 deletions common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,4 @@ arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service",
futures = "0.3.21"
parquet2 = { version = "0.10.3", default_features = false }

parquet-format-async-temp = "=0.2.0"

[dev-dependencies]
2 changes: 1 addition & 1 deletion common/arrow/src/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::io::parquet::write::to_parquet_schema;
use arrow::io::parquet::write::RowGroupIterator;
use parquet2::write::FileWriter;
use parquet2::write::WriteOptions;
use parquet_format_async_temp::FileMetaData;
use parquet2::FileMetaData;

// a simple wrapper for code reuse
pub fn write_parquet_file<W: Write, A, I>(
Expand Down
2 changes: 1 addition & 1 deletion common/streams/tests/it/sources/source_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn test_source_parquet() -> Result<()> {
use common_arrow::arrow::io::parquet::write::*;
let options = WriteOptions {
write_statistics: true,
compression: Compression::Lz4, // let's begin with lz4
compression: Compression::Lz4Raw,
version: Version::V2,
};

Expand Down
1 change: 0 additions & 1 deletion query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.10.0"
opendal = "0.5.1"
parquet-format-async-temp = "=0.2.0"
paste = "1.0.7"
petgraph = "0.6.0"
poem = { version = "=1.3.16", features = ["rustls", "multipart", "compression"] }
Expand Down
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_user_stage_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Interpreter for CreateUserStageInterpreter {
if user_stage.stage_type == StageType::Internal {
let prefix = format!("stage/{}/", user_stage.stage_name);
let op = self.ctx.get_storage_operator()?;
op.object(&prefix).create().await?;
op.object(&prefix).create().await?
}

let _create_stage = user_mgr
Expand Down
5 changes: 5 additions & 0 deletions query/src/storages/fuse/fuse_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use common_exception::Result;
use common_planners::PartInfo;
use common_planners::PartInfoPtr;

use crate::storages::fuse::meta::Compression;

#[derive(serde::Serialize, serde::Deserialize, PartialEq)]
pub struct ColumnMeta {
pub offset: u64,
Expand All @@ -46,6 +48,7 @@ pub struct FusePartInfo {
pub format_version: u64,
pub nums_rows: usize,
pub columns_meta: HashMap<usize, ColumnMeta>,
pub compression: Compression,
}

#[typetag::serde(name = "fuse")]
Expand All @@ -68,12 +71,14 @@ impl FusePartInfo {
format_version: u64,
rows_count: u64,
columns_meta: HashMap<usize, ColumnMeta>,
compression: Compression,
) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(FusePartInfo {
location,
format_version,
columns_meta,
nums_rows: rows_count as usize,
compression,
}))
}

Expand Down
15 changes: 13 additions & 2 deletions query/src/storages/fuse/io/read/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_arrow::arrow::io::parquet::read::column_iter_to_arrays;
use common_arrow::arrow::io::parquet::read::ArrayIter;
use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
use common_arrow::arrow::io::parquet::write::to_parquet_schema;
use common_arrow::parquet::compression::Compression;
use common_arrow::parquet::compression::Compression as ParquetCompression;
use common_arrow::parquet::metadata::ColumnDescriptor;
use common_arrow::parquet::metadata::SchemaDescriptor;
use common_arrow::parquet::read::BasicDecompressor;
Expand All @@ -41,6 +41,7 @@ use opendal::Operator;

use crate::storages::fuse::fuse_part::ColumnMeta;
use crate::storages::fuse::fuse_part::FusePartInfo;
use crate::storages::fuse::meta::Compression;

#[derive(Clone)]
pub struct BlockReader {
Expand Down Expand Up @@ -76,11 +77,12 @@ impl BlockReader {
rows: usize,
descriptor: &ColumnDescriptor,
field: Field,
compression: &Compression,
) -> Result<ArrayIter<'static>> {
let pages = PageIterator::new(
std::io::Cursor::new(chunk),
meta.num_values as i64,
Compression::Lz4,
Self::to_parquet_compression(compression),
descriptor.clone(),
Arc::new(|_, _| true),
vec![],
Expand Down Expand Up @@ -137,6 +139,7 @@ impl BlockReader {
rows,
column_descriptor,
field,
&part.compression,
)?);
}

Expand Down Expand Up @@ -165,6 +168,7 @@ impl BlockReader {
num_rows,
column_descriptor,
field,
&part.compression,
)?);
}

Expand Down Expand Up @@ -224,4 +228,11 @@ impl BlockReader {
Some(Ok(chunk)) => DataBlock::from_chunk(&self.projected_schema, &chunk),
}
}

fn to_parquet_compression(meta_compression: &Compression) -> ParquetCompression {
match meta_compression {
Compression::Lz4 => ParquetCompression::Lz4,
Compression::Lz4Raw => ParquetCompression::Lz4Raw,
}
}
}
2 changes: 1 addition & 1 deletion query/src/storages/fuse/io/write/block_stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_arrow::parquet::FileMetaData;
use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
Expand All @@ -25,7 +26,6 @@ use futures::stream::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use opendal::Operator;
use parquet_format_async_temp::FileMetaData;

use super::block_writer;
use crate::storages::fuse::io::TableMetaLocationGenerator;
Expand Down
4 changes: 2 additions & 2 deletions query/src/storages/fuse/io/write/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use common_arrow::arrow::datatypes::Schema as ArrowSchema;
use common_arrow::arrow::io::parquet::write::WriteOptions;
use common_arrow::arrow::io::parquet::write::*;
use common_arrow::parquet::encoding::Encoding;
use common_arrow::parquet::FileMetaData;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use opendal::Operator;
use parquet_format_async_temp::FileMetaData;

pub async fn write_block(
arrow_schema: &ArrowSchema,
Expand All @@ -33,7 +33,7 @@ pub async fn write_block(
) -> Result<(u64, FileMetaData)> {
let options = WriteOptions {
write_statistics: false,
compression: Compression::Lz4, // let's begin with lz4
compression: Compression::Lz4Raw,
version: Version::V2,
};
let batch = Chunk::try_from(block)?;
Expand Down
16 changes: 15 additions & 1 deletion query/src/storages/fuse/meta/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::collections::HashMap;

use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;

use crate::storages::index::ColumnStatistics;
Expand All @@ -23,7 +25,7 @@ pub type FormatVersion = u64;
pub type SnapshotId = Uuid;
pub type Location = (String, FormatVersion);

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct Statistics {
pub row_count: u64,
pub block_count: u64,
Expand All @@ -40,3 +42,15 @@ where Self: Sized
{
const VERSION: u64 = V;
}

#[derive(Serialize, Deserialize, PartialEq, Copy, Clone, Debug)]
pub enum Compression {
Lz4,
Lz4Raw,
}

impl Compression {
pub fn legacy() -> Self {
Compression::Lz4
}
}
1 change: 1 addition & 0 deletions query/src/storages/fuse/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod v1;
mod versions;

pub use common::ColumnId;
pub use common::Compression;
pub use common::Location;
pub use common::SnapshotId;
pub use common::Statistics;
Expand Down
16 changes: 14 additions & 2 deletions query/src/storages/fuse/meta/v1/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
use std::collections::HashMap;

use common_datablocks::DataBlock;
use serde::Deserialize;
use serde::Serialize;

use crate::storages::fuse::meta::common::ColumnId;
use crate::storages::fuse::meta::common::Compression;
use crate::storages::fuse::meta::common::FormatVersion;
use crate::storages::fuse::meta::common::Location;
use crate::storages::fuse::meta::common::Statistics;
Expand All @@ -26,7 +29,7 @@ use crate::storages::fuse::meta::v0::ColumnMeta;
use crate::storages::index::ColumnStatistics;

/// A segment comprises one or more blocks
#[derive(serde::Serialize, serde::Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug)]
pub struct SegmentInfo {
/// format version
format_version: FormatVersion,
Expand All @@ -38,14 +41,22 @@ pub struct SegmentInfo {

/// Meta information of a block
/// Part of and kept inside the [SegmentInfo]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BlockMeta {
pub row_count: u64,
pub block_size: u64,
pub file_size: u64,
pub col_stats: HashMap<ColumnId, ColumnStatistics>,
pub col_metas: HashMap<ColumnId, ColumnMeta>,
pub location: Location,

/// Compression algo used to compress the columns of blocks
///
/// If not specified, the legacy algo `Lz4` will be used.
/// `Lz4` is merely for backward compatibility, it will NO longer be
/// used in the write path.
#[serde(default = "Compression::legacy")]
pub compression: Compression,
}

impl SegmentInfo {
Expand Down Expand Up @@ -83,6 +94,7 @@ impl From<v0::BlockMeta> for BlockMeta {
col_stats: s.col_stats,
col_metas: s.col_metas,
location: (s.location.path, DataBlock::VERSION),
compression: Compression::Lz4,
}
}
}
2 changes: 0 additions & 2 deletions query/src/storages/fuse/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ mod append;
mod commit;
mod operation_log;
mod optimize;
mod part_info;
mod read;
mod read_partitions;
mod truncate;

pub use operation_log::AppendOperationLogEntry;
pub use operation_log::TableOperationLog;
pub use part_info::PartInfo;
1 change: 0 additions & 1 deletion query/src/storages/fuse/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ impl FuseTable {
// segments which no longer need to be kept
let seg_delta = prevs.difference(&current_segments).collect::<Vec<_>>();

// TODO rm those deference **
// blocks to be removed
let prev_blocks: HashSet<String> = self
.blocks_of(seg_delta.iter().map(|i| **i), ctx.clone())
Expand Down
Loading

0 comments on commit b0c7f01

Please sign in to comment.