Skip to content

Commit

Permalink
feat: expose region migration http endpoint (#3032)
Browse files Browse the repository at this point in the history
* feat: add region migration endpoint

* feat: implement naive peer registry

* chore: apply suggestions from CR

* chore: rename `ContextFactoryImpl` to `DefaultContextFactory`

* chore: rename unregister to deregister

* refactor: use lease-based alive datanode checking
  • Loading branch information
WenyXu authored Dec 29, 2023
1 parent b526d15 commit d22072f
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 38 deletions.
13 changes: 12 additions & 1 deletion src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ impl MetaPeerClient {
to_stat_kv_map(kvs)
}

/// Gets the kv information stored under `key` from the leader's in_mem kv store.
///
/// The lookup goes through [`Self::range`] with an empty `range_end`. Returns
/// `Ok(None)` when that call yields no entries, otherwise the first entry. More than
/// one entry is unexpected and is only checked in debug builds.
pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
    let kvs = self.range(key, Vec::new(), false).await?;
    if kvs.is_empty() {
        return Ok(None);
    }
    debug_assert_eq!(kvs.len(), 1);
    Ok(kvs.into_iter().next())
}

// Range kv information from the leader's in_mem kv store
pub async fn range(
&self,
Expand Down Expand Up @@ -228,7 +239,7 @@ impl MetaPeerClient {

// Check if the meta node is a leader node.
// Note: when self.election is None, we also consider the meta node to be the leader.
fn is_leader(&self) -> bool {
pub(crate) fn is_leader(&self) -> bool {
self.election
.as_ref()
.map(|election| election.is_leader())
Expand Down
6 changes: 5 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
PeerUnavailable { location: Location, peer_id: u64 },

#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
location: Location,
Expand Down Expand Up @@ -650,7 +653,8 @@ impl ErrorExt for Error {
| Error::Join { .. }
| Error::WeightArray { .. }
| Error::NotSetWeightArray { .. }
| Error::Unsupported { .. } => StatusCode::Internal,
| Error::Unsupported { .. }
| Error::PeerUnavailable { .. } => StatusCode::Internal,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl HeartbeatHandlerGroup {
let _ = self.pushers.insert(key.to_string(), pusher).await;
}

pub async fn unregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
pub async fn deregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
let key = key.as_ref();
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
info!("Pusher unregister: {}", key);
Expand Down
42 changes: 36 additions & 6 deletions src/meta-srv/src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,57 @@

use std::collections::HashMap;

use common_meta::util;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_time::util as time_util;

use crate::cluster::MetaPeerClientRef;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};

/// Builds a predicate that reports whether a datanode lease is still alive.
///
/// A lease is alive when the time elapsed since `LeaseValue::timestamp_millis` is
/// strictly less than `lease_secs` seconds. The [`LeaseKey`] argument is ignored.
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool {
    // Convert to milliseconds once, outside the closure. Saturate so that a very
    // large `lease_secs` cannot overflow (a panic in debug builds, a silent
    // wrap-around to a tiny lease in release builds).
    let lease_millis = lease_secs.saturating_mul(1000);

    move |_: &LeaseKey, v: &LeaseValue| {
        let elapsed_millis = time_util::current_time_millis() - v.timestamp_millis;
        // A negative elapsed time means the lease timestamp is ahead of the local
        // clock. Such a lease is reported as expired, which is what the previous
        // wrapping `as u64` cast produced for any realistic lease duration.
        u64::try_from(elapsed_millis).map_or(false, |elapsed| elapsed < lease_millis)
    }
}

/// Looks up datanode `datanode_id` of cluster `cluster_id` and returns it as a [`Peer`]
/// if its lease is still alive.
///
/// Returns `Ok(None)` when `meta_peer_client` has no entry for the lease key, or when
/// the stored lease does not pass the filter built from `lease_secs`.
///
/// # Errors
///
/// Propagates failures from encoding the lease key, from the kv lookup, and from
/// decoding the stored lease value.
pub async fn lookup_alive_datanode_peer(
    cluster_id: ClusterId,
    datanode_id: u64,
    meta_peer_client: &MetaPeerClientRef,
    lease_secs: u64,
) -> Result<Option<Peer>> {
    let key = LeaseKey {
        cluster_id,
        node_id: datanode_id,
    };

    // The key is cloned because it is still needed for the filter and the peer id.
    let raw_key: Vec<u8> = key.clone().try_into()?;
    let value: LeaseValue = match meta_peer_client.get(raw_key).await? {
        Some(kv) => kv.value.try_into()?,
        None => return Ok(None),
    };

    let is_alive = build_lease_filter(lease_secs);
    if !is_alive(&key, &value) {
        return Ok(None);
    }

    Ok(Some(Peer {
        id: key.node_id,
        addr: value.node_addr,
    }))
}

/// Returns the datanode leases of `cluster_id` that are still alive.
///
/// A lease is kept when the time elapsed since its `timestamp_millis` is less than
/// `lease_secs` seconds; the selection itself is delegated to `filter_datanodes`.
pub async fn alive_datanodes(
cluster_id: u64,
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<HashMap<LeaseKey, LeaseValue>> {
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000
};
let lease_filter = build_lease_filter(lease_secs);

filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
}

pub async fn filter_datanodes<P>(
cluster_id: u64,
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
predicate: P,
) -> Result<HashMap<LeaseKey, LeaseValue>>
Expand Down
6 changes: 6 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::error::{
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -249,6 +250,7 @@ pub struct MetaSrv {
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,

plugins: Plugins,
}
Expand Down Expand Up @@ -411,6 +413,10 @@ impl MetaSrv {
&self.memory_region_keeper
}

/// Returns a reference to the shared region migration manager held by this `MetaSrv`.
pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
&self.region_migration_manager
}

/// Returns the [`PublishRef`] obtained from `plugins`, or `None` when the lookup
/// yields nothing.
pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
}
Expand Down
14 changes: 14 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublishRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -236,6 +238,17 @@ impl MetaSrvBuilder {
&opening_region_keeper,
)?;

let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
table_metadata_manager.clone(),
opening_region_keeper.clone(),
mailbox.clone(),
options.server_addr.clone(),
),
));
region_migration_manager.try_start()?;

let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
Expand Down Expand Up @@ -323,6 +336,7 @@ impl MetaSrvBuilder {
.await,
plugins: plugins.unwrap_or_else(Plugins::default),
memory_region_keeper: opening_region_keeper,
region_migration_manager,
})
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,33 @@ pub trait ContextFactory {

/// Default implementation of [`ContextFactory`].
///
/// Holds the shared handles and the server address passed to its constructor,
/// together with a [`VolatileContext`].
#[derive(Clone)]
pub struct ContextFactoryImpl {
pub struct DefaultContextFactory {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}

impl ContextFactory for ContextFactoryImpl {
impl DefaultContextFactory {
/// Returns a [`DefaultContextFactory`] built from the given handles and server
/// address, starting with a default [`VolatileContext`].
pub fn new(
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
) -> Self {
Self {
volatile_ctx: VolatileContext::default(),
table_metadata_manager,
opening_region_keeper,
mailbox,
server_addr,
}
}
}

impl ContextFactory for DefaultContextFactory {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx,
Expand Down
29 changes: 17 additions & 12 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_meta::ClusterId;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::error::{self, Result};
use crate::procedure::region_migration::{
ContextFactoryImpl, PersistentContext, RegionMigrationProcedure,
DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
};

pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;

/// Manager of region migration procedure.
///
/// `running_procedures` maps a [`RegionId`] to a [`RegionMigrationProcedureTask`].
/// `submit_procedure` returns a `MigrationRunning` error when
/// `insert_running_procedure` does not hand back a guard for the task.
pub(crate) struct RegionMigrationManager {
pub struct RegionMigrationManager {
procedure_manager: ProcedureManagerRef,
running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
context_factory: ContextFactoryImpl,
context_factory: DefaultContextFactory,
}

/// The guard of running [RegionMigrationProcedureTask].
Expand All @@ -55,10 +57,10 @@ impl Drop for RegionMigrationProcedureGuard {

/// A region migration request, as accepted by `RegionMigrationManager::submit_procedure`.
///
/// NOTE(review): presumably describes moving `region_id` from `from_peer` to
/// `to_peer` within `cluster_id` — confirm against the procedure implementation.
#[derive(Debug, Clone)]
pub(crate) struct RegionMigrationProcedureTask {
cluster_id: ClusterId,
region_id: RegionId,
from_peer: Peer,
to_peer: Peer,
pub(crate) cluster_id: ClusterId,
pub(crate) region_id: RegionId,
pub(crate) from_peer: Peer,
pub(crate) to_peer: Peer,
}

impl Display for RegionMigrationProcedureTask {
Expand Down Expand Up @@ -93,7 +95,7 @@ impl RegionMigrationManager {
/// Returns new [RegionMigrationManager]
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
context_factory: ContextFactoryImpl,
context_factory: DefaultContextFactory,
) -> Self {
Self {
procedure_manager,
Expand Down Expand Up @@ -221,7 +223,10 @@ impl RegionMigrationManager {
}

/// Submits a new region migration procedure.
pub(crate) async fn submit_procedure(&self, task: RegionMigrationProcedureTask) -> Result<()> {
pub(crate) async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
let Some(guard) = self.insert_running_procedure(&task) else {
return error::MigrationRunningSnafu {
region_id: task.region_id,
Expand All @@ -243,7 +248,7 @@ impl RegionMigrationManager {

if self.has_migrated(&region_route, &task)? {
info!("Skipping region migration task: {task}");
return Ok(());
return Ok(None);
}

self.verify_region_leader_peer(&region_route, &task)?;
Expand Down Expand Up @@ -274,7 +279,7 @@ impl RegionMigrationManager {
info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
});

Ok(())
Ok(Some(procedure_id))
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use tokio::sync::mpsc::{Receiver, Sender};

use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, ContextFactoryImpl, State, VolatileContext};
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
Expand Down Expand Up @@ -120,8 +120,8 @@ impl TestingEnv {
}

/// Returns a context factory for the region migration procedure.
pub fn context_factory(&self) -> ContextFactoryImpl {
ContextFactoryImpl {
pub fn context_factory(&self) -> DefaultContextFactory {
DefaultContextFactory {
table_metadata_manager: self.table_metadata_manager.clone(),
opening_region_keeper: self.opening_region_keeper.clone(),
volatile_ctx: Default::default(),
Expand Down
6 changes: 6 additions & 0 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
.route("/route", handler.clone())
.route("/route/help", handler);

let handler = region_migration::SubmitRegionMigrationTaskHandler {
region_migration_manager: meta_srv.region_migration_manager().clone(),
meta_peer_client: meta_srv.meta_peer_client().clone(),
};
let router = router.route("/region-migration", handler);

let router = Router::nest("/admin", router);

Admin::new(router)
Expand Down
Loading

0 comments on commit d22072f

Please sign in to comment.