Skip to content

Commit

Permalink
chore(cubestore): Upgrade DF: fix limit pushdown for LastRowByKey
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Dec 2, 2024
1 parent 019c5f2 commit faf7d75
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
16 changes: 8 additions & 8 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8647,12 +8647,12 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
//===========================
let res = assert_limit_pushdown(
&service,
"SELECT a, b, SUM(c) FROM (
"SELECT a FROM (SELECT a, b, SUM(c) FROM (
SELECT * FROM foo.pushdown_where_group1
union all
SELECT * FROM foo.pushdown_where_group2
) as `tb`
GROUP BY 1, 2 ORDER BY 1 LIMIT 3",
GROUP BY 1, 2 ORDER BY 1 LIMIT 3) x",
Some("ind1"),
true,
false,
Expand All @@ -8665,18 +8665,18 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
vec![
Row::new(vec![
TableValue::Int(11),
TableValue::Int(18),
TableValue::Int(3)
// TableValue::Int(18),
// TableValue::Int(3)
]),
Row::new(vec![
TableValue::Int(11),
TableValue::Int(45),
TableValue::Int(1)
// TableValue::Int(45),
// TableValue::Int(1)
]),
Row::new(vec![
TableValue::Int(12),
TableValue::Int(20),
TableValue::Int(4)
// TableValue::Int(20),
// TableValue::Int(4)
]),
]
);
Expand Down
12 changes: 6 additions & 6 deletions rust/cubestore/cubestore/src/queryplanner/merge_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ impl LastRowByUniqueKeyExec {
"Empty unique_key passed for LastRowByUniqueKeyExec".to_string(),
));
}
let schema = input.schema();
let properties = input.properties().clone();
Ok(Self {
input,
unique_key,
properties: PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
),
properties,
})
}

Expand Down Expand Up @@ -83,6 +79,10 @@ impl ExecutionPlan for LastRowByUniqueKeyExec {
&self.properties
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
Expand Down
18 changes: 13 additions & 5 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,11 +691,19 @@ impl CubeTable {
)));
}
}
Arc::new(MemoryExec::try_new(
&[record_batches.clone()],
index_projection_schema.clone(),
index_projection_or_none_on_schema_match.clone(),
)?)
Arc::new(
MemoryExec::try_new(
&[record_batches.clone()],
index_projection_schema.clone(),
index_projection_or_none_on_schema_match.clone(),
)?
.with_sort_information(vec![
lex_ordering_for_index(
self.index_snapshot.index.get_row(),
&index_projection_schema,
)?,
]),
)
} else {
let remote_path = chunk.get_row().get_full_name(chunk.get_id());
let local_path = self
Expand Down

0 comments on commit faf7d75

Please sign in to comment.