From 0c24374a84bb7fae250e0fbea2691030e0d095ad Mon Sep 17 00:00:00 2001
From: lyang24
Date: Tue, 2 Jan 2024 21:38:10 -0800
Subject: [PATCH] chore: impl KvBackend for MetaPeerClient

---
 src/meta-srv/src/cluster.rs | 251 ++++++++++++++++++++++--------------
 src/meta-srv/src/lease.rs   |  12 +-
 2 files changed, 166 insertions(+), 97 deletions(-)

diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs
index 7e460664c181..2dbc0c1ea66c 100644
--- a/src/meta-srv/src/cluster.rs
+++ b/src/meta-srv/src/cluster.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
@@ -22,8 +23,12 @@ use api::v1::meta::{
     RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
 };
 use common_grpc::channel_manager::ChannelManager;
-use common_meta::kv_backend::ResettableKvBackendRef;
-use common_meta::rpc::store::{BatchGetRequest, RangeRequest};
+use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
+use common_meta::rpc::store::{
+    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
+    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
+    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
+};
 use common_meta::rpc::KeyValue;
 use common_meta::util;
 use common_telemetry::warn;
@@ -49,66 +54,64 @@ pub struct MetaPeerClient {
     retry_interval_ms: u64,
 }
 
-impl MetaPeerClient {
-    async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
-        let key = format!("{DN_STAT_PREFIX}-").into_bytes();
-        let range_end = util::get_prefix_end_key(&key);
-        self.range(key, range_end, keys_only).await
-    }
+#[async_trait::async_trait]
+impl TxnService for MetaPeerClient {
+    type Error = error::Error;
+}
 
-    // Get all datanode stat kvs from leader meta.
-    pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
-        let kvs = self.get_dn_key_value(false).await?;
-        to_stat_kv_map(kvs)
+#[async_trait::async_trait]
+impl KvBackend for MetaPeerClient {
+    fn name(&self) -> &str {
+        "MetaPeerClient"
     }
 
-    pub async fn get_node_cnt(&self) -> Result<i32> {
-        let kvs = self.get_dn_key_value(true).await?;
-        kvs.into_iter()
-            .map(|kv| kv.key.try_into())
-            .collect::<Result<HashSet<StatKey>>>()
-            .map(|hash_set| hash_set.len() as i32)
+    fn as_any(&self) -> &dyn Any {
+        self
     }
 
-    // Get datanode stat kvs from leader meta by input keys.
-    pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
-        let stat_keys = keys.into_iter().map(|key| key.into()).collect();
+    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
+        if self.is_leader() {
+            return self
+                .in_memory
+                .range(req)
+                .await
+                .context(error::KvBackendSnafu);
+        }
 
-        let kvs = self.batch_get(stat_keys).await?;
+        let max_retry_count = self.max_retry_count;
+        let retry_interval_ms = self.retry_interval_ms;
 
-        to_stat_kv_map(kvs)
-    }
+        for _ in 0..max_retry_count {
+            match self
+                .remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
+                .await
+            {
+                Ok(res) => return Ok(res),
+                Err(e) => {
+                    if need_retry(&e) {
+                        warn!("Encountered an error that need to retry, err: {:?}", e);
+                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
+                    } else {
+                        return Err(e);
+                    }
+                }
+            }
+        }
 
-    // Get kv information from the leader's in_mem kv store.
-    pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
-        let mut kvs = self.range(key, vec![], false).await?;
-        Ok(if kvs.is_empty() {
-            None
-        } else {
-            debug_assert_eq!(kvs.len(), 1);
-            Some(kvs.remove(0))
-        })
+        error::ExceededRetryLimitSnafu {
+            func_name: "range",
+            retry_num: max_retry_count,
+        }
+        .fail()
     }
 
-    // Range kv information from the leader's in_mem kv store
-    pub async fn range(
-        &self,
-        key: Vec<u8>,
-        range_end: Vec<u8>,
-        keys_only: bool,
-    ) -> Result<Vec<KeyValue>> {
+    // Get kv information from the leader's in_mem kv store
+    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
         if self.is_leader() {
-            let request = RangeRequest {
-                key,
-                range_end,
-                ..Default::default()
-            };
-
             return self
                 .in_memory
-                .range(request)
+                .batch_get(req)
                 .await
-                .map(|resp| resp.kvs)
                 .context(error::KvBackendSnafu);
         }
 
@@ -116,11 +119,8 @@ impl MetaPeerClient {
         let retry_interval_ms = self.retry_interval_ms;
 
         for _ in 0..max_retry_count {
-            match self
-                .remote_range(key.clone(), range_end.clone(), keys_only)
-                .await
-            {
-                Ok(kvs) => return Ok(kvs),
+            match self.remote_batch_get(req.keys.clone()).await {
+                Ok(res) => return Ok(res),
                 Err(e) => {
                     if need_retry(&e) {
                         warn!("Encountered an error that need to retry, err: {:?}", e);
@@ -133,18 +133,111 @@ impl MetaPeerClient {
         }
 
         error::ExceededRetryLimitSnafu {
-            func_name: "range",
+            func_name: "batch_get",
             retry_num: max_retry_count,
         }
         .fail()
     }
 
+    // MetaPeerClient does not support mutable methods listed below.
+    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
+        error::UnsupportedSnafu {
+            operation: "put".to_string(),
+        }
+        .fail()
+    }
+
+    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
+        error::UnsupportedSnafu {
+            operation: "batch put".to_string(),
+        }
+        .fail()
+    }
+
+    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
+        error::UnsupportedSnafu {
+            operation: "compare and put".to_string(),
+        }
+        .fail()
+    }
+
+    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
+        error::UnsupportedSnafu {
+            operation: "delete range".to_string(),
+        }
+        .fail()
+    }
+
+    async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
+        error::UnsupportedSnafu {
+            operation: "batch delete".to_string(),
+        }
+        .fail()
+    }
+
+    async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
+        error::UnsupportedSnafu {
+            operation: "delete".to_string(),
+        }
+        .fail()
+    }
+
+    async fn put_conditionally(
+        &self,
+        _key: Vec<u8>,
+        _value: Vec<u8>,
+        _if_not_exists: bool,
+    ) -> Result<bool> {
+        error::UnsupportedSnafu {
+            operation: "put conditionally".to_string(),
+        }
+        .fail()
+    }
+}
+
+impl MetaPeerClient {
+    async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
+        let key = format!("{DN_STAT_PREFIX}-").into_bytes();
+        let range_end = util::get_prefix_end_key(&key);
+        let range_request = RangeRequest {
+            key,
+            range_end,
+            keys_only,
+            ..Default::default()
+        };
+        self.range(range_request).await.map(|res| res.kvs)
+    }
+
+    // Get all datanode stat kvs from leader meta.
+    pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
+        let kvs = self.get_dn_key_value(false).await?;
+        to_stat_kv_map(kvs)
+    }
+
+    pub async fn get_node_cnt(&self) -> Result<i32> {
+        let kvs = self.get_dn_key_value(true).await?;
+        kvs.into_iter()
+            .map(|kv| kv.key.try_into())
+            .collect::<Result<HashSet<StatKey>>>()
+            .map(|hash_set| hash_set.len() as i32)
+    }
+
+    // Get datanode stat kvs from leader meta by input keys.
+    pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
+        let stat_keys = keys.into_iter().map(|key| key.into()).collect();
+        let batch_get_req = BatchGetRequest { keys: stat_keys };
+
+        let res = self.batch_get(batch_get_req).await?;
+
+        to_stat_kv_map(res.kvs)
+    }
+
     async fn remote_range(
         &self,
         key: Vec<u8>,
         range_end: Vec<u8>,
         keys_only: bool,
-    ) -> Result<Vec<KeyValue>> {
+    ) -> Result<RangeResponse> {
         // Safety: when self.is_leader() == false, election must not empty.
         let election = self.election.as_ref().unwrap();
 
@@ -170,47 +263,13 @@ impl MetaPeerClient {
 
         check_resp_header(&response.header, Context { addr: &leader_addr })?;
 
-        Ok(response.kvs.into_iter().map(KeyValue::new).collect())
-    }
-
-    // Get kv information from the leader's in_mem kv store
-    pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
-        if self.is_leader() {
-            let request = BatchGetRequest { keys };
-
-            return self
-                .in_memory
-                .batch_get(request)
-                .await
-                .map(|resp| resp.kvs)
-                .context(error::KvBackendSnafu);
-        }
-
-        let max_retry_count = self.max_retry_count;
-        let retry_interval_ms = self.retry_interval_ms;
-
-        for _ in 0..max_retry_count {
-            match self.remote_batch_get(keys.clone()).await {
-                Ok(kvs) => return Ok(kvs),
-                Err(e) => {
-                    if need_retry(&e) {
-                        warn!("Encountered an error that need to retry, err: {:?}", e);
-                        tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
-                    } else {
-                        return Err(e);
-                    }
-                }
-            }
-        }
-
-        error::ExceededRetryLimitSnafu {
-            func_name: "batch_get",
-            retry_num: max_retry_count,
-        }
-        .fail()
+        Ok(RangeResponse {
+            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
+            more: response.more,
+        })
     }
 
-    async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
+    async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
        // Safety: when self.is_leader() == false, election must not empty.
         let election = self.election.as_ref().unwrap();
 
@@ -234,7 +293,9 @@ impl MetaPeerClient {
 
         check_resp_header(&response.header, Context { addr: &leader_addr })?;
 
-        Ok(response.kvs.into_iter().map(KeyValue::new).collect())
+        Ok(BatchGetResponse {
+            kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
+        })
     }
 
     // Check if the meta node is a leader node.
diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs
index 2fa3224e35b4..0a091e4a683b 100644
--- a/src/meta-srv/src/lease.rs
+++ b/src/meta-srv/src/lease.rs
@@ -14,6 +14,7 @@
 
 use std::collections::HashMap;
 
+use common_meta::kv_backend::KvBackend;
 use common_meta::peer::Peer;
 use common_meta::{util, ClusterId};
 use common_time::util as time_util;
@@ -39,7 +40,8 @@ pub async fn lookup_alive_datanode_peer(
         cluster_id,
         node_id: datanode_id,
     };
-    let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? else {
+    let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
+    let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
         return Ok(None);
     };
     let lease_value: LeaseValue = kv.value.try_into()?;
@@ -74,7 +76,13 @@ where
     let key = get_lease_prefix(cluster_id);
     let range_end = util::get_prefix_end_key(&key);
 
-    let kvs = meta_peer_client.range(key, range_end, false).await?;
+    let range_req = common_meta::rpc::store::RangeRequest {
+        key,
+        range_end,
+        keys_only: false,
+        ..Default::default()
+    };
+    let kvs = meta_peer_client.range(range_req).await?.kvs;
     let mut lease_kvs = HashMap::new();
     for kv in kvs {
         let lease_key: LeaseKey = kv.key.try_into()?;
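
Note: after this patch, callers read from MetaPeerClient through the generic common_meta KvBackend trait (request/response structs) rather than the old bespoke range/batch_get/get methods. A minimal sketch of a read-side caller under that assumption; the helper name, the prefix argument, and the surrounding function are illustrative and not part of the patch:

use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::RangeRequest;
use common_meta::util;

use crate::cluster::MetaPeerClient;
use crate::error::Result;

// Illustrative only: list every value stored under a key prefix via the
// KvBackend API. The patch itself only provides the trait impl; this helper
// is an assumed example of how a caller might use it.
async fn list_values_under_prefix(
    client: &MetaPeerClient,
    prefix: &[u8],
) -> Result<Vec<Vec<u8>>> {
    let key = prefix.to_vec();
    let range_end = util::get_prefix_end_key(&key);
    let req = RangeRequest {
        key,
        range_end,
        keys_only: false,
        ..Default::default()
    };
    // On the leader this is served from the in-memory store; on a follower it
    // is forwarded to the leader with bounded retries, per the impl above.
    let res = client.range(req).await?;
    Ok(res.kvs.into_iter().map(|kv| kv.value).collect())
}

Write operations (put, delete, and the other mutating trait methods) are also part of the trait, but MetaPeerClient rejects them with the Unsupported error introduced in this patch, keeping it a read-only view of the leader's in-memory store.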