Skip to content

Commit

Permalink
fix: address CR
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Sep 7, 2023
1 parent 34669aa commit 3c05f0e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ CREATE TABLE `random_partition_table_t`(
`value` double NOT NULL,
`t` timestamp NOT NULL,
TIMESTAMP KEY(t)
) PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="APPEND");
) PARTITION BY RANDOM PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="APPEND");

affected_rows: 0

Expand Down Expand Up @@ -163,7 +163,7 @@ CREATE TABLE `random_partition_table_t_overwrite`(
`value` double NOT NULL,
`t` timestamp NOT NULL,
TIMESTAMP KEY(t)
) PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="OVERWRITE");
) PARTITION BY RANDOM PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="OVERWRITE");

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE `random_partition_table_t_overwrite`( \n `name`string TAG, \n `id` int TAG, \n `value` double NOT NULL, \n `t` timestamp NOT NULL, \n TIMESTAMP KEY(t) \n ) PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode=\"OVERWRITE\");. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:invalid parameters to create table, plan:CreateTablePlan { engine: \"Analytic\", if_not_exists: false, table: \"random_partition_table_t_overwrite\", table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"name\", data_type: String, is_nullable: true, is_tag: true, is_dictionary: false, comment: \"\", escaped_name: \"name\", default_value: None }, ColumnSchema { id: 4, name: \"id\", data_type: Int32, is_nullable: true, is_tag: true, is_dictionary: false, comment: \"\", escaped_name: \"id\", default_value: None }, ColumnSchema { id: 5, name: \"value\", data_type: Double, is_nullable: false, is_tag: false, is_dictionary: false, comment: \"\", escaped_name: \"value\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, options: {\"enable_ttl\": \"false\", \"update_mode\": \"OVERWRITE\"} }, err:Invalid arguments, table:random_partition_table_t_overwrite, err:Try to create a random partition table in overwrite mode, table:random_partition_table_t_overwrite." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE `random_partition_table_t_overwrite`( \n `name`string TAG, \n `id` int TAG, \n `value` double NOT NULL, \n `t` timestamp NOT NULL, \n TIMESTAMP KEY(t) \n ) PARTITION BY RANDOM PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode=\"OVERWRITE\");. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:invalid parameters to create table, plan:CreateTablePlan { engine: \"Analytic\", if_not_exists: false, table: \"random_partition_table_t_overwrite\", table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, is_dictionary: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, is_dictionary: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"name\", data_type: String, is_nullable: true, is_tag: true, is_dictionary: false, comment: \"\", escaped_name: \"name\", default_value: None }, ColumnSchema { id: 4, name: \"id\", data_type: Int32, is_nullable: true, is_tag: true, is_dictionary: false, comment: \"\", escaped_name: \"id\", default_value: None }, ColumnSchema { id: 5, name: \"value\", data_type: Double, is_nullable: false, is_tag: false, is_dictionary: false, comment: \"\", escaped_name: \"value\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, options: {\"enable_ttl\": \"false\", \"update_mode\": \"OVERWRITE\"} }, err:Invalid arguments, table:random_partition_table_t_overwrite, err:Try to create a random partition table in overwrite mode, table:random_partition_table_t_overwrite." })

4 changes: 2 additions & 2 deletions integration_tests/cases/env/cluster/ddl/partition_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ CREATE TABLE `random_partition_table_t`(
`value` double NOT NULL,
`t` timestamp NOT NULL,
TIMESTAMP KEY(t)
) PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="APPEND");
) PARTITION BY RANDOM PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="APPEND");

SHOW CREATE TABLE random_partition_table_t;

Expand Down Expand Up @@ -85,4 +85,4 @@ CREATE TABLE `random_partition_table_t_overwrite`(
`value` double NOT NULL,
`t` timestamp NOT NULL,
TIMESTAMP KEY(t)
) PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="OVERWRITE");
) PARTITION BY RANDOM PARTITIONS 4 ENGINE = Analytic with (enable_ttl='false', update_mode="OVERWRITE");
34 changes: 17 additions & 17 deletions query_frontend/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,8 @@ impl<'a> Parser<'a> {
let table_name = self.parser.parse_object_name()?.into();
let (columns, constraints) = self.parse_columns()?;

// Parse the simple partition rule, starting with `Partitions $Num`.
let partition = match self.maybe_parse_simple_partition()? {
Some(p) => Some(p),
None => {
// Parse the complex partition rule, starting with `PARTITION BY`
self.maybe_parse_complex_partition(Keyword::PARTITION, &columns)?
}
};
// Parse the partition clause, starting with `PARTITION BY ...`
let partition = self.maybe_parse_partition(Keyword::PARTITION, &columns)?;

// ENGINE = ...
let engine = self.parse_table_engine()?;
Expand Down Expand Up @@ -568,15 +562,7 @@ impl<'a> Parser<'a> {
}
}

fn maybe_parse_simple_partition(&mut self) -> Result<Option<Partition>> {
let partition = self
.parse_partition_num()?
.map(|partition_num| Partition::Random(RandomPartition { partition_num }));

Ok(partition)
}

fn maybe_parse_complex_partition(
fn maybe_parse_partition(
&mut self,
keyword: Keyword,
columns: &[ColumnDef],
Expand All @@ -595,6 +581,10 @@ impl<'a> Parser<'a> {
&mut self,
columns: &[ColumnDef],
) -> Result<Option<Partition>> {
if let Some(key) = self.maybe_parse_and_check_random_partition()? {
return Ok(Some(Partition::Random(key)));
}

if let Some(key) = self.maybe_parse_and_check_key_partition(columns)? {
return Ok(Some(Partition::Key(key)));
}
Expand All @@ -605,6 +595,16 @@ impl<'a> Parser<'a> {
Ok(None)
}

fn maybe_parse_and_check_random_partition(&mut self) -> Result<Option<RandomPartition>> {
if !self.consume_token("RANDOM") {
return Ok(None);
}

// Parse the clause `PARTITIONS ...`.
let partition_num = self.parse_partition_num()?.unwrap_or(1);
Ok(Some(RandomPartition { partition_num }))
}

fn maybe_parse_and_check_hash_partition(
&mut self,
columns: &[ColumnDef],
Expand Down
5 changes: 4 additions & 1 deletion table_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ pub trait TableEngine: Send + Sync {
/// Close the engine gracefully.
async fn close(&self) -> Result<()>;

/// Validate the request of create table.
/// Validate the params used to create a table.
///
/// This validation can be used before doing real table creation to avoid
/// unnecessary works if the params is invalid.
async fn validate_create_table(&self, request: &CreateTableParams) -> Result<()>;

/// Create table
Expand Down
8 changes: 8 additions & 0 deletions table_engine/src/partition/rule/df_adapter/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ pub trait FilterExtractor: Send + Sync + 'static {
fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter>;
}

pub struct NoopExtractor;

impl FilterExtractor for NoopExtractor {
fn extract(&self, _filters: &[Expr], _columns: &[String]) -> Vec<PartitionFilter> {
vec![]
}
}

pub struct KeyExtractor;

impl FilterExtractor for KeyExtractor {
Expand Down
15 changes: 6 additions & 9 deletions table_engine/src/partition/rule/df_adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use common_types::{row::RowGroup, schema::Schema};
use datafusion::logical_expr::Expr;

use self::extractor::KeyExtractor;
use self::extractor::{KeyExtractor, NoopExtractor};
use crate::partition::{
rule::{
df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory, PartitionRuleRef,
Expand All @@ -33,7 +33,7 @@ pub struct DfPartitionRuleAdapter {
rule: PartitionRuleRef,

/// `PartitionFilter` extractor for datafusion `Expr`
extractor: Option<FilterExtractorRef>,
extractor: FilterExtractorRef,
}

impl DfPartitionRuleAdapter {
Expand All @@ -55,23 +55,20 @@ impl DfPartitionRuleAdapter {
pub fn locate_partitions_for_read(&self, filters: &[Expr]) -> Result<Vec<usize>> {
// Extract partition filters from datafusion filters.
let columns = self.columns();
let partition_filters = match &self.extractor {
Some(extractor) => extractor.extract(filters, &columns),
None => vec![],
};
let partition_filters = self.extractor.extract(filters, &columns);

// Locate partitions from filters.
self.rule.locate_partitions_for_read(&partition_filters)
}

fn create_extractor(partition_info: &PartitionInfo) -> Result<Option<FilterExtractorRef>> {
fn create_extractor(partition_info: &PartitionInfo) -> Result<FilterExtractorRef> {
match partition_info {
PartitionInfo::Key(_) => Ok(Some(Box::new(KeyExtractor))),
PartitionInfo::Key(_) => Ok(Box::new(KeyExtractor)),
PartitionInfo::Hash(_) => BuildPartitionRule {
msg: format!("unsupported partition strategy, strategy:{partition_info:?}"),
}
.fail(),
PartitionInfo::Random(_) => Ok(None),
PartitionInfo::Random(_) => Ok(Box::new(NoopExtractor)),
}
}
}
Expand Down

0 comments on commit 3c05f0e

Please sign in to comment.