Skip to content

Commit

Permalink
refactor: move recovery to recover method
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 6, 2024
1 parent f38c0bf commit 50c2a19
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 48 deletions.
34 changes: 20 additions & 14 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,13 @@ impl CreateTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

let mut creator = TableCreator {
data,
opening_regions: vec![],
};

// Only registers regions if the table route is allocated.
if let Some(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(CreateTableProcedure { context, creator })
Ok(CreateTableProcedure {
context,
creator: TableCreator {
data,
opening_regions: vec![],
},
})
}

fn table_info(&self) -> &RawTableInfo {
Expand Down Expand Up @@ -296,6 +289,19 @@ impl Procedure for CreateTableProcedure {
Self::TYPE_NAME
}

fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.creator.data.table_route {
self.creator.opening_regions = self
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(())
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;

Expand Down
27 changes: 14 additions & 13 deletions src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) trait State: Send + Debug {
) -> Result<(Box<dyn State>, Status)>;

/// The hook is called during the recovery.
fn on_recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
Ok(())
}

Expand All @@ -94,11 +94,6 @@ impl DropDatabaseProcedure {
}
}

fn on_recover(&mut self) -> Result<()> {
let state = &mut self.state;
state.on_recover(&self.runtime_context)
}

pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
let DropDatabaseOwnedData {
catalog,
Expand All @@ -107,7 +102,7 @@ impl DropDatabaseProcedure {
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;

let mut procedure = Self {
Ok(Self {
runtime_context,
context: DropDatabaseContext {
catalog,
Expand All @@ -116,13 +111,12 @@ impl DropDatabaseProcedure {
tables: None,
},
state,
};
procedure
.on_recover()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
})
}

Ok(procedure)
#[cfg(test)]
pub(crate) fn state(&self) -> &dyn State {
self.state.as_ref()
}
}

Expand All @@ -132,6 +126,13 @@ impl Procedure for DropDatabaseProcedure {
Self::TYPE_NAME
}

fn recover(&mut self) -> ProcedureResult<()> {
self.state
.recover(&self.runtime_context)
.map_err(BoxedError::new)
.context(ExternalSnafu)
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl DropDatabaseExecutor {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DropDatabaseExecutor {
fn on_recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
self.register_dropping_regions(ddl_ctx)
}

Expand Down Expand Up @@ -372,7 +372,7 @@ mod tests {
drop_if_exists: false,
tables: None,
};
state.on_recover(&ddl_context).unwrap();
state.recover(&ddl_context).unwrap();
assert_eq!(state.dropping_regions.len(), 1);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
Expand Down
36 changes: 20 additions & 16 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,13 @@ impl DropTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let executor = data.build_executor();
// Only registers regions if the metadata is deleted.
let register_operating_regions = matches!(
data.state,
DropTableState::DeleteMetadata
| DropTableState::InvalidateTableCache
| DropTableState::DatanodeDropRegions
);
let mut procedure = Self {

Ok(Self {
context,
data,
dropping_regions: vec![],
executor,
};
if register_operating_regions {
procedure
.register_dropping_regions()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}
Ok(procedure)
})
}

pub(crate) async fn on_prepare<'a>(&mut self) -> Result<Status> {
Expand Down Expand Up @@ -190,6 +177,23 @@ impl Procedure for DropTableProcedure {
Self::TYPE_NAME
}

fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the metadata is deleted.
let register_operating_regions = matches!(
self.data.state,
DropTableState::DeleteMetadata
| DropTableState::InvalidateTableCache
| DropTableState::DatanodeDropRegions
);
if register_operating_regions {
self.register_dropping_regions()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(())
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
Expand Down
79 changes: 78 additions & 1 deletion src/common/meta/src/ddl/tests/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_procedure_test::{
execute_procedure_until, execute_procedure_until_done, MockContextProvider,
};
use futures::TryStreamExt;

use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::test_util::datanode_handler::{NaiveDatanodeHandler, RetryErrorDatanodeHandler};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
Expand Down Expand Up @@ -121,3 +124,77 @@ async fn test_drop_database_retryable_error() {
}
}
}

#[tokio::test]
async fn test_drop_database_recover() {
common_telemetry::init_default_ut_logging();
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(node_manager);
ddl_context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
None,
false,
)
.await
.unwrap();
// Creates a physical table
let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
// Creates a logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
let mut procedure = DropDatabaseProcedure::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
false,
ddl_context.clone(),
);
let num_operating_regions = 1;
// Before dropping the logical table
execute_procedure_until(&mut procedure, |p| {
p.state()
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.is_some()
})
.await;
// Dump data
let data = procedure.dump().unwrap();
assert_eq!(ddl_context.memory_region_keeper.len(), 0);
let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
ddl_context.memory_region_keeper.clear();
// Before dropping the physical table
execute_procedure_until(&mut procedure, |p| {
p.state()
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.is_some()
})
.await;
// Dump data
let data = procedure.dump().unwrap();
assert_eq!(ddl_context.memory_region_keeper.len(), 0);
let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
ddl_context.memory_region_keeper.clear();
execute_procedure_until_done(&mut procedure).await;
let tables = ddl_context
.table_metadata_manager
.table_name_manager()
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert!(tables.is_empty());
}
6 changes: 4 additions & 2 deletions src/common/meta/src/ddl/tests/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ async fn test_from_json() {
);
// Cleans up the keeper.
ddl_context.memory_region_keeper.clear();
let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
let mut procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions_after_recovery
Expand Down Expand Up @@ -350,7 +351,8 @@ async fn test_from_json() {
);
// Cleans up the keeper.
ddl_context.memory_region_keeper.clear();
let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
let mut procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions_after_recovery
Expand Down

0 comments on commit 50c2a19

Please sign in to comment.