diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index 911037677af7..e91485ee0c52 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -44,7 +44,7 @@ pub struct DropDatabaseProcedure { } /// Target of dropping tables. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub(crate) enum DropTableTarget { Logical, Physical, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index e0447c02af5c..3913608230e7 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -30,7 +30,7 @@ use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct DropDatabaseCursor { - target: DropTableTarget, + pub(crate) target: DropTableTarget, } impl DropDatabaseCursor { @@ -43,16 +43,13 @@ impl DropDatabaseCursor { &mut self, ctx: &mut DropDatabaseContext, ) -> Result<(Box, Status)> { + // Consumes the tables stream. + ctx.tables.take(); match self.target { - DropTableTarget::Logical => { - // Consumes the tables stream. - ctx.tables.take(); - - Ok(( - Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)), - Status::executing(true), - )) - } + DropTableTarget::Logical => Ok(( + Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)), + Status::executing(true), + )), DropTableTarget::Physical => Ok(( Box::new(DropDatabaseRemoveMetadata), Status::executing(true), @@ -148,3 +145,103 @@ impl State for DropDatabaseCursor { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + + use crate::ddl::drop_database::cursor::DropDatabaseCursor; + use crate::ddl::drop_database::executor::DropDatabaseExecutor; + use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata; + use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; + use crate::ddl::test_util::{create_logical_table, create_physical_table}; + use crate::test_util::{new_ddl_context, MockDatanodeManager}; + + #[tokio::test] + async fn test_next_without_logical_tables() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + create_physical_table(ddl_context.clone(), 0, "phy").await; + // It always starts from Logical + let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + // Ticks + let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Logical); + // Ticks + let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + assert!(ctx.tables.is_none()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + // Ticks + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + let executor = state + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(executor.target, DropTableTarget::Physical); + } + + #[tokio::test] + async fn test_next_with_logical_tables() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await; + // It always starts from Logical + let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + // Ticks + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + let executor = state + .as_any() + .downcast_ref::() + .unwrap(); + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + assert_eq!(table_route.region_routes, executor.region_routes); + assert_eq!(executor.target, DropTableTarget::Logical); + } + + #[tokio::test] + async fn test_reach_the_end() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let mut state = DropDatabaseCursor::new(DropTableTarget::Physical); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + // Ticks + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(status.need_persist()); + state + .as_any() + .downcast_ref::() + .unwrap(); + assert!(ctx.tables.is_none()); + } +} diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 7585e72c77f2..16308b948136 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -34,8 +34,8 @@ use crate::table_name::TableName; pub(crate) struct DropDatabaseExecutor { table_id: TableId, table_name: TableName, - region_routes: Vec, - target: DropTableTarget, + pub(crate) region_routes: Vec, + pub(crate) target: DropTableTarget, #[serde(skip)] dropping_regions: Vec, } @@ -106,3 +106,191 @@ impl State for DropDatabaseExecutor { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::region::{QueryRequest, RegionRequest}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_error::ext::BoxedError; + use common_recordbatch::SendableRecordBatchStream; + + use crate::datanode_manager::HandleResponse; + use crate::ddl::drop_database::cursor::DropDatabaseCursor; + use crate::ddl::drop_database::executor::DropDatabaseExecutor; + use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; + use crate::ddl::test_util::{create_logical_table, create_physical_table}; + use crate::error::{self, Error, Result}; + use crate::peer::Peer; + use crate::table_name::TableName; + use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; + + #[derive(Clone)] + pub struct NaiveDatanodeHandler; + + #[async_trait::async_trait] + impl MockDatanodeHandler for NaiveDatanodeHandler { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + Ok(HandleResponse::new(0)) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn test_next_with_physical_table() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + { + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes.clone(), + DropTableTarget::Physical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + } + // Execute again + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes, + DropTableTarget::Physical, + ); + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + } + + #[tokio::test] + async fn test_next_logical_table() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await; + let logical_table_id = physical_table_id + 1; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(logical_table_id) + .await + .unwrap(); + { + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"), + table_route.region_routes.clone(), + DropTableTarget::Logical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Logical); + } + // Execute again + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes, + DropTableTarget::Logical, + ); + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Logical); + } + + #[derive(Clone)] + pub struct RetryErrorDatanodeHandler; + + #[async_trait::async_trait] + impl MockDatanodeHandler for RetryErrorDatanodeHandler { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + Err(Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + }) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } + } + + #[tokio::test] + async fn test_next_retryable_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + let mut state = DropDatabaseExecutor::new( + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes, + DropTableTarget::Physical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + let err = state.next(&ddl_context, &mut ctx).await.unwrap_err(); + assert!(err.is_retry_later()); + } +} diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index 78ff6f069c08..b6e25197ea8b 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -47,3 +47,53 @@ impl State for DropDatabaseRemoveMetadata { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::ddl::drop_database::end::DropDatabaseEnd; + use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata; + use crate::ddl::drop_database::{DropDatabaseContext, State}; + use crate::key::schema_name::SchemaNameKey; + use crate::test_util::{new_ddl_context, MockDatanodeManager}; + + #[tokio::test] + async fn test_next() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + ddl_context + .table_metadata_manager + .schema_manager() + .create(SchemaNameKey::new("foo", "bar"), None, true) + .await + .unwrap(); + let mut state = DropDatabaseRemoveMetadata; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: true, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.is_done()); + assert!(!ddl_context + .table_metadata_manager + .schema_manager() + .exists(SchemaNameKey::new("foo", "bar")) + .await + .unwrap()); + // Schema not exists + let mut state = DropDatabaseRemoveMetadata; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: true, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.is_done()); + } +} diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index f7dc23db3f44..7d71d1972d6b 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -69,3 +69,70 @@ impl State for DropDatabaseStart { self } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use crate::ddl::drop_database::cursor::DropDatabaseCursor; + use crate::ddl::drop_database::end::DropDatabaseEnd; + use crate::ddl::drop_database::start::DropDatabaseStart; + use crate::ddl::drop_database::{DropDatabaseContext, State}; + use crate::error; + use crate::key::schema_name::SchemaNameKey; + use crate::test_util::{new_ddl_context, MockDatanodeManager}; + + #[tokio::test] + async fn test_schema_not_exists_err() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let mut step = DropDatabaseStart; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: false, + tables: None, + }; + let err = step.next(&ddl_context, &mut ctx).await.unwrap_err(); + assert_matches!(err, error::Error::SchemaNotFound { .. }); + } + + #[tokio::test] + async fn test_schema_not_exists() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let mut state = DropDatabaseStart; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: true, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.is_done()); + } + + #[tokio::test] + async fn test_next() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + ddl_context + .table_metadata_manager + .schema_manager() + .create(SchemaNameKey::new("foo", "bar"), None, true) + .await + .unwrap(); + let mut state = DropDatabaseStart; + let mut ctx = DropDatabaseContext { + catalog: "foo".to_string(), + schema: "bar".to_string(), + drop_if_exists: false, + tables: None, + }; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.need_persist()); + } +}