Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add tests for drop databases #3594

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod end;
pub mod executor;
pub mod metadata;
pub mod start;
use std::any::Any;
use std::fmt::Debug;

use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu};
Expand All @@ -43,14 +44,14 @@ pub struct DropDatabaseProcedure {
}

/// Target of dropping tables.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DropTableTarget {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DropTableTarget {
Logical,
Physical,
}

/// Context of [DropDatabaseProcedure] execution.
pub struct DropDatabaseContext {
pub(crate) struct DropDatabaseContext {
catalog: String,
schema: String,
drop_if_exists: bool,
Expand All @@ -66,6 +67,9 @@ pub(crate) trait State: Send + Debug {
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)>;

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}

impl DropDatabaseProcedure {
Expand Down
129 changes: 116 additions & 13 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_procedure::Status;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
Expand All @@ -27,8 +29,8 @@ use crate::key::table_route::TableRouteValue;
use crate::table_name::TableName;

#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseCursor {
target: DropTableTarget,
pub(crate) struct DropDatabaseCursor {
pub(crate) target: DropTableTarget,
}

impl DropDatabaseCursor {
Expand All @@ -41,16 +43,13 @@ impl DropDatabaseCursor {
&mut self,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
// Consumes the tables stream.
ctx.tables.take();
match self.target {
DropTableTarget::Logical => {
// Consumes the tables stream.
ctx.tables.take();

Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
))
}
DropTableTarget::Logical => Ok((
Box::new(DropDatabaseCursor::new(DropTableTarget::Physical)),
Status::executing(true),
)),
DropTableTarget::Physical => Ok((
Box::new(DropDatabaseRemoveMetadata),
Status::executing(true),
Expand All @@ -68,12 +67,12 @@ impl DropDatabaseCursor {
) -> Result<(Box<dyn State>, Status)> {
match (self.target, table_route_value) {
(DropTableTarget::Logical, TableRouteValue::Logical(route)) => {
let table_id = route.physical_table_id();
let physical_table_id = route.physical_table_id();

let (_, table_route) = ddl_ctx
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.get_physical_table_route(physical_table_id)
.await?;
Ok((
Box::new(DropDatabaseExecutor::new(
Expand Down Expand Up @@ -141,4 +140,108 @@ impl State for DropDatabaseCursor {
None => self.handle_reach_end(ctx),
}
}

fn as_any(&self) -> &dyn Any {
self
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::test_util::{new_ddl_context, MockDatanodeManager};

#[tokio::test]
async fn test_next_without_logical_tables() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
create_physical_table(ddl_context.clone(), 0, "phy").await;
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
// Ticks
let (mut state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
assert!(ctx.tables.is_none());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
let executor = state
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.unwrap();
assert_eq!(executor.target, DropTableTarget::Physical);
}

#[tokio::test]
async fn test_next_with_logical_tables() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let physical_table_id = create_physical_table(ddl_context.clone(), 0, "phy").await;
create_logical_table(ddl_context.clone(), 0, physical_table_id, "metric_0").await;
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
let executor = state
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.unwrap();
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
assert_eq!(table_route.region_routes, executor.region_routes);
assert_eq!(executor.target, DropTableTarget::Logical);
}

#[tokio::test]
async fn test_reach_the_end() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let mut state = DropDatabaseCursor::new(DropTableTarget::Physical);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
// Ticks
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(status.need_persist());
state
.as_any()
.downcast_ref::<DropDatabaseRemoveMetadata>()
.unwrap();
assert!(ctx.tables.is_none());
}
}
8 changes: 7 additions & 1 deletion src/common/meta/src/ddl/drop_database/end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_procedure::Status;
use serde::{Deserialize, Serialize};

Expand All @@ -20,7 +22,7 @@ use crate::ddl::DdlContext;
use crate::error::Result;

#[derive(Debug, Serialize, Deserialize)]
pub struct DropDatabaseEnd;
pub(crate) struct DropDatabaseEnd;

#[async_trait::async_trait]
#[typetag::serde]
Expand All @@ -32,4 +34,8 @@ impl State for DropDatabaseEnd {
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(DropDatabaseEnd), Status::done()))
}

fn as_any(&self) -> &dyn Any {
self
}
}
Loading