chore: impl KvBackend for MetaPeerClient #3076

Merged · 1 commit · Jan 10, 2024
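This change implements `common_meta::kv_backend::KvBackend` (together with `TxnService`) for `MetaPeerClient`, replacing the client's bespoke `get`/`range`/`batch_get` helpers with the trait's request/response types. Reads (`range`, `batch_get`) are served from the leader's in-memory store when the local node is the leader and are otherwise forwarded to the remote leader with bounded retries; all mutating operations (`put`, `batch_put`, `compare_and_put`, `delete_range`, `batch_delete`, `delete`, `put_conditionally`) fail with an `Unsupported` error. Call sites in `lease.rs` are updated to the new API.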
251 changes: 156 additions & 95 deletions src/meta-srv/src/cluster.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -22,8 +23,12 @@ use api::v1::meta::{
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::rpc::store::{BatchGetRequest, RangeRequest};
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util;
use common_telemetry::warn;
@@ -49,78 +54,73 @@ pub struct MetaPeerClient {
retry_interval_ms: u64,
}

impl MetaPeerClient {
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&key);
self.range(key, range_end, keys_only).await
}
#[async_trait::async_trait]
impl TxnService for MetaPeerClient {
type Error = error::Error;
}

// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
let kvs = self.get_dn_key_value(false).await?;
to_stat_kv_map(kvs)
#[async_trait::async_trait]
impl KvBackend for MetaPeerClient {
fn name(&self) -> &str {
"MetaPeerClient"
}

pub async fn get_node_cnt(&self) -> Result<i32> {
let kvs = self.get_dn_key_value(true).await?;
kvs.into_iter()
.map(|kv| kv.key.try_into())
.collect::<Result<HashSet<StatKey>>>()
.map(|hash_set| hash_set.len() as i32)
fn as_any(&self) -> &dyn Any {
self
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
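// Reads are served from the leader's in-memory store when this node is
// the leader; otherwise they are forwarded to the remote leader, retrying
// retriable failures up to `max_retry_count` times with `retry_interval_ms`
// milliseconds between attempts.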
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
if self.is_leader() {
return self
.in_memory
.range(req)
.await
.context(error::KvBackendSnafu);
}

let kvs = self.batch_get(stat_keys).await?;
let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

to_stat_kv_map(kvs)
}
for _ in 0..max_retry_count {
match self
.remote_range(req.key.clone(), req.range_end.clone(), req.keys_only)
.await
{
Ok(res) => return Ok(res),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

// Get kv information from the leader's in_mem kv store.
pub async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
let mut kvs = self.range(key, vec![], false).await?;
Ok(if kvs.is_empty() {
None
} else {
debug_assert_eq!(kvs.len(), 1);
Some(kvs.remove(0))
})
error::ExceededRetryLimitSnafu {
func_name: "range",
retry_num: max_retry_count,
}
.fail()
}

// Range kv information from the leader's in_mem kv store
pub async fn range(
&self,
key: Vec<u8>,
range_end: Vec<u8>,
keys_only: bool,
) -> Result<Vec<KeyValue>> {
// Batch get kv information from the leader's in_mem kv store
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if self.is_leader() {
let request = RangeRequest {
key,
range_end,
..Default::default()
};

return self
.in_memory
.range(request)
.batch_get(req)
.await
.map(|resp| resp.kvs)
.context(error::KvBackendSnafu);
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self
.remote_range(key.clone(), range_end.clone(), keys_only)
.await
{
Ok(kvs) => return Ok(kvs),
match self.remote_batch_get(req.keys.clone()).await {
Ok(res) => return Ok(res),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
@@ -133,18 +133,111 @@ impl MetaPeerClient {
}

error::ExceededRetryLimitSnafu {
func_name: "range",
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
}

// MetaPeerClient does not support the mutable methods listed below.
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
error::UnsupportedSnafu {
operation: "put".to_string(),
}
.fail()
}

async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
error::UnsupportedSnafu {
operation: "batch put".to_string(),
}
.fail()
}

async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
error::UnsupportedSnafu {
operation: "compare and put".to_string(),
}
.fail()
}

async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
error::UnsupportedSnafu {
operation: "delete range".to_string(),
}
.fail()
}

async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
error::UnsupportedSnafu {
operation: "batch delete".to_string(),
}
.fail()
}

async fn delete(&self, _key: &[u8], _prev_kv: bool) -> Result<Option<KeyValue>> {
error::UnsupportedSnafu {
operation: "delete".to_string(),
}
.fail()
}

async fn put_conditionally(
&self,
_key: Vec<u8>,
_value: Vec<u8>,
_if_not_exists: bool,
) -> Result<bool> {
error::UnsupportedSnafu {
operation: "put conditionally".to_string(),
}
.fail()
}
}
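// Note (illustration, not part of this diff): every mutating call through
// the trait is rejected uniformly. Assuming `PutRequest` implements
// `Default`, a caller would observe:
//
//     let res = meta_peer_client.put(PutRequest::default()).await;
//     assert!(res.is_err()); // "put" is unsupported on MetaPeerClient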

impl MetaPeerClient {
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&key);
let range_request = RangeRequest {
key,
range_end,
keys_only,
..Default::default()
};
self.range(range_request).await.map(|res| res.kvs)
}

// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
let kvs = self.get_dn_key_value(false).await?;
to_stat_kv_map(kvs)
}

pub async fn get_node_cnt(&self) -> Result<i32> {
let kvs = self.get_dn_key_value(true).await?;
kvs.into_iter()
.map(|kv| kv.key.try_into())
.collect::<Result<HashSet<StatKey>>>()
.map(|hash_set| hash_set.len() as i32)
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let batch_get_req = BatchGetRequest { keys: stat_keys };

let res = self.batch_get(batch_get_req).await?;

to_stat_kv_map(res.kvs)
}

async fn remote_range(
&self,
key: Vec<u8>,
range_end: Vec<u8>,
keys_only: bool,
) -> Result<Vec<KeyValue>> {
) -> Result<RangeResponse> {
// Safety: when self.is_leader() == false, election must not be empty.
let election = self.election.as_ref().unwrap();

@@ -170,47 +263,13 @@

check_resp_header(&response.header, Context { addr: &leader_addr })?;

Ok(response.kvs.into_iter().map(KeyValue::new).collect())
}

// Get kv information from the leader's in_mem kv store
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
let request = BatchGetRequest { keys };

return self
.in_memory
.batch_get(request)
.await
.map(|resp| resp.kvs)
.context(error::KvBackendSnafu);
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self.remote_batch_get(keys.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

error::ExceededRetryLimitSnafu {
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
Ok(RangeResponse {
kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
more: response.more,
})
}

async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<BatchGetResponse> {
// Safety: when self.is_leader() == false, election must not be empty.
let election = self.election.as_ref().unwrap();

Expand All @@ -234,7 +293,9 @@ impl MetaPeerClient {

check_resp_header(&response.header, Context { addr: &leader_addr })?;

Ok(response.kvs.into_iter().map(KeyValue::new).collect())
Ok(BatchGetResponse {
kvs: response.kvs.into_iter().map(KeyValue::new).collect(),
})
}

// Check if the meta node is a leader node.
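With the trait in place, callers can be written once against any `KvBackend` and handed a `MetaPeerClient`. A minimal sketch, using only types shown in this diff and assuming (as the `TxnService` impl above suggests) that `KvBackend` has `TxnService` as a supertrait; the helper name `count_keys_with_prefix` is hypothetical:

use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::RangeRequest;
use common_meta::util;

// Counts the keys under `prefix` through any KvBackend implementation.
// With a MetaPeerClient this reads the leader's in-memory store, locally
// or via the remote leader; writes through the same trait would fail
// with an Unsupported error.
async fn count_keys_with_prefix<K: KvBackend>(kv: &K, prefix: &[u8]) -> Result<usize, K::Error> {
    let key = prefix.to_vec();
    let range_end = util::get_prefix_end_key(&key);
    let req = RangeRequest {
        key,
        range_end,
        keys_only: true, // values are not needed for counting
        ..Default::default()
    };
    Ok(kv.range(req).await?.kvs.len())
}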
12 changes: 10 additions & 2 deletions src/meta-srv/src/lease.rs
@@ -14,6 +14,7 @@

use std::collections::HashMap;

use common_meta::kv_backend::KvBackend;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_time::util as time_util;
@@ -39,7 +40,8 @@ pub async fn lookup_alive_datanode_peer(
cluster_id,
node_id: datanode_id,
};
let Some(kv) = meta_peer_client.get(lease_key.clone().try_into()?).await? else {
let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
@@ -74,7 +76,13 @@ where
let key = get_lease_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);

let kvs = meta_peer_client.range(key, range_end, false).await?;
let range_req = common_meta::rpc::store::RangeRequest {
key,
range_end,
keys_only: false,
..Default::default()
};
let kvs = meta_peer_client.range(range_req).await?.kvs;
let mut lease_kvs = HashMap::new();
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;
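The prefix scans above follow the usual etcd-style convention: `util::get_prefix_end_key` is expected to return the prefix with its last byte incremented, so the half-open interval `[key, range_end)` covers exactly the keys sharing the prefix. An illustration with a made-up prefix:

// Illustration only; the prefix is hypothetical and the end-key behavior
// is the assumed etcd-style convention (increment the final byte).
let key = b"__meta_lease-0-".to_vec();
let range_end = util::get_prefix_end_key(&key);
assert_eq!(range_end, b"__meta_lease-0.".to_vec()); // b'-' + 1 == b'.'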