Skip to content

Commit

Permalink
test: add tests for drop databases
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Mar 27, 2024
1 parent 0b80733 commit e9df15d
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct DropDatabaseProcedure {
}

/// Target of dropping tables.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DropTableTarget {
Logical,
Physical,
Expand Down
117 changes: 107 additions & 10 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::table_name::TableName;

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseCursor {
target: DropTableTarget,
pub(crate) target: DropTableTarget,
}

impl DropDatabaseCursor {
Expand All @@ -43,16 +43,13 @@ impl DropDatabaseCursor {
&mut self,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
// Consumes the tables stream.
ctx.tables.take();
match self.target {
DropTableTarget::Logical => {
// Consumes the tables stream.
ctx.tables.take();

Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
))
}
DropTableTarget::Logical => Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
)),
DropTableTarget::Physical => Ok((
Box::new(DropDatabaseRemoveMetadata),
Status::executing(true),
Expand Down Expand Up @@ -148,3 +145,103 @@ impl State for DropDatabaseCursor {
self
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::test_util::{new_ddl_context, MockDatanodeManager};

#[tokio::test]
async fn test_next_without_logical_tables() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
create_physical_table(ddl_context.clone(), 0, "phy").await;
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
// Ticks
let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
assert!(ctx.tables.is_none());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
let executor = state
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.unwrap();
assert_eq!(executor.target, DropTableTarget::Physical);
}

#[tokio::test]
async fn test_next_with_logical_tables() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await;
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
let executor = state
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.unwrap();
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
assert_eq!(table_route.region_routes, executor.region_routes);
assert_eq!(executor.target, DropTableTarget::Logical);
}

#[tokio::test]
async fn test_reach_the_end() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let mut state = DropDatabaseCursor::new(DropTableTarget::Physical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
state
.as_any()
.downcast_ref::<DropDatabaseRemoveMetadata>()
.unwrap();
assert!(ctx.tables.is_none());
}
}
192 changes: 190 additions & 2 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use crate::table_name::TableName;
pub(crate) struct DropDatabaseExecutor {
table_id: TableId,
table_name: TableName,
region_routes: Vec<RegionRoute>,
target: DropTableTarget,
pub(crate) region_routes: Vec<RegionRoute>,
pub(crate) target: DropTableTarget,
#[serde(skip)]
dropping_regions: Vec<OperatingRegionGuard>,
}
Expand Down Expand Up @@ -106,3 +106,191 @@ impl State for DropDatabaseExecutor {
self
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;

use crate::datanode_manager::HandleResponse;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::error::{self, Error, Result};
use crate::peer::Peer;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

#[derive(Clone)]
pub struct NaiveDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
Ok(HandleResponse::new(0))
}

async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}

#[tokio::test]
async fn test_next_with_physical_table() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes.clone(),
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
}
// Execute again
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
DropTableTarget::Physical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
}

#[tokio::test]
async fn test_next_logical_table() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric").await;
let logical_table_id = physical_table_id + 1;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(logical_table_id)
.await
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
table_route.region_routes.clone(),
DropTableTarget::Logical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
}
// Execute again
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
DropTableTarget::Logical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
}

#[derive(Clone)]
pub struct RetryErrorDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
}

async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}

#[tokio::test]
async fn test_next_retryable_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
let mut state = DropDatabaseExecutor::new(
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
assert!(err.is_retry_later());
}
}
Loading

0 comments on commit e9df15d

Please sign in to comment.