Skip to content

Commit

Permalink
fix: auto create table (#895)
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 authored May 18, 2023
1 parent 1d08bd6 commit 45c2e89
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,3 @@ DROP TABLE IF EXISTS `partition_table_t`;

affected_rows: 0

SHOW CREATE TABLE partition_table_t;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SHOW CREATE TABLE partition_table_t;. Caused by: Failed to create plan, err:Table not found, table:partition_table_t" })

3 changes: 2 additions & 1 deletion integration_tests/cases/env/cluster/ddl/partition_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7

DROP TABLE IF EXISTS `partition_table_t`;

SHOW CREATE TABLE partition_table_t;
-- The route cache will cause the data table to be queried after it is deleted. Refer to #893.
-- SHOW CREATE TABLE partition_table_t;
6 changes: 6 additions & 0 deletions meta_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub struct TableInfo {
pub partition_info: Option<PartitionInfo>,
}

impl TableInfo {
pub fn is_partition_table(&self) -> bool {
self.partition_info.is_some()
}
}

impl TryFrom<meta_service_pb::TableInfo> for TableInfo {
type Error = Error;

Expand Down
7 changes: 4 additions & 3 deletions proxy/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ mod tests {
use catalog::consts::DEFAULT_SCHEMA;
use ceresdbproto::storage::{Route, SqlQueryRequest, SqlQueryResponse};
use futures::FutureExt;
use router::{PartitionTableInfo, Router};
use meta_client::types::TableInfo;
use router::Router;
use tonic::IntoRequest;

use super::*;
Expand Down Expand Up @@ -392,11 +393,11 @@ mod tests {
}
}

async fn fetch_partition_table_info(
async fn fetch_table_info(
&self,
_schema: &str,
_table: &str,
) -> router::Result<Option<PartitionTableInfo>> {
) -> router::Result<Option<TableInfo>> {
return Ok(None);
}
}
Expand Down
19 changes: 13 additions & 6 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,19 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
msg: format!("Failed to find table, table_name:{table_name}"),
})?;

let partition_table_info_in_meta = self
let table_info_in_meta = self
.router
.fetch_partition_table_info(schema_name, table_name)
.fetch_table_info(schema_name, table_name)
.await?;

match (table, &partition_table_info_in_meta) {
if let Some(table_info_in_meta) = &table_info_in_meta {
// No need to handle non-partition table.
if !table_info_in_meta.is_partition_table() {
return Ok(());
}
}

match (table, &table_info_in_meta) {
(Some(table), Some(partition_table_info)) => {
// No need to create partition table when table_id match.
if table.id().as_u64() == partition_table_info.id {
Expand Down Expand Up @@ -237,13 +244,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
(None, Some(_)) => (),
}

let partition_table_info = partition_table_info_in_meta.unwrap();
let partition_table_info = table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
&partition_table_info.name,
&partition_table_info.partition_info,
partition_table_info.partition_info.as_ref().unwrap(),
0usize,
);
let table = self
Expand Down Expand Up @@ -276,7 +283,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
options: table.options,
state: TableState::Stable,
shard_id: DEFAULT_SHARD_ID,
partition_info: Some(partition_table_info.partition_info),
partition_info: partition_table_info.partition_info,
};
let create_opts = CreateOptions {
table_engine: self.instance.partition_table_engine.clone(),
Expand Down
143 changes: 89 additions & 54 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
let request_id = RequestId::next_id();

let write_context = req.context.clone().context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;

self.handle_auto_create_table(request_id, &write_context.database, &req)
.await?;

let (write_request_to_local, write_requests_to_forward) =
self.split_write_request(req).await?;

Expand All @@ -88,8 +93,11 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

// Write to local.
if !write_request_to_local.table_requests.is_empty() {
let local_handle =
async move { Ok(self.write_to_local(ctx, write_request_to_local).await) };
let local_handle = async move {
Ok(self
.write_to_local(ctx, request_id, write_request_to_local)
.await)
};
futures.push(local_handle.boxed());
}

Expand Down Expand Up @@ -234,8 +242,12 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
}
}

async fn write_to_local(&self, ctx: Context, req: WriteRequest) -> Result<WriteResponse> {
let request_id = RequestId::next_id();
async fn write_to_local(
&self,
ctx: Context,
request_id: RequestId,
req: WriteRequest,
) -> Result<WriteResponse> {
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();
Expand Down Expand Up @@ -306,59 +318,38 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let table_name = &write_table_req.table;
self.maybe_open_partition_table_if_not_exist(&catalog, &schema, table_name)
.await?;
let mut table = self.try_get_table(&catalog, &schema, table_name)?;

match table.clone() {
None => {
if auto_create_table {
self.create_table(
request_id,
&catalog,
&schema,
&write_table_req,
&schema_config,
deadline,
)
.await?;
// try to get table again
table = self.try_get_table(&catalog, &schema, table_name)?;
}
}
Some(t) => {
if auto_create_table {
// The reasons for making the decision to add columns before writing are as
// follows:
// * If judged based on the error message returned, it may cause data that
// has already been successfully written to be written again and affect
// the accuracy of the data.
// * Currently, the decision to add columns is made at the request level,
// not at the row level, so the cost is relatively small.
let table_schema = t.schema();
let columns =
find_new_columns(&table_schema, &schema_config, &write_table_req)?;
if !columns.is_empty() {
self.execute_add_columns_plan(
request_id, &catalog, &schema, t, columns, deadline,
)
.await?;
}
}
let table = self
.try_get_table(&catalog, &schema, table_name)?
.with_context(|| ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Table not found, schema:{schema}, table:{table_name}"),
})?;

if auto_create_table {
// The reasons for making the decision to add columns before writing are as
// follows:
// * If judged based on the error message returned, it may cause data that has
// already been successfully written to be written again and affect the
// accuracy of the data.
// * Currently, the decision to add columns is made at the request level, not at
// the row level, so the cost is relatively small.
let table_schema = table.schema();
let columns = find_new_columns(&table_schema, &schema_config, &write_table_req)?;
if !columns.is_empty() {
self.execute_add_columns_plan(
request_id,
&catalog,
&schema,
table.clone(),
columns,
deadline,
)
.await?;
}
}

match table {
Some(table) => {
let plan = write_table_request_to_insert_plan(table, write_table_req)?;
plan_vec.push(plan);
}
None => {
return ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Table not found, schema:{schema}, table:{table_name}"),
}
.fail();
}
}
let plan = write_table_request_to_insert_plan(table, write_table_req)?;
plan_vec.push(plan);
}

Ok(plan_vec)
Expand Down Expand Up @@ -427,6 +418,50 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
})
}

async fn handle_auto_create_table(
&self,
request_id: RequestId,
schema: &str,
req: &WriteRequest,
) -> Result<()> {
if !self.auto_create_table {
return Ok(());
}

let schema_config = self
.schema_config_provider
.schema_config(schema)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Fail to fetch schema config, schema_name:{schema}"),
})?
.cloned()
.unwrap_or_default();

// TODO: Consider whether to build tables concurrently when there are too many
// tables.
for write_table_req in &req.table_requests {
let table_info = self
.router
.fetch_table_info(schema, &write_table_req.table)
.await?;
if table_info.is_some() {
continue;
}
self.create_table(
request_id,
self.instance.catalog_manager.default_catalog_name(),
schema,
write_table_req,
&schema_config,
None,
)
.await?;
}
Ok(())
}

async fn create_table(
&self,
request_id: RequestId,
Expand Down
Loading

0 comments on commit 45c2e89

Please sign in to comment.