From fe5d3eb295ddb9f28f7f0dbe1b0c54dab52c46e4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 27 Mar 2024 06:48:06 +0000 Subject: [PATCH 1/5] refactor: minimize visibility of drop database steps --- src/common/meta/src/ddl/drop_database.rs | 4 ++-- src/common/meta/src/ddl/drop_database/cursor.rs | 2 +- src/common/meta/src/ddl/drop_database/end.rs | 2 +- src/common/meta/src/ddl/drop_database/executor.rs | 2 +- src/common/meta/src/ddl/drop_database/metadata.rs | 2 +- src/common/meta/src/ddl/drop_database/start.rs | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index 6454a403e327..31da6e444dd8 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -44,13 +44,13 @@ pub struct DropDatabaseProcedure { /// Target of dropping tables. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub enum DropTableTarget { +pub(crate) enum DropTableTarget { Logical, Physical, } /// Context of [DropDatabaseProcedure] execution. -pub struct DropDatabaseContext { +pub(crate) struct DropDatabaseContext { catalog: String, schema: String, drop_if_exists: bool, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index 5ea7a19585fa..23d6bc09267a 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -27,7 +27,7 @@ use crate::key::table_route::TableRouteValue; use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] -pub struct DropDatabaseCursor { +pub(crate) struct DropDatabaseCursor { target: DropTableTarget, } diff --git a/src/common/meta/src/ddl/drop_database/end.rs b/src/common/meta/src/ddl/drop_database/end.rs index 39e5c1a1add2..8f1f68212827 100644 --- a/src/common/meta/src/ddl/drop_database/end.rs +++ b/src/common/meta/src/ddl/drop_database/end.rs @@ -20,7 +20,7 @@ use crate::ddl::DdlContext; use crate::error::Result; #[derive(Debug, Serialize, Deserialize)] -pub struct DropDatabaseEnd; +pub(crate) struct DropDatabaseEnd; #[async_trait::async_trait] #[typetag::serde] diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 0bbdc2271955..451412633698 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -29,7 +29,7 @@ use crate::rpc::router::{operating_leader_regions, RegionRoute}; use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] -pub struct DropDatabaseExecutor { +pub(crate) struct DropDatabaseExecutor { table_id: TableId, table_name: TableName, region_routes: Vec, diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index 37129f16d566..27b0b6b9a2b5 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -22,7 +22,7 @@ use crate::error::Result; use crate::key::schema_name::SchemaNameKey; #[derive(Debug, Serialize, Deserialize)] -pub struct DropDatabaseRemoveMetadata; +pub(crate) struct DropDatabaseRemoveMetadata; #[async_trait::async_trait] #[typetag::serde] diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index 2c20517fb5f1..76de13a8ae2a 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -24,7 +24,7 @@ use 
crate::error::{self, Result}; use crate::key::schema_name::SchemaNameKey; #[derive(Debug, Serialize, Deserialize)] -pub struct DropDatabaseStart; +pub(crate) struct DropDatabaseStart; #[async_trait::async_trait] #[typetag::serde] From 3e515fa0c1e2788de3093a0b38bf9c025753bdc1 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 27 Mar 2024 07:08:37 +0000 Subject: [PATCH 2/5] feat: implement as_any --- src/common/meta/src/ddl/drop_database.rs | 4 ++++ src/common/meta/src/ddl/drop_database/cursor.rs | 6 ++++++ src/common/meta/src/ddl/drop_database/end.rs | 6 ++++++ src/common/meta/src/ddl/drop_database/executor.rs | 6 ++++++ src/common/meta/src/ddl/drop_database/metadata.rs | 6 ++++++ src/common/meta/src/ddl/drop_database/start.rs | 6 ++++++ 6 files changed, 34 insertions(+) diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index 31da6e444dd8..911037677af7 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -17,6 +17,7 @@ pub mod end; pub mod executor; pub mod metadata; pub mod start; +use std::any::Any; use std::fmt::Debug; use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu}; @@ -66,6 +67,9 @@ pub(crate) trait State: Send + Debug { ddl_ctx: &DdlContext, ctx: &mut DropDatabaseContext, ) -> Result<(Box, Status)>; + + /// Returns as [Any](std::any::Any). + fn as_any(&self) -> &dyn Any; } impl DropDatabaseProcedure { diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index 23d6bc09267a..e0447c02af5c 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; @@ -141,4 +143,8 @@ impl State for DropDatabaseCursor { None => self.handle_reach_end(ctx), } } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/common/meta/src/ddl/drop_database/end.rs b/src/common/meta/src/ddl/drop_database/end.rs index 8f1f68212827..577a0d6c8c0f 100644 --- a/src/common/meta/src/ddl/drop_database/end.rs +++ b/src/common/meta/src/ddl/drop_database/end.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use serde::{Deserialize, Serialize}; @@ -32,4 +34,8 @@ impl State for DropDatabaseEnd { ) -> Result<(Box, Status)> { Ok((Box::new(DropDatabaseEnd), Status::done())) } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 451412633698..7585e72c77f2 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::any::Any; + use common_procedure::Status; use common_telemetry::info; use serde::{Deserialize, Serialize}; @@ -99,4 +101,8 @@ impl State for DropDatabaseExecutor { Status::executing(false), )) } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index 27b0b6b9a2b5..78ff6f069c08 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use serde::{Deserialize, Serialize}; @@ -40,4 +42,8 @@ impl State for DropDatabaseRemoveMetadata { return Ok((Box::new(DropDatabaseEnd), Status::done())); } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index 76de13a8ae2a..f7dc23db3f44 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; + use common_procedure::Status; use serde::{Deserialize, Serialize}; use snafu::ensure; @@ -62,4 +64,8 @@ impl State for DropDatabaseStart { Status::executing(true), )) } + + fn as_any(&self) -> &dyn Any { + self + } } From 0b8073398699dccc14a1f09b8227d9f1c5a3c5ae Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 27 Mar 2024 07:41:23 +0000 Subject: [PATCH 3/5] refactor: move common functions to test_util --- src/common/meta/src/ddl/test_util.rs | 155 ++++++++++++++++++ .../src/ddl/tests/alter_logical_tables.rs | 57 +------ .../src/ddl/tests/create_logical_tables.rs | 107 +----------- 3 files changed, 160 insertions(+), 159 deletions(-) diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 0245d4fc905a..182693abf6c9 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -15,3 +15,158 @@ pub mod alter_table; pub mod columns; pub mod create_table; + +use std::collections::HashMap; + +use api::v1::meta::Partition; +use api::v1::{ColumnDataType, SemanticType}; +use common_procedure::Status; +use table::metadata::{RawTableInfo, TableId}; + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::test_util::columns::TestColumnDefBuilder; +use crate::ddl::test_util::create_table::{ + build_raw_table_info_from_expr, TestCreateTableExprBuilder, +}; +use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; +use crate::key::table_route::TableRouteValue; +use crate::rpc::ddl::CreateTableTask; +use crate::ClusterId; + +pub async fn create_physical_table_metadata( + ddl_context: &DdlContext, + table_info: RawTableInfo, + table_route: TableRouteValue, +) { + ddl_context + .table_metadata_manager + .create_table_metadata(table_info, table_route, HashMap::default()) + .await + .unwrap(); +} + +pub async fn create_physical_table( + ddl_context: DdlContext, + cluster_id: ClusterId, + name: &str, +) -> TableId { + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task(name); + let TableMetadata { + table_id, + table_route, + .. 
+ } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + + table_id +} + +pub async fn create_logical_table( + ddl_context: DdlContext, + cluster_id: ClusterId, + physical_table_id: TableId, + table_name: &str, +) { + use std::assert_matches::assert_matches; + + let tasks = vec![test_create_logical_table_task(table_name)]; + let mut procedure = + CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let status = procedure.on_create_metadata().await.unwrap(); + assert_matches!(status, Status::Done { .. }); +} + +pub fn test_create_logical_table_task(name: &str) -> CreateTableTask { + let create_table = TestCreateTableExprBuilder::default() + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("host") + .data_type(ColumnDataType::String) + .semantic_type(SemanticType::Tag) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("cpu") + .data_type(ColumnDataType::Float64) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .time_index("ts") + .primary_keys(["host".into()]) + .table_name(name) + .build() + .unwrap() + .into(); + let table_info = build_raw_table_info_from_expr(&create_table); + CreateTableTask { + create_table, + // Single region + partitions: vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + } +} + +pub fn test_create_physical_table_task(name: &str) -> CreateTableTask { + let create_table = TestCreateTableExprBuilder::default() + .column_defs([ + TestColumnDefBuilder::default() + .name("ts") + .data_type(ColumnDataType::TimestampMillisecond) + .semantic_type(SemanticType::Timestamp) + .build() + .unwrap() + .into(), + TestColumnDefBuilder::default() + .name("value") + .data_type(ColumnDataType::Float64) + .semantic_type(SemanticType::Field) + .build() + .unwrap() + .into(), + ]) + .time_index("ts") + .primary_keys(["value".into()]) + .table_name(name) + .build() + .unwrap() + .into(); + let table_info = build_raw_table_info_from_expr(&create_table); + CreateTableTask { + create_table, + // Single region + partitions: vec![Partition { + column_list: vec![], + value_list: vec![], + }], + table_info, + } +} diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 9be0f4c584de..2c8c62cf52e4 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -19,22 +19,16 @@ use api::v1::{ColumnDataType, SemanticType}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::{Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; -use table::metadata::TableId; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; use 
crate::ddl::test_util::columns::TestColumnDefBuilder; -use crate::ddl::tests::create_logical_tables; -use crate::ddl::tests::create_logical_tables::{ - test_create_physical_table_task, NaiveDatanodeHandler, -}; -use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; +use crate::ddl::test_util::{create_logical_table, create_physical_table}; +use crate::ddl::tests::create_logical_tables::NaiveDatanodeHandler; use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound}; use crate::key::table_name::TableNameKey; use crate::rpc::ddl::AlterTableTask; use crate::test_util::{new_ddl_context, MockDatanodeManager}; -use crate::ClusterId; fn make_alter_logical_table_add_column_task( schema: Option<&str>, @@ -128,53 +122,6 @@ async fn test_on_prepare_check_alter_kind() { assert_matches!(err, AlterLogicalTablesInvalidArguments { .. }); } -async fn create_physical_table( - ddl_context: DdlContext, - cluster_id: ClusterId, - name: &str, -) -> TableId { - // Prepares physical table metadata. - let mut create_physical_table_task = test_create_physical_table_task(name); - let TableMetadata { - table_id, - table_route, - .. - } = ddl_context - .table_metadata_allocator - .create( - &TableMetadataAllocatorContext { cluster_id }, - &create_physical_table_task, - ) - .await - .unwrap(); - create_physical_table_task.set_table_id(table_id); - create_logical_tables::create_physical_table_metadata( - &ddl_context, - create_physical_table_task.table_info.clone(), - table_route, - ) - .await; - - table_id -} - -async fn create_logical_table( - ddl_context: DdlContext, - cluster_id: ClusterId, - physical_table_id: TableId, - table_name: &str, -) { - let tasks = vec![create_logical_tables::test_create_logical_table_task( - table_name, - )]; - let mut procedure = - CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); - let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); - let status = procedure.on_create_metadata().await.unwrap(); - assert_matches!(status, Status::Done { .. }); -} - #[tokio::test] async fn test_on_prepare_different_physical_table() { let cluster_id = 1; diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 7223c2e43458..9c40f2972cb2 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -13,12 +13,9 @@ // limitations under the License. 
use std::assert_matches::assert_matches; -use std::collections::HashMap; use std::sync::Arc; -use api::v1::meta::Partition; use api::v1::region::{QueryRequest, RegionRequest}; -use api::v1::{ColumnDataType, SemanticType}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; @@ -26,104 +23,18 @@ use common_procedure_test::MockContextProvider; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use store_api::storage::RegionId; -use table::metadata::RawTableInfo; use crate::datanode_manager::HandleResponse; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::test_util::columns::TestColumnDefBuilder; -use crate::ddl::test_util::create_table::{ - build_raw_table_info_from_expr, TestCreateTableExprBuilder, +use crate::ddl::test_util::{ + create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task, }; -use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; +use crate::ddl::{TableMetadata, TableMetadataAllocatorContext}; use crate::error::{Error, Result}; use crate::key::table_route::TableRouteValue; use crate::peer::Peer; -use crate::rpc::ddl::CreateTableTask; use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; -// Note: this code may be duplicated with others. -// However, it's by design, ensures the tests are easy to be modified or added. -pub(crate) fn test_create_logical_table_task(name: &str) -> CreateTableTask { - let create_table = TestCreateTableExprBuilder::default() - .column_defs([ - TestColumnDefBuilder::default() - .name("ts") - .data_type(ColumnDataType::TimestampMillisecond) - .semantic_type(SemanticType::Timestamp) - .build() - .unwrap() - .into(), - TestColumnDefBuilder::default() - .name("host") - .data_type(ColumnDataType::String) - .semantic_type(SemanticType::Tag) - .build() - .unwrap() - .into(), - TestColumnDefBuilder::default() - .name("cpu") - .data_type(ColumnDataType::Float64) - .semantic_type(SemanticType::Field) - .build() - .unwrap() - .into(), - ]) - .time_index("ts") - .primary_keys(["host".into()]) - .table_name(name) - .build() - .unwrap() - .into(); - let table_info = build_raw_table_info_from_expr(&create_table); - CreateTableTask { - create_table, - // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], - table_info, - } -} - -// Note: this code may be duplicated with others. -// However, it's by design, ensures the tests are easy to be modified or added. 
-pub(crate) fn test_create_physical_table_task(name: &str) -> CreateTableTask { - let create_table = TestCreateTableExprBuilder::default() - .column_defs([ - TestColumnDefBuilder::default() - .name("ts") - .data_type(ColumnDataType::TimestampMillisecond) - .semantic_type(SemanticType::Timestamp) - .build() - .unwrap() - .into(), - TestColumnDefBuilder::default() - .name("value") - .data_type(ColumnDataType::Float64) - .semantic_type(SemanticType::Field) - .build() - .unwrap() - .into(), - ]) - .time_index("ts") - .primary_keys(["value".into()]) - .table_name(name) - .build() - .unwrap() - .into(); - let table_info = build_raw_table_info_from_expr(&create_table); - CreateTableTask { - create_table, - // Single region - partitions: vec![Partition { - column_list: vec![], - value_list: vec![], - }], - table_info, - } -} - #[tokio::test] async fn test_on_prepare_physical_table_not_found() { let datanode_manager = Arc::new(MockDatanodeManager::new(())); @@ -137,18 +48,6 @@ async fn test_on_prepare_physical_table_not_found() { assert_matches!(err, Error::TableRouteNotFound { .. }); } -pub(crate) async fn create_physical_table_metadata( - ddl_context: &DdlContext, - table_info: RawTableInfo, - table_route: TableRouteValue, -) { - ddl_context - .table_metadata_manager - .create_table_metadata(table_info, table_route, HashMap::default()) - .await - .unwrap(); -} - #[tokio::test] async fn test_on_prepare() { let datanode_manager = Arc::new(MockDatanodeManager::new(())); From e9df15d585efbe9fc8269488dc8f63a1e6971314 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 27 Mar 2024 06:58:00 +0000 Subject: [PATCH 4/5] test: add tests for drop databases --- src/common/meta/src/ddl/drop_database.rs | 2 +- .../meta/src/ddl/drop_database/cursor.rs | 117 ++++++++++- .../meta/src/ddl/drop_database/executor.rs | 192 +++++++++++++++++- .../meta/src/ddl/drop_database/metadata.rs | 50 +++++ .../meta/src/ddl/drop_database/start.rs | 67 ++++++ 5 files changed, 415 insertions(+), 13 deletions(-) diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index 911037677af7..e91485ee0c52 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -44,7 +44,7 @@ pub struct DropDatabaseProcedure { } /// Target of dropping tables. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub(crate) enum DropTableTarget { Logical, Physical, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index e0447c02af5c..3913608230e7 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -30,7 +30,7 @@ use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct DropDatabaseCursor { - target: DropTableTarget, + pub(crate) target: DropTableTarget, } impl DropDatabaseCursor { @@ -43,16 +43,13 @@ impl DropDatabaseCursor { &mut self, ctx: &mut DropDatabaseContext, ) -> Result<(Box, Status)> { + // Consumes the tables stream. + ctx.tables.take(); match self.target { - DropTableTarget::Logical => { - // Consumes the tables stream. 
- ctx.tables.take(); - - Ok(( - Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)), - Status::executing(true), - )) - } + DropTableTarget::Logical => Ok(( + Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)), + Status::executing(true), + )), DropTableTarget::Physical => Ok(( Box::new(DropDatabaseRemoveMetadata), Status::executing(true), @@ -148,3 +145,103 @@ impl State for DropDatabaseCursor { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + + use crate::ddl::drop_database::cursor::DropDatabaseCursor; + use crate::ddl::drop_database::executor::DropDatabaseExecutor; + use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata; + use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; + use crate::ddl::test_util::{create_logical_table, create_physical_table}; + use crate::test_util::{new_ddl_context, MockDatanodeManager}; + + #[tokio::test] + async fn test_next_without_logical_tables() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + create_physical_table(ddl_context.clone(), 0, "phy").await; + // It always starts from Logical + let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + // Ticks + let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Logical); + // Ticks + let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + assert!(ctx.tables.is_none()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + // Ticks + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + let executor = state + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(executor.target, DropTableTarget::Physical); + } + + #[tokio::test] + async fn test_next_with_logical_tables() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await; + // It always starts from Logical + let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + // Ticks + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + let executor = state + .as_any() + .downcast_ref::() + .unwrap(); + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + assert_eq!(table_route.region_routes, executor.region_routes); + assert_eq!(executor.target, DropTableTarget::Logical); + } + + #[tokio::test] + async fn test_reach_the_end() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let mut state = 
DropDatabaseCursor::new(DropTableTarget::Physical); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + // Ticks + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + state + .as_any() + .downcast_ref::() + .unwrap(); + assert!(ctx.tables.is_none()); + } +} diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 7585e72c77f2..16308b948136 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -34,8 +34,8 @@ use crate::table_name::TableName; pub(crate) struct DropDatabaseExecutor { table_id: TableId, table_name: TableName, - region_routes: Vec, - target: DropTableTarget, + pub(crate) region_routes: Vec, + pub(crate) target: DropTableTarget, #[serde(skip)] dropping_regions: Vec, } @@ -106,3 +106,191 @@ impl State for DropDatabaseExecutor { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::region::{QueryRequest, RegionRequest}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_error::ext::BoxedError; + use common_recordbatch::SendableRecordBatchStream; + + use crate::datanode_manager::HandleResponse; + use crate::ddl::drop_database::cursor::DropDatabaseCursor; + use crate::ddl::drop_database::executor::DropDatabaseExecutor; + use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; + use crate::ddl::test_util::{create_logical_table, create_physical_table}; + use crate::error::{self, Error, Result}; + use crate::peer::Peer; + use crate::table_name::TableName; + use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; + + #[derive(Clone)] + pub struct NaiveDatanodeHandler; + + #[async_trait::async_trait] + impl MockDatanodeHandler for NaiveDatanodeHandler { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + Ok(HandleResponse::new(0)) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn test_next_with_physical_table() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + { + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes.clone(), + DropTableTarget::Physical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + } + // Execute again + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let mut state = DropDatabaseExecutor::new( + physical_table_id, + 
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes, + DropTableTarget::Physical, + ); + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + } + + #[tokio::test] + async fn test_next_logical_table() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await; + let logical_table_id = physical_table_id + 1; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(logical_table_id) + .await + .unwrap(); + { + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"), + table_route.region_routes.clone(), + DropTableTarget::Logical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Logical); + } + // Execute again + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes, + DropTableTarget::Logical, + ); + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Logical); + } + + #[derive(Clone)] + pub struct RetryErrorDatanodeHandler; + + #[async_trait::async_trait] + impl MockDatanodeHandler for RetryErrorDatanodeHandler { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + Err(Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + }) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn test_next_retryable_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes, + DropTableTarget::Physical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let err = state.next(&ddl_context, &mut ctx).await.unwrap_err(); + assert!(err.is_retry_later()); + } +} diff 
--git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index 78ff6f069c08..b6e25197ea8b 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -47,3 +47,53 @@ impl State for DropDatabaseRemoveMetadata { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::ddl::drop_database::end::DropDatabaseEnd; + use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata; + use crate::ddl::drop_database::{DropDatabaseContext, State}; + use crate::key::schema_name::SchemaNameKey; + use crate::test_util::{new_ddl_context, MockDatanodeManager}; + + #[tokio::test] + async fn test_next() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + ddl_context + .table_metadata_manager + .schema_manager() + .create(SchemaNameKey::new("foo", "bar"), None, true) + .await + .unwrap(); + let mut state = DropDatabaseRemoveMetadata; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: true, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.is_done()); + assert!(!ddl_context + .table_metadata_manager + .schema_manager() + .exists(SchemaNameKey::new("foo", "bar")) + .await + .unwrap()); + // Schema not exists + let mut state = DropDatabaseRemoveMetadata; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: true, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.is_done()); + } +} diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index f7dc23db3f44..7d71d1972d6b 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -69,3 +69,70 @@ impl State for DropDatabaseStart { self } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use crate::ddl::drop_database::cursor::DropDatabaseCursor; + use crate::ddl::drop_database::end::DropDatabaseEnd; + use crate::ddl::drop_database::start::DropDatabaseStart; + use crate::ddl::drop_database::{DropDatabaseContext, State}; + use crate::error; + use crate::key::schema_name::SchemaNameKey; + use crate::test_util::{new_ddl_context, MockDatanodeManager}; + + #[tokio::test] + async fn test_schema_not_exists_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let mut step = DropDatabaseStart; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: false, + tables: None, + }; + let err = step.next(&ddl_context, &mut ctx).await.unwrap_err(); + assert_matches!(err, error::Error::SchemaNotFound { .. 
});
+    }
+
+    #[tokio::test]
+    async fn test_schema_not_exists() {
+        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
+        let ddl_context = new_ddl_context(datanode_manager);
+        let mut state = DropDatabaseStart;
+        let mut ctx = DropDatabaseContext {
+            catalog: "foo".to_string(),
+            schema: "bar".to_string(),
+            drop_if_exists: true,
+            tables: None,
+        };
+        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
+        state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
+        assert!(status.is_done());
+    }
+
+    #[tokio::test]
+    async fn test_next() {
+        let datanode_manager = Arc::new(MockDatanodeManager::new(()));
+        let ddl_context = new_ddl_context(datanode_manager);
+        ddl_context
+            .table_metadata_manager
+            .schema_manager()
+            .create(SchemaNameKey::new("foo", "bar"), None, true)
+            .await
+            .unwrap();
+        let mut state = DropDatabaseStart;
+        let mut ctx = DropDatabaseContext {
+            catalog: "foo".to_string(),
+            schema: "bar".to_string(),
+            drop_if_exists: false,
+            tables: None,
+        };
+        let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
+        state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
+        assert!(status.need_persist());
+    }
+}

From e4c21d7de6f142cb56c1d9ad38968202ffd21fbe Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Wed, 27 Mar 2024 09:06:38 +0000
Subject: [PATCH 5/5] fix: fix deleting physical table route unexpectedly

---
 .../meta/src/ddl/drop_database/cursor.rs      |   4 +-
 src/common/meta/src/ddl/tests.rs              |   1 +
 .../meta/src/ddl/tests/drop_database.rs       | 123 ++++++++++++++++++
 3 files changed, 126 insertions(+), 2 deletions(-)
 create mode 100644 src/common/meta/src/ddl/tests/drop_database.rs

diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs
index 3913608230e7..c4caf3522b8d 100644
--- a/src/common/meta/src/ddl/drop_database/cursor.rs
+++ b/src/common/meta/src/ddl/drop_database/cursor.rs
@@ -67,12 +67,12 @@ impl DropDatabaseCursor {
     ) -> Result<(Box<dyn State>, Status)> {
         match (self.target, table_route_value) {
             (DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
-                let table_id = route.physical_table_id();
+                let physical_table_id = route.physical_table_id();
                 let (_, table_route) = ddl_ctx
                     .table_metadata_manager
                     .table_route_manager()
-                    .get_physical_table_route(table_id)
+                    .get_physical_table_route(physical_table_id)
                     .await?;
                 Ok((
                     Box::new(DropDatabaseExecutor::new(
diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs
index fcbe52189a84..2f83941f9096 100644
--- a/src/common/meta/src/ddl/tests.rs
+++ b/src/common/meta/src/ddl/tests.rs
@@ -15,3 +15,4 @@
 mod alter_logical_tables;
 mod create_logical_tables;
 mod create_table;
+mod drop_database;
diff --git a/src/common/meta/src/ddl/tests/drop_database.rs b/src/common/meta/src/ddl/tests/drop_database.rs
new file mode 100644
index 000000000000..8e5f2fe45a28
--- /dev/null
+++ b/src/common/meta/src/ddl/tests/drop_database.rs
@@ -0,0 +1,123 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use std::sync::Arc; + +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId}; +use common_procedure_test::MockContextProvider; +use futures::TryStreamExt; + +use crate::ddl::drop_database::DropDatabaseProcedure; +use crate::ddl::test_util::{create_logical_table, create_physical_table}; +use crate::ddl::tests::create_table::{NaiveDatanodeHandler, RetryErrorDatanodeHandler}; +use crate::key::schema_name::SchemaNameKey; +use crate::test_util::{new_ddl_context, MockDatanodeManager}; + +#[tokio::test] +async fn test_drop_database_with_logical_tables() { + common_telemetry::init_default_ut_logging(); + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + ddl_context + .table_metadata_manager + .schema_manager() + .create( + SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + None, + false, + ) + .await + .unwrap(); + // Creates physical table + let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + // Creates 3 logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await; + + let mut procedure = DropDatabaseProcedure::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + false, + ddl_context.clone(), + ); + + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + + while !procedure.execute(&ctx).await.unwrap().is_done() { + procedure.execute(&ctx).await.unwrap(); + } + + let tables = ddl_context + .table_metadata_manager + .table_name_manager() + .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .try_collect::>() + .await + .unwrap(); + assert!(tables.is_empty()); +} + +#[tokio::test] +async fn test_drop_database_retryable_error() { + common_telemetry::init_default_ut_logging(); + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + ddl_context + .table_metadata_manager + .schema_manager() + .create( + SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + None, + false, + ) + .await + .unwrap(); + // Creates physical table + let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + // Creates 3 logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await; + + let mut procedure = DropDatabaseProcedure::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + false, + ddl_context.clone(), + ); + + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + + loop { + match procedure.execute(&ctx).await { + Ok(_) => { + // go next + } + Err(err) => { + assert!(err.is_retry_later()); + break; + } + } + } +}
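
A note on the `as_any` hook added in patch 2: `State::next` hands back the next state as a boxed trait object, so the new drop-database tests recover the concrete state by downcasting through `as_any`. Below is a minimal, self-contained sketch of that pattern with stand-in types; it is not the crate's real `State` trait or procedure machinery, only an illustration of why each state returns `self` as `&dyn Any`.

use std::any::Any;

trait State {
    fn as_any(&self) -> &dyn Any;
}

// Stand-ins for the real drop-database states.
struct DropDatabaseStart;
struct DropDatabaseCursor;

impl State for DropDatabaseStart {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl State for DropDatabaseCursor {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    // A transition hands back the next state as a trait object, as `next` does.
    let next: Box<dyn State> = Box::new(DropDatabaseCursor);
    // `as_any` lets a test recover the concrete type and assert on the transition.
    assert!(next.as_any().downcast_ref::<DropDatabaseCursor>().is_some());
    assert!(next.as_any().downcast_ref::<DropDatabaseStart>().is_none());
}

Returning `self` from each implementation is enough here because every state is a plain `'static` struct, which is what `downcast_ref` requires.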
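
For orientation, the unit tests in patches 4 and 5 drive the procedure through its states in a fixed order: start, a cursor pass over logical tables, a cursor pass over physical tables (each pass handing a table off to an executor), metadata removal, then end. The following is a rough, flattened sketch of that order assuming one logical table on one physical table; it ignores the streaming details of the real cursor and is not code from the crate.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Step {
    Start,
    CursorLogical,
    ExecutorLogical,
    CursorPhysical,
    ExecutorPhysical,
    RemoveMetadata,
    End,
}

fn main() {
    use Step::*;
    // Approximate visiting order for one logical table and its physical table;
    // the real cursor re-enters once per table it pulls from the stream.
    let expected = [
        Start,
        CursorLogical,
        ExecutorLogical,  // drop the logical table's regions
        CursorLogical,    // keep draining the logical pass
        CursorPhysical,   // switch targets once the stream is exhausted
        ExecutorPhysical, // drop the physical table's regions
        CursorPhysical,
        RemoveMetadata,   // remove the schema key
        End,
    ];
    assert_eq!(expected[0], Start);
    assert_eq!(*expected.last().unwrap(), End);
    println!("{expected:?}");
}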