feat: acquire all locks in procedure #3514

Merged · 4 commits · Mar 14, 2024
11 changes: 9 additions & 2 deletions src/common/meta/src/ddl/create_logical_tables.rs
@@ -36,7 +36,7 @@ use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{TableLock, TableNameLock};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
@@ -307,8 +307,15 @@ impl Procedure for CreateLogicalTablesProcedure {
}

fn lock_key(&self) -> LockKey {
let mut lock_key = Vec::with_capacity(1 + self.creator.data.tasks.len());
// CatalogLock, SchemaLock,
// TableLock
// TableNameLock(s)
let mut lock_key = Vec::with_capacity(2 + 1 + self.creator.data.tasks.len());
let table_ref = self.creator.data.tasks[0].table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into());

for task in &self.creator.data.tasks {
lock_key.push(
TableNameLock::new(
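For readers less familiar with the lock hierarchy, the sketch below illustrates the coarse-to-fine ordering the new `lock_key` follows: catalog read, schema read, physical table write, then one name lock per logical table. It is a standalone illustration, not the crate's code; `StringKey` and the key formats here are hypothetical stand-ins for the `CatalogLock`/`SchemaLock`/`TableLock`/`TableNameLock` types used in the diff.

```rust
// Hypothetical stand-ins for the crate's lock-key types; illustrative only.
#[derive(Debug)]
enum StringKey {
    Share(String),
    Exclusive(String),
}

fn logical_tables_lock_key(
    catalog: &str,
    schema: &str,
    physical_table_id: u32,
    logical_table_names: &[&str],
) -> Vec<StringKey> {
    // Coarse-to-fine ordering: catalog -> schema -> physical table -> table names.
    let mut keys = Vec::with_capacity(3 + logical_table_names.len());
    keys.push(StringKey::Share(format!("__catalog/{catalog}")));
    keys.push(StringKey::Share(format!("__schema/{catalog}/{schema}")));
    keys.push(StringKey::Exclusive(format!("__table/{physical_table_id}")));
    for name in logical_table_names {
        keys.push(StringKey::Exclusive(format!(
            "__table_name/{catalog}/{schema}/{name}"
        )));
    }
    keys
}

fn main() {
    let keys = logical_tables_lock_key("greptime", "public", 1024, &["t1", "t2"]);
    for key in keys {
        println!("{key:?}");
    }
}
```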
12 changes: 6 additions & 6 deletions src/common/meta/src/ddl/create_table.rs
@@ -38,7 +38,7 @@ use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::TableNameLock;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
@@ -343,11 +343,11 @@ impl Procedure for CreateTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();

LockKey::single(TableNameLock::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
))
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}

68 changes: 49 additions & 19 deletions src/meta-srv/src/procedure/region_failover.rs
@@ -28,7 +28,8 @@ use async_trait::async_trait;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::table_name::TableName;
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -44,7 +45,7 @@ use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use crate::error::{RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
@@ -164,7 +165,14 @@ impl RegionFailoverManager {
return Ok(());
};

if !self.table_exists(failed_region).await? {
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(failed_region.table_id)
.await
.context(error::TableMetadataManagerSnafu)?;

if table_info.is_none() {
// The table could be dropped before the failure detector knows it. Then the region
// failover is not needed.
// Or the table could be renamed. But we will have a new region ident to detect failure.
@@ -178,7 +186,15 @@
}

let context = self.create_context();
let procedure = RegionFailoverProcedure::new(failed_region.clone(), context);
// Safety: checked above.
let table_info = table_info.unwrap();
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let procedure =
RegionFailoverProcedure::new(catalog_name, schema_name, failed_region.clone(), context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");
@@ -206,16 +222,6 @@ impl RegionFailoverManager {
Ok(())
}

async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
Ok(self
.table_metadata_manager
.table_route_manager()
.get_region_distribution(failed_region.table_id)
.await
.context(TableMetadataManagerSnafu)?
.is_some())
}

async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
let table_id = failed_region.table_id;
let datanode_id = failed_region.datanode_id;
@@ -238,10 +244,17 @@
}
}

#[derive(Serialize, Deserialize, Debug)]
struct LockMeta {
catalog: String,
schema: String,
}

/// A "Node" in the state machine of region failover procedure.
/// Contains the current state and the data.
#[derive(Serialize, Deserialize, Debug)]
struct Node {
lock_meta: LockMeta,
failed_region: RegionIdent,
state: Box<dyn State>,
}
@@ -330,9 +343,15 @@ pub struct RegionFailoverProcedure {
impl RegionFailoverProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionFailover";

pub fn new(failed_region: RegionIdent, context: RegionFailoverContext) -> Self {
pub fn new(
catalog: String,
schema: String,
failed_region: RegionIdent,
context: RegionFailoverContext,
) -> Self {
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta { catalog, schema },
failed_region,
state: Box::new(state),
};
@@ -372,8 +391,9 @@ impl Procedure for RegionFailoverProcedure {

fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
CatalogLock::Read(&self.node.lock_meta.catalog).into(),
SchemaLock::read(&self.node.lock_meta.catalog, &self.node.lock_meta.schema).into(),
TableLock::Read(region_ident.table_id).into(),
RegionLock::Write(RegionId::new(
region_ident.table_id,
@@ -568,6 +588,8 @@ mod tests {
let failed_region = env.failed_region(1).await;

let mut procedure = Box::new(RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
env.context.clone(),
)) as BoxedProcedure;
@@ -671,7 +693,7 @@

assert_eq!(
procedure.dump().unwrap(),
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
);

// Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
@@ -700,6 +722,10 @@

let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
@@ -711,12 +737,12 @@
let s = procedure.dump().unwrap();
assert_eq!(
s,
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#,
);
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#
r#"Node { lock_meta: LockMeta { catalog: "greptime", schema: "public" }, failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#,
);
}

@@ -765,6 +791,10 @@

let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
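To make the updated `dump()` expectations above easier to follow, here is a minimal, self-contained sketch of how a `lock_meta` field surfaces at the front of the serialized procedure data. The struct names mirror the diff, but the sketch is illustrative only (it assumes the `serde` and `serde_json` crates) and is not the metasrv code itself.

```rust
// Illustrative only: shows how a `lock_meta` field appears in a procedure dump.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct LockMeta {
    catalog: String,
    schema: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct NodeSketch {
    lock_meta: LockMeta,
    state: String,
}

fn main() -> serde_json::Result<()> {
    let node = NodeSketch {
        lock_meta: LockMeta {
            catalog: "greptime".into(),
            schema: "public".into(),
        },
        state: "RegionFailoverStart".into(),
    };
    // Prints {"lock_meta":{"catalog":"greptime","schema":"public"},"state":"RegionFailoverStart"}
    println!("{}", serde_json::to_string(&node)?);
    Ok(())
}
```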
19 changes: 9 additions & 10 deletions src/meta-srv/src/procedure/region_migration.rs
@@ -13,8 +13,6 @@
// limitations under the License.

pub(crate) mod downgrade_leader_region;
// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -36,7 +34,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::ClusterId;
@@ -61,6 +59,10 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The table catalog.
catalog: String,
/// The table schema.
schema: String,
/// The Id of the cluster.
cluster_id: ClusterId,
/// The [Peer] of migration source.
@@ -81,8 +83,9 @@
impl PersistentContext {
pub fn lock_key(&self) -> Vec<StringKey> {
let region_id = self.region_id;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
@@ -185,8 +188,6 @@ impl ContextFactory for DefaultContextFactory {
}
}

// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of procedure execution.
pub struct Context {
persistent_ctx: PersistentContext,
@@ -368,7 +369,6 @@ pub struct RegionMigrationProcedure {
context: Context,
}

// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
@@ -487,16 +487,15 @@ mod tests {
let procedure = RegionMigrationProcedure::new(persistent_context, context);

let serialized = procedure.dump().unwrap();

let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

#[test]
fn test_backward_compatibility() {
let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
// NOTES: Changes it will break backward compatibility.
let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let serialized = r#"{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

assert_eq!(persistent_ctx, deserialized);
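Adding required `catalog`/`schema` fields to `PersistentContext` is why the `test_backward_compatibility` payload above had to change: with plain `String` fields, serde rejects an old payload that lacks them. The sketch below only illustrates that serde behavior and one common mitigation (`#[serde(default)]`); whether the PR adds such defaults is not shown in this excerpt, and `Ctx` is a hypothetical stand-in, not the real `PersistentContext`.

```rust
// Illustrative only; `Ctx` is a hypothetical stand-in for a context struct.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Ctx {
    // Without a default, deserializing a payload that lacks this field fails
    // with "missing field `catalog`". `#[serde(default)]` fills it with "".
    #[serde(default)]
    catalog: String,
    #[serde(default)]
    schema: String,
    cluster_id: u64,
}

fn main() {
    // Old-style payload that predates the new fields.
    let old = r#"{"cluster_id":0}"#;
    let ctx: Ctx = serde_json::from_str(old).expect("defaults let old payloads parse");
    println!("{ctx:?}"); // Ctx { catalog: "", schema: "", cluster_id: 0 }
}
```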
@@ -226,6 +226,8 @@ mod tests {

fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),