Add cluster key statistics in block meta #5194

Merged 4 commits on May 6, 2022
75 changes: 75 additions & 0 deletions common/datavalues/src/data_value.rs
@@ -15,12 +15,14 @@
// Borrow from apache/arrow/rust/datafusion/src/functions.rs
// See notice.md

use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_macros::MallocSizeOf;
use ordered_float::OrderedFloat;
use serde_json::json;

use crate::prelude::*;
@@ -177,6 +179,19 @@ impl DataValue {
        matches!(self, DataValue::UInt64(_))
    }

    #[inline]
    pub fn is_numeric(&self) -> bool {
        matches!(
            self,
            DataValue::Int64(_) | DataValue::UInt64(_) | DataValue::Float64(_)
        )
    }

    #[inline]
    pub fn is_float(&self) -> bool {
        matches!(self, DataValue::Float64(_))
    }

    pub fn as_u64(&self) -> Result<u64> {
        match self {
            DataValue::Int64(v) if *v >= 0 => Ok(*v as u64),
@@ -258,6 +273,66 @@ impl DataValue {
    }
}

impl Eq for DataValue {}

impl Ord for DataValue {
Member:

Not sure if we can define a meaningful total order relation between DataValues.
@sundy-li any comments about this? Thanks.

Member:
aggregate_scalar_state uses generic comparison impls without matching on each DataValue, so it is optimized for performance.

For the cluster key, we already have min/max indexes, so we don't need to copy these indexes into a Column and apply eval_aggr.
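
(For illustration only; this is not code from the PR. Assuming the Ord impl added in this diff and the existing DataValue::is_null helper, the per-block minimums could be folded directly, roughly like this:)

    // Hypothetical helper, not part of this PR: fold the per-block cluster-key
    // minimums straight from the stored min values, with no intermediate Column
    // and no call to eval_aggr.
    fn reduce_cluster_min(block_mins: &[DataValue]) -> DataValue {
        block_mins
            .iter()
            .filter(|v| !v.is_null())
            .min() // relies on the total order on DataValue introduced below
            .cloned()
            .unwrap_or(DataValue::Null)
    }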

Member @dantengsky (May 6, 2022):
> aggregate_scalar_state uses generic comparison impls without matching on each DataValue, so it is optimized for performance...

Got it.
Two other concerns:

  1. Although we can define our own total order for DataValue (even for weird things like NaN...),
     a PartialOrd may be easier to define (or we could just use Rust's derive mechanism).
  2. The min/max logic is also implemented by aggregate_min/max;
     it would be better to keep the two consistent.

A suggestion:

  • derive PartialOrd for DataValue

  • implement the min/max the way aggregate_scalar_state does,
    maybe something like this (hopefully it does the same thing as the aggregate min/max :-)
            let min = min_stats
                .iter()
                .filter(|x| !x.is_null())
                .min_by(|x, y| x.partial_cmp(y).unwrap_or(Ordering::Equal))
                .cloned()
                .unwrap_or(DataValue::Null);

            let max = max_stats
                .iter()
                .filter(|x| !x.is_null())
                .max_by(|x, y| x.partial_cmp(y).unwrap_or(Ordering::Equal))
                .cloned()
                .unwrap_or(DataValue::Null);

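(Again for illustration only, not part of the PR: a minimal, self-contained sketch of how the min_by/partial_cmp pattern above behaves on plain f64 values. Incomparable pairs such as NaN are compared as Equal, so whether a NaN can win depends on where it sits in the input.)

    use std::cmp::Ordering;

    fn main() {
        let values = [3.0_f64, f64::NAN, 1.0, 2.0];
        let min = values
            .iter()
            .copied()
            .min_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal))
            .unwrap();
        // NaN compares as Equal against 3.0, so the earlier candidate is kept and
        // the comparable values decide the result. If NaN were the first element,
        // it would be returned instead.
        assert_eq!(min, 1.0);
    }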
Member Author:
Thanks @dantengsky. This is a great suggestion. I will try it later.

Member:
Yeah, hope it helps.
As long as the format is kept backward compatible, I think we could merge this first and polish it later.

    fn cmp(&self, other: &Self) -> Ordering {
        if self.value_type() == other.value_type() {
            return match (self, other) {
                (DataValue::Null, DataValue::Null) => Ordering::Equal,
                (DataValue::Boolean(v1), DataValue::Boolean(v2)) => v1.cmp(v2),
                (DataValue::UInt64(v1), DataValue::UInt64(v2)) => v1.cmp(v2),
                (DataValue::Int64(v1), DataValue::Int64(v2)) => v1.cmp(v2),
                (DataValue::Float64(v1), DataValue::Float64(v2)) => {
                    OrderedFloat::from(*v1).cmp(&OrderedFloat::from(*v2))
                }
                (DataValue::String(v1), DataValue::String(v2)) => v1.cmp(v2),
                (DataValue::Array(v1), DataValue::Array(v2)) => {
                    for (l, r) in v1.iter().zip(v2) {
                        let cmp = l.cmp(r);
                        if cmp != Ordering::Equal {
                            return cmp;
                        }
                    }
                    v1.len().cmp(&v2.len())
                }
                (DataValue::Struct(v1), DataValue::Struct(v2)) => {
                    for (l, r) in v1.iter().zip(v2.iter()) {
                        let cmp = l.cmp(r);
                        if cmp != Ordering::Equal {
                            return cmp;
                        }
                    }
                    v1.len().cmp(&v2.len())
                }
                (DataValue::Variant(v1), DataValue::Variant(v2)) => v1.cmp(v2),
                _ => unreachable!(),
            };
        }

        if !self.is_numeric() || !other.is_numeric() {
            panic!(
                "Cannot compare different types with {:?} and {:?}",
                self.value_type(),
                other.value_type()
            );
        }

        if self.is_float() || other.is_float() {
            return OrderedFloat::from(self.as_f64().unwrap())
                .cmp(&OrderedFloat::from(other.as_f64().unwrap()));
        }

        self.as_i64().unwrap().cmp(&other.as_i64().unwrap())
    }
}

impl PartialOrd for DataValue {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Did not use std::convert:TryFrom
// Because we do not need custom type error.
pub trait DFTryFrom<T>: Sized {
2 changes: 1 addition & 1 deletion query/src/storages/fuse/fuse_table.rs
@@ -152,7 +152,7 @@ impl Table for FuseTable {
ctx: Arc<QueryContext>,
stream: SendableDataBlockStream,
) -> Result<SendableDataBlockStream> {
let log_entry_stream = self.append_trunks(ctx, stream).await?;
let log_entry_stream = self.append_chunks(ctx, stream).await?;
let data_block_stream =
log_entry_stream.map(|append_log_entry_res| match append_log_entry_res {
Ok(log_entry) => DataBlock::try_from(log_entry),
24 changes: 8 additions & 16 deletions query/src/storages/fuse/io/write/block_stream_writer.rs
@@ -13,11 +13,9 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_arrow::parquet::FileMetaData;
use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;
use common_streams::SendableDataBlockStream;
@@ -41,7 +39,6 @@ pub type SegmentInfoStream =
pub struct BlockStreamWriter {
num_block_threshold: usize,
data_accessor: Operator,
data_schema: Arc<DataSchema>,
number_of_blocks_accumulated: usize,
statistics_accumulator: Option<StatisticsAccumulator>,
meta_locations: TableMetaLocationGenerator,
@@ -51,7 +48,6 @@ impl BlockStreamWriter {
pub async fn write_block_stream(
data_accessor: Operator,
block_stream: SendableDataBlockStream,
data_schema: Arc<DataSchema>,
row_per_block: usize,
block_per_segment: usize,
meta_locations: TableMetaLocationGenerator,
@@ -70,12 +66,7 @@

// Write out the blocks.
// And transform the stream of DataBlocks into Stream of SegmentInfo at the same time.
let block_writer = BlockStreamWriter::new(
block_per_segment,
data_accessor,
data_schema,
meta_locations,
);
let block_writer = BlockStreamWriter::new(block_per_segment, data_accessor, meta_locations);
let segments = Self::transform(Box::pin(block_stream), block_writer);

Box::pin(segments)
@@ -84,13 +75,11 @@
pub fn new(
num_block_threshold: usize,
data_accessor: Operator,
data_schema: Arc<DataSchema>,
meta_locations: TableMetaLocationGenerator,
) -> Self {
Self {
num_block_threshold,
data_accessor,
data_schema,
number_of_blocks_accumulated: 0,
statistics_accumulator: None,
meta_locations,
@@ -127,7 +116,7 @@ impl BlockStreamWriter {

async fn write_block(&mut self, block: DataBlock) -> Result<Option<SegmentInfo>> {
let mut acc = self.statistics_accumulator.take().unwrap_or_default();
let partial_acc = acc.begin(&block)?;
let partial_acc = acc.begin(&block, None)?;
let schema = block.schema().to_arrow();
let location = self.meta_locations.gen_block_location();
let (file_size, file_meta_data) =
@@ -137,13 +126,15 @@
acc = partial_acc.end(file_size, location, col_metas);
self.number_of_blocks_accumulated += 1;
if self.number_of_blocks_accumulated >= self.num_block_threshold {
let summary = acc.summary(self.data_schema.as_ref())?;
let summary = acc.summary()?;
let cluster_stats = acc.summary_clusters();
let seg = SegmentInfo::new(acc.blocks_metas, Statistics {
row_count: acc.summary_row_count,
block_count: acc.summary_block_count,
uncompressed_byte_size: acc.in_memory_size,
compressed_byte_size: acc.file_size,
col_stats: summary,
cluster_stats,
});

// Reset state
@@ -231,17 +222,18 @@ impl Compactor<DataBlock, SegmentInfo> for BlockStreamWriter {

fn finish(mut self) -> Result<Option<SegmentInfo>> {
let acc = self.statistics_accumulator.take();
let data_schema = self.data_schema.as_ref();
match acc {
None => Ok(None),
Some(acc) => {
let summary = acc.summary(data_schema)?;
let summary = acc.summary()?;
let cluster_stats = acc.summary_clusters();
let seg = SegmentInfo::new(acc.blocks_metas, Statistics {
row_count: acc.summary_row_count,
block_count: acc.summary_block_count,
uncompressed_byte_size: acc.in_memory_size,
compressed_byte_size: acc.file_size,
col_stats: summary,
cluster_stats,
});
Ok(Some(seg))
}
2 changes: 2 additions & 0 deletions query/src/storages/fuse/meta/common.rs
@@ -18,6 +18,7 @@ use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;

use crate::storages::index::ClusterStatistics;
use crate::storages::index::ColumnStatistics;

pub type ColumnId = u32;
@@ -34,6 +35,7 @@ pub struct Statistics {
pub compressed_byte_size: u64,

pub col_stats: HashMap<ColumnId, ColumnStatistics>,
pub cluster_stats: Option<ClusterStatistics>,
}

/// Thing has a u64 version number
3 changes: 3 additions & 0 deletions query/src/storages/fuse/meta/v1/segment.rs
@@ -26,6 +26,7 @@ use crate::storages::fuse::meta::common::Location;
use crate::storages::fuse::meta::common::Statistics;
use crate::storages::fuse::meta::common::Versioned;
use crate::storages::fuse::meta::v0::ColumnMeta;
use crate::storages::index::ClusterStatistics;
use crate::storages::index::ColumnStatistics;

/// A segment comprises one or more blocks
@@ -48,6 +49,7 @@ pub struct BlockMeta {
pub file_size: u64,
pub col_stats: HashMap<ColumnId, ColumnStatistics>,
pub col_metas: HashMap<ColumnId, ColumnMeta>,
pub cluster_stats: Option<ClusterStatistics>,
pub location: Location,

/// Compression algo used to compress the columns of blocks
@@ -93,6 +95,7 @@ impl From<v0::BlockMeta> for BlockMeta {
file_size: s.file_size,
col_stats: s.col_stats,
col_metas: s.col_metas,
cluster_stats: None,
location: (s.location.path, DataBlock::VERSION),
compression: Compression::Lz4,
}
38 changes: 11 additions & 27 deletions query/src/storages/fuse/operations/append.rs
@@ -21,14 +21,12 @@ use common_cache::Cache;
use common_datablocks::SortColumnDescription;
use common_datavalues::DataSchemaRefExt;
use common_exception::Result;
use common_planners::Expression;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;

use crate::pipelines::new::processors::port::InputPort;
use crate::pipelines::new::processors::BlockCompactor;
use crate::pipelines::new::processors::ExpressionTransform;
use crate::pipelines::new::processors::ProjectionTransform;
use crate::pipelines::new::processors::TransformCompact;
use crate::pipelines::new::processors::TransformSortPartial;
use crate::pipelines::new::NewPipeline;
@@ -48,7 +46,7 @@

impl FuseTable {
#[inline]
pub async fn append_trunks(
pub async fn append_chunks(
&self,
ctx: Arc<QueryContext>,
stream: SendableDataBlockStream,
@@ -63,7 +61,6 @@ impl FuseTable {
let mut segment_stream = BlockStreamWriter::write_block_stream(
da.clone(),
stream,
self.table_info.schema().clone(),
rows_per_block,
block_per_seg,
self.meta_location_generator().clone(),
@@ -116,15 +113,21 @@ impl FuseTable {
)
})?;

let mut cluster_keys_index = Vec::with_capacity(self.order_keys.len());
if !self.order_keys.is_empty() {
let input_schema = self.table_info.schema();
let mut merged = input_schema.fields().clone();

for expr in &self.order_keys {
let cname = expr.column_name();
if !merged.iter().any(|x| x.name() == &cname) {
merged.push(expr.to_data_field(&input_schema)?);
}
let index = match merged.iter().position(|x| x.name() == &cname) {
None => {
merged.push(expr.to_data_field(&input_schema)?);
merged.len() - 1
}
Some(idx) => idx,
};
cluster_keys_index.push(index);
}

let output_schema = DataSchemaRefExt::create(merged);
@@ -161,26 +164,6 @@ impl FuseTable {
sort_descs.clone(),
)
})?;

// Remove unused columns before sink
if output_schema != input_schema {
let project_exprs: Vec<Expression> = input_schema
.fields()
.iter()
.map(|f| Expression::Column(f.name().to_owned()))
.collect();

pipeline.add_transform(|transform_input_port, transform_output_port| {
ProjectionTransform::try_create(
transform_input_port,
transform_output_port,
output_schema.clone(),
input_schema.clone(),
project_exprs.clone(),
ctx.clone(),
)
})?;
}
}

let mut sink_pipeline_builder = SinkPipeBuilder::create();
@@ -195,6 +178,7 @@
da.clone(),
self.table_info.schema().clone(),
self.meta_location_generator().clone(),
cluster_keys_index.clone(),
)?,
);
}
18 changes: 13 additions & 5 deletions query/src/storages/fuse/operations/commit.rs
@@ -145,7 +145,7 @@ impl FuseTable {
let prev = self.read_table_snapshot(ctx).await?;
let prev_version = self.snapshot_format_version();
let schema = self.table_info.meta.schema.as_ref().clone();
let (segments, summary) = Self::merge_append_operations(&schema, operation_log)?;
let (segments, summary) = Self::merge_append_operations(operation_log)?;

let progress_values = ProgressValues {
rows: summary.row_count as usize,
@@ -215,7 +215,7 @@ impl FuseTable {
// 1. merge stats with previous snapshot, if any
let stats = if let Some(snapshot) = &previous {
let summary = &snapshot.summary;
statistics::merge_statistics(schema, &statistics, summary)?
statistics::merge_statistics(&statistics, summary)?
} else {
statistics
};
@@ -265,7 +265,6 @@ impl FuseTable {
}

pub fn merge_append_operations(
schema: &DataSchema,
append_log_entries: &[AppendOperationLogEntry],
) -> Result<(Vec<String>, Statistics)> {
let (s, seg_locs) = append_log_entries.iter().try_fold(
@@ -280,8 +279,17 @@
acc.block_count += stats.block_count;
acc.uncompressed_byte_size += stats.uncompressed_byte_size;
acc.compressed_byte_size += stats.compressed_byte_size;
acc.col_stats =
statistics::reduce_block_stats(&[&acc.col_stats, &stats.col_stats], schema)?;
(acc.col_stats, acc.cluster_stats) = if acc.col_stats.is_empty() {
(stats.col_stats.clone(), stats.cluster_stats.clone())
} else {
(
statistics::reduce_block_stats(&[&acc.col_stats, &stats.col_stats])?,
statistics::reduce_cluster_stats(&[
&acc.cluster_stats,
&stats.cluster_stats,
]),
)
};
seg_acc.push(loc.clone());
Ok::<_, ErrorCode>((acc, seg_acc))
},