refactor: remove dependency on parquet2: Part I #15158

Merged: 15 commits, Apr 7, 2024
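
Part I of dropping the parquet2 crate: the fuse engine's dual write paths go away. The `fuse_write_use_parquet2` / `fuse_read_use_parquet2` settings and their accessors are removed, `serialize_block` and `blocks_to_parquet` lose their trailing `use_parquet2: bool` parameter, the `ParquetFileMeta` wrapper enum is replaced by parquet-rs's `FileMetaData`, the parquet2-based writer module is deleted, and the `[patch.crates-io]` override for the datafuse-extras parquet2 fork is dropped.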
10 changes: 5 additions & 5 deletions Cargo.lock

(Generated file; diff not rendered.)

3 changes: 0 additions & 3 deletions src/query/ast/fuzz/Cargo.toml
@@ -19,6 +19,3 @@ test = false
 databend-common-ast = { path = ".." }

 afl = "0.12"
-
-[patch.crates-io]
-parquet2 = { version = "0.14.1", optional = true, git = "https://github.com/datafuse-extras/parquet2", rev = "3a468fc3c4" }
8 changes: 1 addition & 7 deletions src/query/ee/src/storages/fuse/operations/virtual_columns.rs
@@ -183,13 +183,7 @@ async fn materialize_virtual_columns(
     let virtual_block = DataBlock::new(virtual_columns, len);

     let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-    let _ = serialize_block(
-        write_settings,
-        &virtual_schema,
-        virtual_block,
-        &mut buffer,
-        true,
-    )?;
+    let _ = serialize_block(write_settings, &virtual_schema, virtual_block, &mut buffer)?;

     write_data(buffer, operator, location).await?;
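Every call site now collapses to the four-argument form. A minimal sketch of the resulting write path, assuming `serialize_block` and `write_data` are the fuse `io` helpers imported elsewhere in this PR, with argument types inferred from the hunks (illustrative, not the actual implementation):

    use databend_common_exception::Result;
    use databend_common_expression::{DataBlock, TableSchemaRef};
    use databend_common_storages_fuse::io::{serialize_block, write_data, WriteSettings};
    use opendal::Operator;

    async fn write_one_block(
        settings: &WriteSettings,
        schema: &TableSchemaRef,
        block: DataBlock,
        operator: &Operator,
        location: &str,
    ) -> Result<()> {
        let mut buffer = Vec::new();
        // The trailing `use_parquet2: bool` is gone; the call returns per-column metas.
        let _col_metas = serialize_block(settings, schema, block, &mut buffer)?;
        // Persist the encoded block at `location`.
        write_data(buffer, operator, location).await?;
        Ok(())
    }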
8 changes: 1 addition & 7 deletions src/query/formats/src/output_format/parquet.rs
@@ -53,13 +53,7 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(
-            &self.schema,
-            blocks,
-            &mut buf,
-            TableCompression::Zstd,
-            false,
-        )?;
+        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
         Ok(buf)
     }
 }
9 changes: 4 additions & 5 deletions src/query/service/src/test_kits/block_writer.rs
@@ -25,7 +25,6 @@ use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::io::WriteSettings;
 use databend_common_storages_fuse::FuseStorageFormat;
 use databend_storages_common_blocks::blocks_to_parquet;
-use databend_storages_common_blocks::ParquetFileMeta;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ClusterStatistics;
@@ -34,6 +33,7 @@ use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
 use opendal::Operator;
+use parquet::format::FileMetaData;
 use uuid::Uuid;

 pub struct BlockWriter<'a> {
@@ -59,7 +59,7 @@ impl<'a> BlockWriter<'a> {
         block: DataBlock,
         col_stats: StatisticsOfColumns,
         cluster_stats: Option<ClusterStatistics>,
-    ) -> Result<(BlockMeta, Option<ParquetFileMeta>)> {
+    ) -> Result<(BlockMeta, Option<FileMetaData>)> {
         let (location, block_id) = self.location_generator.gen_block_location();

         let data_accessor = &self.data_accessor;
@@ -75,7 +75,7 @@ impl<'a> BlockWriter<'a> {
         };

         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let col_metas = serialize_block(&write_settings, schema, block, &mut buf, false)?;
+        let col_metas = serialize_block(&write_settings, schema, block, &mut buf)?;
         let file_size = buf.len() as u64;

         data_accessor.write(&location.0, buf).await?;
@@ -102,7 +102,7 @@ impl<'a> BlockWriter<'a> {
         schema: TableSchemaRef,
         block: &DataBlock,
         block_id: Uuid,
-    ) -> Result<(u64, Option<Location>, Option<ParquetFileMeta>)> {
+    ) -> Result<(u64, Option<Location>, Option<FileMetaData>)> {
         let location = self
             .location_generator
             .block_bloom_index_location(&block_id);
@@ -126,7 +126,6 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
-            false,
         )?;
         let size = data.len() as u64;
         data_accessor.write(&location.0, data).await?;
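The writer's metadata type changes accordingly: where these helpers used to return the dual-backend `ParquetFileMeta` (see the lib.rs hunk below), they now hand back parquet-rs's thrift-level `parquet::format::FileMetaData` directly, so callers no longer match on a backend variant. Roughly, as a hedged sketch (`use_meta` is a hypothetical consumer):

    // Before: the backend had to be unwrapped per call.
    //   match meta {
    //       ParquetFileMeta::ParquetRs(m) => use_meta(m),
    //       ParquetFileMeta::Parquet2(_) => unreachable!("parquet2 path disabled"),
    //   }
    // After: a plain value, no dispatch.
    //   let m: parquet::format::FileMetaData = meta;
    //   use_meta(m);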
@@ -36,7 +36,6 @@ use databend_common_storages_fuse::statistics::gen_columns_statistics;
 use databend_common_storages_fuse::statistics::STATS_STRING_PREFIX_LEN;
 use databend_common_storages_fuse::FuseStorageFormat;
 use databend_query::test_kits::*;
-use databend_storages_common_blocks::ParquetFileMeta;
 use databend_storages_common_cache::InMemoryCacheBuilder;
 use databend_storages_common_cache::InMemoryItemCacheHolder;
 // use databend_storages_common_index::BloomIndexMeta;
@@ -51,6 +50,7 @@ use databend_storages_common_table_meta::meta::SingleColumnMeta;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::Versioned;
 use opendal::Operator;
+use parquet::format::FileMetaData;
 use sysinfo::get_current_pid;
 use sysinfo::System;
 use uuid::Uuid;
@@ -376,7 +376,7 @@ where T: Clone {
 }

 #[allow(dead_code)]
-async fn setup() -> databend_common_exception::Result<ParquetFileMeta> {
+async fn setup() -> databend_common_exception::Result<FileMetaData> {
     let fields = (0..23)
         .map(|_| TableField::new("id", TableDataType::Number(NumberDataType::Int32)))
         .collect::<Vec<_>>();
@@ -765,8 +765,7 @@ impl CompactSegmentTestFixture {
             };

             let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-            let col_metas =
-                serialize_block(&write_settings, &schema, block, &mut buf, false)?;
+            let col_metas = serialize_block(&write_settings, &schema, block, &mut buf)?;
             let file_size = buf.len() as u64;

             data_accessor.write(&location.0, buf).await?;
12 changes: 0 additions & 12 deletions src/query/settings/src/settings_default.rs
@@ -546,18 +546,6 @@ impl DefaultSettings {
             mode: SettingMode::Both,
             range: Some(SettingRange::Numeric(0..=1)),
         }),
-        ("fuse_write_use_parquet2", DefaultSettingValue {
-            value: UserSettingValue::UInt64(0),
-            desc: "Use parquet2 instead of parquet_rs when writing data with fuse engine.",
-            mode: SettingMode::Both,
-            range: Some(SettingRange::Numeric(0..=1)),
-        }),
-        ("fuse_read_use_parquet2", DefaultSettingValue {
-            value: UserSettingValue::UInt64(0),
-            desc: "Use parquet2 instead of parquet_rs when reading data with fuse engine.",
-            mode: SettingMode::Both,
-            range: Some(SettingRange::Numeric(0..=1)),
-        }),
         ("enable_replace_into_partitioning", DefaultSettingValue {
            value: UserSettingValue::UInt64(1),
            desc: "Enables partitioning for replace-into statement (if table has cluster keys).",
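With the defaults deleted, statements like `SET fuse_write_use_parquet2 = 1` should now fail as unknown settings, since the defaults table is what validates setting names; anything still toggling the parquet2 path externally needs cleaning up alongside this PR. That consequence is an inference from how the settings registry is used, not something stated in the diff.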
16 changes: 0 additions & 16 deletions src/query/settings/src/settings_getter_setter.rs
@@ -487,22 +487,6 @@ impl Settings {
         self.try_set_u64("use_parquet2", u64::from(val))
     }

-    pub fn get_fuse_write_use_parquet2(&self) -> Result<bool> {
-        Ok(self.try_get_u64("fuse_write_use_parquet2")? == 1)
-    }
-
-    pub fn set_fuse_write_use_parquet2(&self, val: bool) -> Result<()> {
-        self.try_set_u64("fuse_write_use_parquet2", u64::from(val))
-    }
-
-    pub fn get_fuse_read_use_parquet2(&self) -> Result<bool> {
-        Ok(self.try_get_u64("fuse_read_use_parquet2")? != 0)
-    }
-
-    pub fn set_fuse_read_use_parquet2(&self, val: bool) -> Result<()> {
-        self.try_set_u64("fuse_read_use_parquet2", u64::from(val))
-    }
-
     pub fn get_enable_replace_into_partitioning(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_replace_into_partitioning")? != 0)
     }
3 changes: 0 additions & 3 deletions src/query/storages/common/blocks/Cargo.toml
@@ -12,11 +12,8 @@ doctest = false
 test = false

 [dependencies]
-databend-common-arrow = { path = "../../../../common/arrow" }
-databend-common-config = { path = "../../../config" }
 databend-common-exception = { path = "../../../../common/exception" }
 databend-common-expression = { path = "../../../expression" }
-parquet-format-safe = "0.2"
 parquet_rs = { workspace = true }

 databend-storages-common-table-meta = { path = "../table_meta" }
27 changes: 1 addition & 26 deletions src/query/storages/common/blocks/src/lib.rs
@@ -14,30 +14,5 @@

 #![allow(clippy::uninlined_format_args)]

-use databend_common_exception::Result;
-use databend_common_expression::DataBlock;
-use databend_common_expression::TableSchema;
-use databend_storages_common_table_meta::table::TableCompression;
-
-mod parquet2;
 mod parquet_rs;
-
-pub enum ParquetFileMeta {
-    Parquet2(parquet_format_safe::FileMetaData),
-    ParquetRs(::parquet_rs::format::FileMetaData),
-}
-
-pub fn blocks_to_parquet(
-    schema: &TableSchema,
-    blocks: Vec<DataBlock>,
-    write_buffer: &mut Vec<u8>,
-    compression: TableCompression,
-    use_parquet2: bool,
-) -> Result<ParquetFileMeta> {
-    match use_parquet2 {
-        true => parquet2::blocks_to_parquet(schema, blocks, write_buffer, compression)
-            .map(|(_, meta)| ParquetFileMeta::Parquet2(meta)),
-        false => parquet_rs::blocks_to_parquet(schema, blocks, write_buffer, compression)
-            .map(ParquetFileMeta::ParquetRs),
-    }
-}
+pub use parquet_rs::blocks_to_parquet;
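
This hunk is the crux of the PR: `blocks_to_parquet` stops dispatching on a runtime flag and becomes a plain re-export of the parquet-rs implementation. A hedged caller-side sketch (crate alias `parquet` as in the test diffs above; inside the blocks crate the same dependency is aliased `parquet_rs`):

    use databend_common_exception::Result;
    use databend_common_expression::{DataBlock, TableSchema};
    use databend_storages_common_blocks::blocks_to_parquet;
    use databend_storages_common_table_meta::table::TableCompression;
    use parquet::format::FileMetaData;

    fn encode(schema: &TableSchema, blocks: Vec<DataBlock>) -> Result<(Vec<u8>, FileMetaData)> {
        let mut buf = Vec::new();
        // Before: blocks_to_parquet(schema, blocks, &mut buf, TableCompression::Zstd, false)?
        //         returned the ParquetFileMeta enum.
        // After: one backend, thrift-level metadata returned directly.
        let meta = blocks_to_parquet(schema, blocks, &mut buf, TableCompression::Zstd)?;
        Ok((buf, meta))
    }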
106 changes: 0 additions & 106 deletions src/query/storages/common/blocks/src/parquet2.rs

This file was deleted.

2 changes: 2 additions & 0 deletions src/query/storages/common/index/Cargo.toml
@@ -35,6 +35,8 @@ xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", features
     "cbordata",
 ], tag = "databend-alpha.4" }

+parquet = { workspace = true }
+
 [dev-dependencies]
 criterion = "0.4"
 databend-common-arrow = { path = "../../../../common/arrow" }
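The direct `parquet` dependency added to `databend-storages-common-index` presumably lets bloom-index code consume parquet-rs metadata types (e.g. `parquet::format::FileMetaData`, as in the block writer above) rather than the parquet2 equivalents; that is an inference from the surrounding hunks, not something the PR states.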