chore(cubestore): Upgrade DF: fix create table with location tests
paveltiunov committed Nov 28, 2024
1 parent 9f11549 commit 2c01a51
Showing 4 changed files with 200 additions and 155 deletions.
rust/cubestore/cubestore-sql-tests/src/tests.rs (2 changes: 1 addition & 1 deletion)
@@ -2263,7 +2263,7 @@ async fn create_table_with_url(service: Box<dyn SqlClient>) {
        .exec_query("CREATE SCHEMA IF NOT EXISTS foo")
        .await
        .unwrap();
-   let create_table_sql = format!("CREATE TABLE foo.bikes (`Response ID` int, `Start Date` text, `End Date` text) LOCATION '{}'", url);
+   let create_table_sql = format!("CREATE TABLE foo.bikes (`Response ID` int, `Start Date` text, `End Date` text) WITH (input_format = 'csv') LOCATION '{}'", url);
    let (_, query_result) = tokio::join!(
        service.exec_query(&create_table_sql),
        service.exec_query("SELECT count(*) from foo.bikes")
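
Note: the only functional change in this file is the explicit WITH (input_format = 'csv') clause, presumably because the upgraded DataFusion path no longer infers CSV from the location alone. A minimal standalone sketch of the SQL string the test now builds, with a placeholder URL standing in for the one the test harness serves:

fn main() {
    // Placeholder URL for illustration; the real test downloads a CSV
    // from a local mock server.
    let url = "https://example.com/bikes.csv";
    // The input format is now stated explicitly in the WITH clause.
    let create_table_sql = format!(
        "CREATE TABLE foo.bikes (`Response ID` int, `Start Date` text, `End Date` text) \
         WITH (input_format = 'csv') LOCATION '{}'",
        url
    );
    println!("{create_table_sql}");
}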
rust/cubestore/cubestore/src/queryplanner/planning.rs (23 changes: 23 additions & 0 deletions)
@@ -62,6 +62,7 @@ use datafusion::logical_expr::{
    expr, Aggregate, BinaryExpr, Expr, Extension, Filter, Join, Limit, LogicalPlan, Operator,
    Projection, Sort, SortExpr, SubqueryAlias, TableScan, Union, UserDefinedLogicalNode,
};
+use datafusion::physical_expr::{Distribution, LexRequirement};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
@@ -1720,6 +1721,28 @@ impl ExecutionPlan for WorkerExec {
    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }
+
+   fn required_input_distribution(&self) -> Vec<Distribution> {
+       vec![Distribution::SinglePartition; self.children().len()]
+   }
+
+   fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
+       let input_ordering = self.input.required_input_ordering();
+       if !input_ordering.is_empty() {
+           vec![input_ordering[0].clone()]
+       } else {
+           vec![None]
+       }
+   }
+
+   fn maintains_input_order(&self) -> Vec<bool> {
+       let maintains_input_order = self.input.maintains_input_order();
+       if !maintains_input_order.is_empty() {
+           vec![maintains_input_order[0]]
+       } else {
+           vec![false]
+       }
+   }
}

/// Use this to pick the part of the plan that the worker must execute.
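
Note: WorkerExec wraps exactly one child plan, so each new override either forwards the first entry of the input's requirement vector or falls back to a conservative default when the input reports nothing. A standalone sketch of that forward-or-default pattern (the helper name is invented for illustration; the real logic lives in the ExecutionPlan impl above):

fn forward_first_or<T: Clone>(inner: &[T], default: T) -> Vec<T> {
    // Take the wrapped plan's answer for its first child, if it gave one.
    match inner.first() {
        Some(first) => vec![first.clone()],
        None => vec![default],
    }
}

fn main() {
    // Mirrors maintains_input_order above: forward true when the input
    // reports it, degrade to false when the input reports nothing.
    assert_eq!(forward_first_or(&[true], false), vec![true]);
    assert_eq!(forward_first_or::<bool>(&[], false), vec![false]);
}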
rust/cubestore/cubestore/src/queryplanner/query_executor.rs (55 changes: 32 additions & 23 deletions)
@@ -22,7 +22,11 @@ use crate::util::memory::MemoryHandler;
use crate::{app_metrics, CubeError};
use async_trait::async_trait;
use core::fmt;
-use datafusion::arrow::array::{make_array, Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float64Array, Int16Array, Int32Array, Int64Array, MutableArrayData, StringArray, TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array};
+use datafusion::arrow::array::{
+    make_array, Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Float64Array,
+    Int16Array, Int32Array, Int64Array, MutableArrayData, StringArray, TimestampMicrosecondArray,
+    TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
+};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::arrow::ipc::reader::StreamReader;
@@ -43,9 +47,11 @@ use datafusion::execution::{SessionStateBuilder, TaskContext};
use datafusion::logical_expr::{Expr, LogicalPlan};
use datafusion::physical_expr;
use datafusion::physical_expr::{
-   expressions, EquivalenceProperties, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement,
+   expressions, Distribution, EquivalenceProperties, LexRequirement, PhysicalSortExpr,
+   PhysicalSortRequirement,
};
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::projection::ProjectionExec;
@@ -607,15 +613,13 @@ impl CubeTable {
            .get(remote_path.as_str())
            .expect(format!("Missing remote path {}", remote_path).as_str());

-       let file_scan = FileScanConfig::new(
-           ObjectStoreUrl::local_filesystem(),
-           index_schema.clone(),
-       )
-       .with_file(PartitionedFile::from_path(local_path.to_string())?)
-       .with_projection(index_projection_or_none_on_schema_match.clone())
-       .with_output_ordering(vec![(0..key_len)
-           .map(|i| -> Result<_, DataFusionError> {
-               Ok(PhysicalSortExpr::new(
+       let file_scan =
+           FileScanConfig::new(ObjectStoreUrl::local_filesystem(), index_schema.clone())
+               .with_file(PartitionedFile::from_path(local_path.to_string())?)
+               .with_projection(index_projection_or_none_on_schema_match.clone())
+               .with_output_ordering(vec![(0..key_len)
+                   .map(|i| -> Result<_, DataFusionError> {
+                       Ok(PhysicalSortExpr::new(
                        Arc::new(
                            datafusion::physical_expr::expressions::Column::new_with_schema(
                                index_schema.field(i).name(),
@@ -624,8 +628,8 @@ impl CubeTable {
                            ),
                            SortOptions::default(),
                        ))
-           })
-           .collect::<Result<Vec<_>, _>>()?]);
+                   })
+                   .collect::<Result<Vec<_>, _>>()?]);
        let parquet_exec = ParquetExecBuilder::new(file_scan)
            .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone())
            .build();
@@ -982,7 +986,7 @@ impl ExecutionPlan for CubeTableExec {
                sort_order = None
            }
        }
-       vec![sort_order.map(|order| {
+       let order = sort_order.map(|order| {
            order
                .into_iter()
                .map(|col_index| {
@@ -999,7 +1003,9 @@ impl ExecutionPlan for CubeTableExec {
                ))
            })
            .collect()
-       })]
+       });
+
+       (0..self.children().len()).map(|_| order.clone()).collect()
    }

    // TODO upgrade DF
@@ -1070,6 +1076,10 @@ impl ExecutionPlan for CubeTableExec {
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }
+
+   fn required_input_distribution(&self) -> Vec<Distribution> {
+       vec![Distribution::SinglePartition; self.children().len()]
+   }
}
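
Note: CubeTableExec here and ClusterSendExec below gain the same required_input_distribution override: every child must arrive as a single partition. Reading between the lines of the diff, this tells DataFusion's distribution-enforcement pass to coalesce multi-partition children rather than repartition around CubeStore's custom operators. A minimal sketch of the one-requirement-per-child convention, with Distribution mocked so it runs standalone:

// Stand-in for datafusion::physical_expr::Distribution; only the variant
// used by this commit is modeled.
#[derive(Clone, Debug, PartialEq)]
enum Distribution {
    SinglePartition,
}

fn required_input_distribution(num_children: usize) -> Vec<Distribution> {
    // One entry per child, as in
    // vec![Distribution::SinglePartition; self.children().len()].
    vec![Distribution::SinglePartition; num_children]
}

fn main() {
    assert_eq!(
        required_input_distribution(1),
        vec![Distribution::SinglePartition]
    );
}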

pub fn lex_ordering_for_index(
@@ -1540,6 +1550,10 @@ impl ExecutionPlan for ClusterSendExec {
            vec![false]
        }
    }
+
+   fn required_input_distribution(&self) -> Vec<Distribution> {
+       vec![Distribution::SinglePartition; self.children().len()]
+   }
}

impl fmt::Debug for ClusterSendExec {
@@ -1704,14 +1718,9 @@ pub fn batches_to_dataframe(batches: Vec<RecordBatch>) -> Result<DataFrame, CubeError> {
            }
        }
        // TODO upgrade DF
-       DataType::Decimal128(_, _) => convert_array!(
-           array,
-           num_rows,
-           rows,
-           Decimal128Array,
-           Decimal,
-           (Decimal)
-       ),
+       DataType::Decimal128(_, _) => {
+           convert_array!(array, num_rows, rows, Decimal128Array, Decimal, (Decimal))
+       }
        // DataType::Int64Decimal(1) => convert_array!(
        // array,
        // num_rows,
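
Note: the one behavioral change buried in this file's reformatting is in CubeTableExec::required_input_ordering above: instead of returning a single-element vector, the computed optional ordering is now cloned once per child. A standalone sketch of that broadcast, with a plain Vec<&str> standing in for LexRequirement:

fn broadcast_per_child<T: Clone>(order: Option<T>, num_children: usize) -> Vec<Option<T>> {
    // Matches (0..self.children().len()).map(|_| order.clone()).collect()
    // from the diff: every child gets the same optional requirement.
    (0..num_children).map(|_| order.clone()).collect()
}

fn main() {
    let order = Some(vec!["a", "b"]); // stand-in for a LexRequirement
    let per_child = broadcast_per_child(order, 3);
    assert_eq!(per_child.len(), 3);
    assert!(per_child.iter().all(|o| o.is_some()));
}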
