Skip to content

Commit

Permalink
feat(metasrv): implement maintenance (#3527)
Browse files Browse the repository at this point in the history
* feat(metasrv): implement maintenance

Signed-off-by: tison <wander4096@gmail.com>

* fixup and test

Signed-off-by: tison <wander4096@gmail.com>

* Add coauthors

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: xifyang <595482900@qq.com>

* tidy code

Signed-off-by: tison <wander4096@gmail.com>

* Apply suggestions from code review

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* always read kv_backend maintenance state

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: xifyang <595482900@qq.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
  • Loading branch information
4 people authored Mar 18, 2024
1 parent 393ea44 commit 0afac58
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::DatanodeId;

pub const REMOVED_PREFIX: &str = "__removed";

pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
pub const MAINTENANCE_KEY: &str = "maintenance";

const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";

pub const REMOVED_PREFIX: &str = "__removed";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
Expand Down
9 changes: 9 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse bool: {}", err_msg))]
ParseBool {
err_msg: String,
#[snafu(source)]
error: std::str::ParseBoolError,
location: Location,
},

#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments { err_msg: String, location: Location },

Expand Down Expand Up @@ -709,6 +717,7 @@ impl ErrorExt for Error {
| Error::InvalidStatKey { .. }
| Error::InvalidInactiveRegionKey { .. }
| Error::ParseNum { .. }
| Error::ParseBool { .. }
| Error::ParseAddr { .. }
| Error::ParseDuration { .. }
| Error::UnsupportedSelectorType { .. }
Expand Down
36 changes: 36 additions & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ impl HeartbeatHandler for RegionFailureHandler {

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use common_meta::key::MAINTENANCE_KEY;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -163,4 +166,37 @@ mod tests {
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
let kv_backend = region_failover_manager.create_context().kv_backend.clone();
let _handler = RegionFailureHandler::try_new(
None,
region_failover_manager.clone(),
PhiAccrualFailureDetectorOptions::default(),
)
.await
.unwrap();

let kv_req = common_meta::rpc::store::PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
let _ = kv_backend.put(kv_req.clone()).await.unwrap();
assert_matches!(
region_failover_manager.is_maintenance_mode().await,
Ok(true)
);

let _ = kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await
.unwrap();
assert_matches!(
region_failover_manager.is_maintenance_mode().await,
Ok(false)
);
}
}
77 changes: 48 additions & 29 deletions src/meta-srv/src/handler/failure_handler/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,40 +140,59 @@ impl FailureDetectRunner {
let election = self.election.clone();
let region_failover_manager = self.region_failover_manager.clone();
let runner_handle = common_runtime::spawn_bg(async move {
async fn maybe_region_failover(
failure_detectors: &Arc<FailureDetectorContainer>,
region_failover_manager: &Arc<RegionFailoverManager>,
) {
match region_failover_manager.is_maintenance_mode().await {
Ok(false) => {}
Ok(true) => {
info!("Maintenance mode is enabled, skip failover");
return;
}
Err(err) => {
error!(err; "Failed to check maintenance mode");
return;
}
}

let failed_regions = failure_detectors
.iter()
.filter_map(|e| {
// Intentionally not place `current_time_millis()` out of the iteration.
// The failure detection determination should be happened "just in time",
// i.e., failed or not has to be compared with the most recent "now".
// Besides, it might reduce the false positive of failure detection,
// because during the iteration, heartbeats are coming in as usual,
// and the `phi`s are still updating.
if !e.failure_detector().is_available(current_time_millis()) {
Some(e.region_ident().clone())
} else {
None
}
})
.collect::<Vec<RegionIdent>>();

for r in failed_regions {
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
error!(e; "Failed to do region failover for {r}");
} else {
// Now that we know the region is starting to do failover, remove it
// from the failure detectors, avoiding the failover procedure to be
// triggered again.
// If the region is back alive (the failover procedure runs successfully),
// it will be added back to the failure detectors again.
failure_detectors.remove(&r);
}
}
}

loop {
let start = Instant::now();

let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true);
if is_leader {
let failed_regions = failure_detectors
.iter()
.filter_map(|e| {
// Intentionally not place `current_time_millis()` out of the iteration.
// The failure detection determination should be happened "just in time",
// i.e., failed or not has to be compared with the most recent "now".
// Besides, it might reduce the false positive of failure detection,
// because during the iteration, heartbeats are coming in as usual,
// and the `phi`s are still updating.
if !e.failure_detector().is_available(current_time_millis()) {
Some(e.region_ident().clone())
} else {
None
}
})
.collect::<Vec<RegionIdent>>();

for r in failed_regions {
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
error!(e; "Failed to do region failover for {r}");
} else {
// Now that we know the region is starting to do failover, remove it
// from the failure detectors, avoiding the failover procedure to be
// triggered again.
// If the region is back alive (the failover procedure runs successfully),
// it will be added back to the failure detectors again.
failure_detectors.remove(&r);
}
}
maybe_region_failover(&failure_detectors, &region_failover_manager).await;
}

let elapsed = Instant::now().duration_since(start);
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use tokio::sync::broadcast::error::RecvError;
use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
Expand Down Expand Up @@ -357,7 +357,7 @@ impl MetaSrv {
self.leader_cached_kv_backend
.load()
.await
.context(error::KvBackendSnafu)?;
.context(KvBackendSnafu)?;
self.procedure_manager
.start()
.await
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl MetaSrvBuilder {
let region_failover_manager = Arc::new(RegionFailoverManager::new(
distributed_time_constants::REGION_LEASE_SECS,
in_memory.clone(),
kv_backend.clone(),
mailbox.clone(),
procedure_manager.clone(),
(selector.clone(), selector_ctx.clone()),
Expand Down
22 changes: 19 additions & 3 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use std::time::Duration;

use async_trait::async_trait;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::table_name::TableName;
use common_meta::{ClusterId, RegionIdent};
Expand All @@ -45,7 +45,9 @@ use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::error::{
self, KvBackendSnafu, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu,
};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -73,6 +75,7 @@ impl From<RegionIdent> for RegionFailoverKey {
pub(crate) struct RegionFailoverManager {
region_lease_secs: u64,
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
Expand All @@ -94,9 +97,11 @@ impl Drop for FailoverProcedureGuard {
}

impl RegionFailoverManager {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
region_lease_secs: u64,
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
(selector, selector_ctx): (SelectorRef, SelectorContext),
Expand All @@ -106,6 +111,7 @@ impl RegionFailoverManager {
Self {
region_lease_secs,
in_memory,
kv_backend,
mailbox,
procedure_manager,
selector,
Expand All @@ -120,6 +126,7 @@ impl RegionFailoverManager {
RegionFailoverContext {
region_lease_secs: self.region_lease_secs,
in_memory: self.in_memory.clone(),
kv_backend: self.kv_backend.clone(),
mailbox: self.mailbox.clone(),
selector: self.selector.clone(),
selector_ctx: self.selector_ctx.clone(),
Expand Down Expand Up @@ -159,6 +166,13 @@ impl RegionFailoverManager {
}
}

pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
.await
.context(KvBackendSnafu)
}

pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
Expand Down Expand Up @@ -264,6 +278,7 @@ struct Node {
pub struct RegionFailoverContext {
pub region_lease_secs: u64,
pub in_memory: ResettableKvBackendRef,
pub kv_backend: KvBackendRef,
pub mailbox: MailboxRef,
pub selector: SelectorRef,
pub selector_ctx: SelectorContext,
Expand Down Expand Up @@ -569,6 +584,7 @@ mod tests {
context: RegionFailoverContext {
region_lease_secs: 10,
in_memory,
kv_backend,
mailbox,
selector,
selector_ctx,
Expand Down
10 changes: 8 additions & 2 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
mod health;
mod heartbeat;
mod leader;
mod maintenance;
mod meta;
// TODO(weny): removes it.
mod node_lease;
#[allow(dead_code)]
mod region_migration;
mod route;
mod util;
Expand Down Expand Up @@ -99,6 +98,13 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
};
let router = router.route("/region-migration", handler);

let handler = maintenance::MaintenanceHandler {
kv_backend: meta_srv.kv_backend().clone(),
};
let router = router
.route("/maintenance", handler.clone())
.route("/maintenance/set", handler);

let router = Router::nest("/admin", router);

Admin::new(router)
Expand Down
Loading

0 comments on commit 0afac58

Please sign in to comment.