Skip to content

Commit

Permalink
chore: impl KvBackend for MetaPeerClient (#3076)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 authored Jan 10, 2024
1 parent 7fad4e8 commit d521bc9
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 97 deletions.
251 changes: 156 additions & 95 deletions src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -22,8 +23,12 @@ use api::v1::meta::{
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::rpc::store::{BatchGetRequest, RangeRequest};
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util;
use common_telemetry::warn;
Expand All @@ -49,78 +54,73 @@ pub struct MetaPeerClient {
retry_interval_ms: u64,
}

impl MetaPeerClient {
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&key);
self.range(key, range_end, keys_only).await
}
#[async_trait::async_trait]
impl TxnService for MetaPeerClient {
type Error = error::Error;
}

// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
let kvs = self.get_dn_key_value(false).await?;
to_stat_kv_map(kvs)
#[async_trait::async_trait]
impl KvBackend for MetaPeerClient {
fn name(&self) -> &str {
"MetaPeerClient"
}

pub async fn get_node_cnt(&self) -> Result<i32> {
let kvs = self.get_dn_key_value(true).await?;
kvs.into_iter()
.map(|kv| kv.key.try_into())
.collect::<Result<HashSet<StatKey>>>()
.map(|hash_set| hash_set.len() as i32)
fn as_any(&self) -> &dyn Any {
self
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
if self.is_leader() {
return self
.in_memory
.range(req)
.await
.context(error::KvBackendSnafu);
}

let kvs = self.batch_get(stat_keys).await?;
let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

to_stat_kv_map(kvs)
}
for _ in 0..max_retry_count {
match self
.remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
.await
{
Ok(res) => return Ok(res),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

// Get kv information from the leader's in_mem kv store.
pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
let mut kvs = self.range(key, vec![], false).await?;
Ok(if kvs.is_empty() {
None
} else {
debug_assert_eq!(kvs.len(), 1);
Some(kvs.remove(0))
})
error::ExceededRetryLimitSnafu {
func_name: "range",
retry_num: max_retry_count,
}
.fail()
}

// Range kv information from the leader's in_mem kv store
pub async fn range(
&self,
key: Vec<u8>,
range_end: Vec<u8>,
keys_only: bool,
) -> Result<Vec<KeyValue>> {
// Get kv information from the leader's in_mem kv store
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if self.is_leader() {
let request = RangeRequest {
key,
range_end,
..Default::default()
};

return self
.in_memory
.range(request)
.batch_get(req)
.await
.map(|resp| resp.kvs)
.context(error::KvBackendSnafu);
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self
.remote_range(key.clone(), range_end.clone(), keys_only)
.await
{
Ok(kvs) => return Ok(kvs),
match self.remote_batch_get(req.keys.clone()).await {
Ok(res) => return Ok(res),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
Expand All @@ -133,18 +133,111 @@ impl MetaPeerClient {
}

error::ExceededRetryLimitSnafu {
func_name: "range",
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
}

// MetaPeerClient does not support mutable methods listed below.
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
error::UnsupportedSnafu {
operation: "put".to_string(),
}
.fail()
}

async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
error::UnsupportedSnafu {
operation: "batch put".to_string(),
}
.fail()
}

async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
error::UnsupportedSnafu {
operation: "compare and put".to_string(),
}
.fail()
}

async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
error::UnsupportedSnafu {
operation: "delete range".to_string(),
}
.fail()
}

async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
error::UnsupportedSnafu {
operation: "batch delete".to_string(),
}
.fail()
}

async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
error::UnsupportedSnafu {
operation: "delete".to_string(),
}
.fail()
}

async fn put_conditionally(
&self,
_key: Vec<u8>,
_value: Vec<u8>,
_if_not_exists: bool,
) -> Result<bool> {
error::UnsupportedSnafu {
operation: "put conditionally".to_string(),
}
.fail()
}
}

impl MetaPeerClient {
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&key);
let range_request = RangeRequest {
key,
range_end,
keys_only,
..Default::default()
};
self.range(range_request).await.map(|res| res.kvs)
}

// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
let kvs = self.get_dn_key_value(false).await?;
to_stat_kv_map(kvs)
}

pub async fn get_node_cnt(&self) -> Result<i32> {
let kvs = self.get_dn_key_value(true).await?;
kvs.into_iter()
.map(|kv| kv.key.try_into())
.collect::<Result<HashSet<StatKey>>>()
.map(|hash_set| hash_set.len() as i32)
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let batch_get_req = BatchGetRequest { keys: stat_keys };

let res = self.batch_get(batch_get_req).await?;

to_stat_kv_map(res.kvs)
}

async fn remote_range(
&self,
key: Vec<u8>,
range_end: Vec<u8>,
keys_only: bool,
) -> Result<Vec<KeyValue>> {
) -> Result<RangeResponse> {
// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();

Expand All @@ -170,47 +263,13 @@ impl MetaPeerClient {

check_resp_header(&response.header, Context { addr: &leader_addr })?;

Ok(response.kvs.into_iter().map(KeyValue::new).collect())
}

// Get kv information from the leader's in_mem kv store
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
let request = BatchGetRequest { keys };

return self
.in_memory
.batch_get(request)
.await
.map(|resp| resp.kvs)
.context(error::KvBackendSnafu);
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self.remote_batch_get(keys.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

error::ExceededRetryLimitSnafu {
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
Ok(RangeResponse {
kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
more: response.more,
})
}

async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
// Safety: when self.is_leader() == false, election must not empty.
let election = self.election.as_ref().unwrap();

Expand All @@ -234,7 +293,9 @@ impl MetaPeerClient {

check_resp_header(&response.header, Context { addr: &leader_addr })?;

Ok(response.kvs.into_iter().map(KeyValue::new).collect())
Ok(BatchGetResponse {
kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
})
}

// Check if the meta node is a leader node.
Expand Down
12 changes: 10 additions & 2 deletions src/meta-srv/src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;

use common_meta::kv_backend::KvBackend;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_time::util as time_util;
Expand All @@ -39,7 +40,8 @@ pub async fn lookup_alive_datanode_peer(
cluster_id,
node_id: datanode_id,
};
let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? else {
let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
Expand Down Expand Up @@ -74,7 +76,13 @@ where
let key = get_lease_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);

let kvs = meta_peer_client.range(key, range_end, false).await?;
let range_req = common_meta::rpc::store::RangeRequest {
key,
range_end,
keys_only: false,
..Default::default()
};
let kvs = meta_peer_client.range(range_req).await?.kvs;
let mut lease_kvs = HashMap::new();
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;
Expand Down

0 comments on commit d521bc9

Please sign in to comment.