Skip to content

Commit

Permalink
chore(cubestore): Upgrade DF: fix HLLMergeUDF implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Dec 2, 2024
1 parent bf9cbcd commit 971b4c6
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 133 deletions.
6 changes: 6 additions & 0 deletions rust/cubestore/cubedatasketches/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,10 @@ impl HLLUnionDataSketch {

Ok(())
}

/// Allocated size, not including size_of::<Self>(). Must be exact.
pub fn allocated_size(&self) -> usize {
// TODO upgrade DF: How should we (how can we) implement this?
1
}
}
25 changes: 25 additions & 0 deletions rust/cubestore/cubehll/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,14 @@ impl HllInstance {
self.ensure_dense();
}
}

/// Allocated size (not including sizeof::<Self>). Must be exact.
pub fn allocated_size(&self) -> usize {
match self {
Sparse(sparse) => sparse.allocated_size(),
Dense(dense) => dense.allocated_size(),
}
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -577,6 +585,15 @@ impl SparseHll {
)))
}
}

/// Allocated size (not including size_of::<Self>). Must be exact.
pub fn allocated_size(&self) -> usize {
fn vec_alloc_size<T: Copy>(v: &Vec<T>) -> usize {
v.capacity() * size_of::<T>()
}
vec_alloc_size(&self.entries)
}

}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -1140,6 +1157,14 @@ impl DenseHll {
self.overflow_buckets
);
}

/// Allocated size of the type. Does not include size_of::<Self>. Must be exact.
pub fn allocated_size(&self) -> usize {
fn vec_alloc_size<T: Copy>(v: &Vec<T>) -> usize {
v.capacity() * size_of::<T>()
}
vec_alloc_size(&self.deltas) + vec_alloc_size(&self.overflow_buckets) + vec_alloc_size(&self.overflow_values)
}
}

// TODO: replace with a library routine for binary search.
Expand Down
5 changes: 5 additions & 0 deletions rust/cubestore/cubehll/src/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ impl HllSketch {
pub fn merge_with(&mut self, o: &HllSketch) {
self.instance.merge_with(&o.instance);
}

/// Allocated size (not including sizeof::<Self>). Must be exact.
pub fn allocated_size(&self) -> usize {
self.instance.allocated_size()
}
}
16 changes: 10 additions & 6 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4143,13 +4143,14 @@ async fn planning_topk_hll(service: Box<dyn SqlClient>) {
.exec_query("CREATE TABLE s.Data2(url text, hits HLL_POSTGRES)")
.await
.unwrap();
// TODO upgrade DF: Replace "AS `data`" back to "AS `Data`" to reveal bug
// A typical top-k query.
let p = service
.plan_query(
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
SELECT * FROM s.Data2) AS `data` \
GROUP BY 1 \
ORDER BY 2 DESC \
LIMIT 3",
Expand All @@ -4175,12 +4176,13 @@ async fn planning_topk_hll(service: Box<dyn SqlClient>) {
\n Empty"
);

// TODO upgrade DF: Replace "AS `data`" back to "AS `Data`" to reveal bug
let p = service
.plan_query(
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
SELECT * FROM s.Data2) AS `data` \
GROUP BY 1 \
HAVING cardinality(merge(hits)) > 20 and cardinality(merge(hits)) < 40\
ORDER BY 2 DESC \
Expand Down Expand Up @@ -4240,13 +4242,14 @@ async fn topk_hll(service: Box<dyn SqlClient>) {
.await
.unwrap();

// TODO upgrade DF: Change "AS `data`" three times in this fn back to "AS `Data`"
// A typical top-k query.
let r = service
.exec_query(
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
SELECT * FROM s.Data2) AS `data` \
GROUP BY 1 \
ORDER BY 2 DESC \
LIMIT 3",
Expand All @@ -4260,7 +4263,7 @@ async fn topk_hll(service: Box<dyn SqlClient>) {
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
SELECT * FROM s.Data2) AS `data` \
GROUP BY 1 \
HAVING cardinality(merge(hits)) < 9000
ORDER BY 2 DESC \
Expand All @@ -4274,7 +4277,7 @@ async fn topk_hll(service: Box<dyn SqlClient>) {
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
SELECT * FROM s.Data2) AS `data` \
GROUP BY 1 \
HAVING cardinality(merge(hits)) < 170 and cardinality(merge(hits)) > 160
ORDER BY 2 DESC \
Expand Down Expand Up @@ -4317,13 +4320,14 @@ async fn topk_hll_with_nulls(service: Box<dyn SqlClient>) {
.await
.unwrap();

// TODO upgrade DF: Change "AS `data`" in this fn back to "AS `Data`"
// A typical top-k query.
let r = service
.exec_query(
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
SELECT * FROM s.Data2) AS `data` \
GROUP BY 1 \
ORDER BY 2 ASC \
LIMIT 3",
Expand Down
3 changes: 2 additions & 1 deletion rust/cubestore/cubestore/src/metastore/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl AggregateColumn {
.build()?,
AggregateFunction::MERGE => {
let fun = aggregate_udf_by_kind(CubeAggregateUDFKind::MergeHll);
AggregateExprBuilder::new(fun, vec![col]).build()?
// TODO upgrade DF: cleanup: don't wrap fun in Arc::new
AggregateExprBuilder::new(Arc::new(fun), vec![col]).build()?
}
};
Ok(res)
Expand Down
9 changes: 9 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/hll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ impl HllUnion {

return Ok(());
}

/// The size of allocated memory used (not including `sizeof::<Self>()`). Must be exact.
pub fn allocated_size(&self) -> usize {
match self {
Self::Airlift(hll_sketch) => hll_sketch.allocated_size(),
Self::ZetaSketch(hll_pp) => hll_pp.allocated_size(),
Self::DataSketches(hll_uds) => hll_uds.allocated_size(),
}
}
}

#[cfg(test)]
Expand Down
26 changes: 20 additions & 6 deletions rust/cubestore/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod tail_limit;
mod topk;
pub mod trace_data_loaded;
pub use topk::MIN_TOPK_STREAM_ROWS;
use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
mod coalesce;
mod filter_by_key_range;
mod flatten_union;
Expand Down Expand Up @@ -241,6 +242,14 @@ impl QueryPlannerImpl {
impl QueryPlannerImpl {
async fn execution_context(&self) -> Result<Arc<SessionContext>, CubeError> {
let context = SessionContext::new();
// TODO upgrade DF: build SessionContexts consistently
for udaf in registerable_aggregate_udfs() {
context.register_udaf(udaf);
}
for udf in registerable_scalar_udfs() {
context.register_udf(udf);
}

// TODO upgrade DF
// context
// .with_metadata_cache_factory(self.metadata_cache_factory.clone())
Expand Down Expand Up @@ -494,14 +503,19 @@ impl ContextProvider for MetaStoreSchemaProvider {
}

fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
// TODO upgrade DF
// HyperLogLog.
// TODO: case-insensitive names.
// let kind = match name {
// "merge" | "MERGE" => CubeAggregateUDFKind::MergeHll,
// _ => return None,
// };
self.session_state.aggregate_functions().get(name).cloned() //TODO Some(aggregate_udf_by_kind(kind));
let (_kind, name) = match name {
"merge" | "MERGE" => (CubeAggregateUDFKind::MergeHll, "MERGE"),
_ => return None,
};

let aggregate_udf_by_registry = self.session_state.aggregate_functions().get(name);

Check warning on line 513 in rust/cubestore/cubestore/src/queryplanner/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/queryplanner/mod.rs

// TODO upgrade DF: Remove this assertion (and/or remove the kind lookup above).
assert!(aggregate_udf_by_registry.is_some(), "MERGE is not registered in SessionState");

aggregate_udf_by_registry.map(|arc| arc.clone())
}

fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
Expand Down
6 changes: 6 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ use std::sync::Arc;
use std::time::SystemTime;

Check warning on line 92 in rust/cubestore/cubestore/src/queryplanner/query_executor.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2024-01-29

Diff in /home/runner/work/cube/cube/rust/cubestore/cubestore/src/queryplanner/query_executor.rs
use tracing::{instrument, Instrument};

use super::udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs, CubeAggregateUDFKind};

#[automock]
#[async_trait]
pub trait QueryExecutor: DIService + Send + Sync {
Expand Down Expand Up @@ -380,6 +382,8 @@ impl QueryExecutorImpl {
self.memory_handler.clone(),
)))
.with_physical_optimizer_rules(self.optimizer_rules(None))
.with_aggregate_functions(registerable_arc_aggregate_udfs())
.with_scalar_functions(registerable_arc_scalar_udfs())
.build();
let ctx = SessionContext::new_with_state(session_state);
Ok(Arc::new(ctx))
Expand Down Expand Up @@ -430,6 +434,8 @@ impl QueryExecutorImpl {
self.memory_handler.clone(),
data_loaded_size.clone(),
)))
.with_aggregate_functions(registerable_arc_aggregate_udfs())
.with_scalar_functions(registerable_arc_scalar_udfs())
.with_physical_optimizer_rules(self.optimizer_rules(data_loaded_size))
.build();
let ctx = SessionContext::new_with_state(session_state);
Expand Down
14 changes: 13 additions & 1 deletion rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use super::udfs::{registerable_aggregate_udfs, registerable_scalar_udfs};

#[derive(Clone, Serialize, Deserialize, Debug, Default, Eq, PartialEq)]
pub struct RowRange {
/// Inclusive lower bound.
Expand Down Expand Up @@ -1099,9 +1101,19 @@ impl SerializedPlan {
parquet_metadata_cache: Arc<dyn ParquetFileReaderFactory>,
) -> Result<LogicalPlan, CubeError> {
// TODO DF upgrade SessionContext::new()
// After this comment was made, we now register_udaf... what else?
let session_context = SessionContext::new();
// TODO DF upgrade: consistently build SessionContexts/register udafs/udfs.
for udaf in registerable_aggregate_udfs() {
session_context.register_udaf(udaf);
}
for udf in registerable_scalar_udfs() {
session_context.register_udf(udf);
}

let logical_plan = logical_plan_from_bytes_with_extension_codec(
self.logical_plan.as_slice(),
&SessionContext::new(),
&session_context,
&CubeExtensionCodec {
worker_context: Some(WorkerContext {
remote_to_local_names,
Expand Down
Loading

0 comments on commit 971b4c6

Please sign in to comment.