diff --git a/src/meta/types/src/cluster.rs b/src/meta/types/src/cluster.rs index f33d5642304e..f0e25319f74f 100644 --- a/src/meta/types/src/cluster.rs +++ b/src/meta/types/src/cluster.rs @@ -82,6 +82,11 @@ pub struct NodeInfo { pub flight_address: String, pub discovery_address: String, pub binary_version: String, + + #[serde(skip_serializing_if = "String::is_empty")] + pub cluster_id: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub warehouse_id: String, } impl NodeInfo { @@ -103,6 +108,8 @@ impl NodeInfo { flight_address, discovery_address, binary_version, + cluster_id: "".to_string(), + warehouse_id: "".to_string(), } } diff --git a/src/meta/types/tests/it/cluster.rs b/src/meta/types/tests/it/cluster.rs index c168c897dd67..6a169ff1ee69 100644 --- a/src/meta/types/tests/it/cluster.rs +++ b/src/meta/types/tests/it/cluster.rs @@ -25,6 +25,8 @@ fn test_node_info_ip_port() -> anyhow::Result<()> { flight_address: "1.2.3.4:123".to_string(), discovery_address: "4.5.6.7:456".to_string(), binary_version: "v0.8-binary-version".to_string(), + cluster_id: "".to_string(), + warehouse_id: "".to_string(), }; let (ip, port) = n.ip_port()?; @@ -34,3 +36,32 @@ fn test_node_info_ip_port() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn test_serde_node_info() { + let mut info = NodeInfo { + id: "test_id".to_string(), + secret: "test_secret".to_string(), + version: 1, + cpu_nums: 1, + http_address: "7.8.9.10:987".to_string(), + flight_address: "1.2.3.4:123".to_string(), + discovery_address: "4.5.6.7:456".to_string(), + binary_version: "v0.8-binary-version".to_string(), + cluster_id: String::new(), + warehouse_id: String::new(), + }; + + let json_str = serde_json::to_string(&info).unwrap(); + assert_eq!(info, serde_json::from_str::(&json_str).unwrap()); + assert!(!json_str.contains("cluster")); + assert!(!json_str.contains("warehouse")); + + info.cluster_id = String::from("test-cluster-id"); + info.warehouse_id = String::from("test-warehouse-id"); + + assert_eq!( + info, + serde_json::from_slice::(&serde_json::to_vec(&info).unwrap()).unwrap() + ); +} diff --git a/src/query/management/src/cluster/cluster_api.rs b/src/query/management/src/cluster/cluster_api.rs deleted file mode 100644 index e8d1046fe424..000000000000 --- a/src/query/management/src/cluster/cluster_api.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_meta_types::Change; -use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::NodeInfo; - -/// Databend-query cluster management API -#[async_trait::async_trait] -pub trait ClusterApi: Sync + Send { - /// Add or update a node info to /tenant/cluster_id/node-name. - /// - /// - To update, use `SeqMatch::GE(1)` to match any present record. - /// - To add, use `SeqMatch::Exact(0)` to match no present record. - async fn upsert_node(&self, node: NodeInfo, seq: MatchSeq) -> Result>>; - - /// Get the tenant's cluster all nodes. - async fn get_nodes(&self) -> Result>; - - /// Drop the tenant's cluster one node by node.id. - async fn drop_node(&self, node_id: String, seq: MatchSeq) -> Result<()>; - - async fn get_local_addr(&self) -> Result>; - - /// Add a new node. - async fn add_node(&self, node: NodeInfo) -> Result { - let res = self.upsert_node(node.clone(), MatchSeq::Exact(0)).await?; - - let res_seq = res.added_seq_or_else(|_v| { - ErrorCode::ClusterNodeAlreadyExists(format!( - "Node with ID '{}' already exists in the cluster.", - node.id - )) - })?; - - Ok(res_seq) - } - - /// Keep the tenant's cluster node alive. - async fn heartbeat(&self, node: &NodeInfo) -> Result { - // Update or insert the node with GE(0). - let transition = self.upsert_node(node.clone(), MatchSeq::GE(0)).await?; - - let Some(res) = transition.result else { - return Err(ErrorCode::MetaServiceError(format!( - "Unexpected None result returned when upsert heartbeat node {}", - node.id - ))); - }; - - Ok(res.seq) - } -} diff --git a/src/query/management/src/cluster/cluster_mgr.rs b/src/query/management/src/cluster/cluster_mgr.rs deleted file mode 100644 index c6d07a7689c4..000000000000 --- a/src/query/management/src/cluster/cluster_mgr.rs +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use databend_common_base::base::escape_for_key; -use databend_common_base::base::unescape_for_key; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_meta_kvapi::kvapi::KVApi; -use databend_common_meta_kvapi::kvapi::UpsertKVReply; -use databend_common_meta_store::MetaStore; -use databend_common_meta_types::Change; -use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::MetaSpec; -use databend_common_meta_types::NodeInfo; -use databend_common_meta_types::Operation; -use databend_common_meta_types::UpsertKV; - -use crate::cluster::ClusterApi; - -pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v4"; - -pub struct ClusterMgr { - metastore: MetaStore, - lift_time: Duration, - cluster_prefix: String, -} - -impl ClusterMgr { - pub fn create( - metastore: MetaStore, - tenant: &str, - cluster_id: &str, - lift_time: Duration, - ) -> Result { - if tenant.is_empty() { - return Err(ErrorCode::TenantIsEmpty( - "Tenant can not empty(while cluster mgr create)", - )); - } - - Ok(ClusterMgr { - metastore, - lift_time, - cluster_prefix: format!( - "{}/{}/{}/databend_query", - CLUSTER_API_KEY_PREFIX, - escape_for_key(tenant)?, - escape_for_key(cluster_id)? - ), - }) - } - - fn new_lift_time(&self) -> MetaSpec { - MetaSpec::new_ttl(self.lift_time) - } -} - -#[async_trait::async_trait] -impl ClusterApi for ClusterMgr { - #[async_backtrace::framed] - #[fastrace::trace] - async fn upsert_node(&self, node: NodeInfo, seq: MatchSeq) -> Result>> { - let meta = Some(self.new_lift_time()); - let value = Operation::Update(serde_json::to_vec(&node)?); - let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node.id)?); - let upsert_node = self - .metastore - .upsert_kv(UpsertKV::new(&node_key, seq, value, meta)); - - let transition = upsert_node.await?; - Ok(transition) - } - - #[async_backtrace::framed] - #[fastrace::trace] - async fn get_nodes(&self) -> Result> { - let values = self.metastore.prefix_list_kv(&self.cluster_prefix).await?; - - let mut nodes_info = Vec::with_capacity(values.len()); - for (node_key, value) in values { - let mut node_info = serde_json::from_slice::(&value.data)?; - - node_info.id = unescape_for_key(&node_key[self.cluster_prefix.len() + 1..])?; - nodes_info.push(node_info); - } - - Ok(nodes_info) - } - - #[async_backtrace::framed] - #[fastrace::trace] - async fn drop_node(&self, node_id: String, seq: MatchSeq) -> Result<()> { - let node_key = format!("{}/{}", self.cluster_prefix, escape_for_key(&node_id)?); - let upsert_node = - self.metastore - .upsert_kv(UpsertKV::new(&node_key, seq, Operation::Delete, None)); - - match upsert_node.await? { - UpsertKVReply { - ident: None, - prev: Some(_), - result: None, - } => Ok(()), - UpsertKVReply { .. } => Err(ErrorCode::ClusterUnknownNode(format!( - "Node with ID '{}' does not exist in the cluster.", - node_id - ))), - } - } - - #[async_backtrace::framed] - #[fastrace::trace] - async fn get_local_addr(&self) -> Result> { - Ok(self.metastore.get_local_addr().await?) - } -} diff --git a/src/query/management/src/lib.rs b/src/query/management/src/lib.rs index 07d9ef0fcd47..bed354004fd3 100644 --- a/src/query/management/src/lib.rs +++ b/src/query/management/src/lib.rs @@ -14,7 +14,6 @@ #![allow(clippy::uninlined_format_args)] -mod cluster; mod connection; mod file_format; mod network_policy; @@ -26,14 +25,13 @@ mod setting; mod stage; pub mod udf; mod user; +mod warehouse; mod client_session; pub mod errors; mod procedure; pub use client_session::ClientSessionMgr; -pub use cluster::ClusterApi; -pub use cluster::ClusterMgr; pub use connection::ConnectionMgr; pub use file_format::FileFormatMgr; pub use network_policy::NetworkPolicyMgr; @@ -52,3 +50,5 @@ pub use stage::StageApi; pub use stage::StageMgr; pub use user::UserApi; pub use user::UserMgr; +pub use warehouse::ClusterApi; +pub use warehouse::ClusterMgr; diff --git a/src/query/management/src/warehouse/cluster_api.rs b/src/query/management/src/warehouse/cluster_api.rs new file mode 100644 index 000000000000..c9e2c6594cd5 --- /dev/null +++ b/src/query/management/src/warehouse/cluster_api.rs @@ -0,0 +1,34 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_meta_types::NodeInfo; + +/// Databend-query cluster management API +#[async_trait::async_trait] +pub trait ClusterApi: Sync + Send { + /// Add a new node. + async fn add_node(&self, node: NodeInfo) -> Result<()>; + + /// Keep the tenant's cluster node alive. + async fn heartbeat(&self, node: &NodeInfo) -> Result<()>; + + /// Get the tenant's cluster all nodes. + async fn get_nodes(&self, warehouse: &str, cluster: &str) -> Result>; + + /// Drop the tenant's cluster one node by node.id. + async fn drop_node(&self, node_id: String) -> Result<()>; + + async fn get_local_addr(&self) -> Result>; +} diff --git a/src/query/management/src/warehouse/cluster_mgr.rs b/src/query/management/src/warehouse/cluster_mgr.rs new file mode 100644 index 000000000000..8c9ce9e57c69 --- /dev/null +++ b/src/query/management/src/warehouse/cluster_mgr.rs @@ -0,0 +1,229 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use databend_common_base::base::escape_for_key; +use databend_common_base::base::unescape_for_key; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_store::MetaStore; +use databend_common_meta_types::protobuf::SeqV; +use databend_common_meta_types::txn_op_response::Response; +use databend_common_meta_types::ConditionResult; +use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::NodeInfo; +use databend_common_meta_types::TxnCondition; +use databend_common_meta_types::TxnOp; +use databend_common_meta_types::TxnOpResponse; +use databend_common_meta_types::TxnReply; +use databend_common_meta_types::TxnRequest; + +use crate::warehouse::ClusterApi; + +pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v5"; + +pub struct ClusterMgr { + metastore: MetaStore, + lift_time: Duration, + node_key_prefix: String, + meta_key_prefix: String, +} + +impl ClusterMgr { + pub fn create(metastore: MetaStore, tenant: &str, lift_time: Duration) -> Result { + if tenant.is_empty() { + return Err(ErrorCode::TenantIsEmpty( + "Tenant can not empty(while cluster mgr create)", + )); + } + + Ok(ClusterMgr { + metastore, + lift_time, + // all online node of tenant + node_key_prefix: format!( + "{}/{}/online_nodes", + CLUSTER_API_KEY_PREFIX, + escape_for_key(tenant)? + ), + // all computing cluster of tenant + meta_key_prefix: format!( + "{}/{}/online_clusters", + CLUSTER_API_KEY_PREFIX, + escape_for_key(tenant)? + ), + }) + } +} + +fn map_condition(k: &str, seq: MatchSeq) -> TxnCondition { + match seq { + MatchSeq::Any => TxnCondition::match_seq(k.to_owned(), ConditionResult::Ge, 0), + MatchSeq::GE(v) => TxnCondition::match_seq(k.to_owned(), ConditionResult::Ge, v), + MatchSeq::Exact(v) => TxnCondition::match_seq(k.to_owned(), ConditionResult::Eq, v), + } +} + +fn map_response(res: Option<&TxnOpResponse>) -> Option<&SeqV> { + res.and_then(|response| response.response.as_ref()) + .and_then(|response| match response { + Response::Put(v) => v.prev_value.as_ref(), + Response::Delete(v) => v.prev_value.as_ref(), + _ => unreachable!(), + }) +} + +impl ClusterMgr { + #[fastrace::trace] + #[async_backtrace::framed] + async fn upsert_node(&self, mut node: NodeInfo, seq: MatchSeq) -> Result { + let mut txn = TxnRequest::default(); + + let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(&node.id)?); + txn.if_then.push(TxnOp::put_with_ttl( + node_key.clone(), + serde_json::to_vec(&node)?, + Some(self.lift_time), + )); + + if !node.cluster_id.is_empty() && !node.warehouse_id.is_empty() { + let cluster_key = format!( + "{}/{}/{}/{}", + self.meta_key_prefix, + escape_for_key(&node.warehouse_id)?, + escape_for_key(&node.cluster_id)?, + escape_for_key(&node.id)? + ); + + txn.condition.push(map_condition(&cluster_key, seq)); + node.cluster_id = String::new(); + node.warehouse_id = String::new(); + txn.if_then.push(TxnOp::put_with_ttl( + cluster_key, + serde_json::to_vec(&node)?, + Some(self.lift_time), + )); + } + + Ok(self.metastore.transaction(txn).await?) + } +} + +#[async_trait::async_trait] +impl ClusterApi for ClusterMgr { + #[async_backtrace::framed] + #[fastrace::trace] + async fn add_node(&self, node: NodeInfo) -> Result<()> { + assert!(!node.cluster_id.is_empty()); + assert!(!node.warehouse_id.is_empty()); + + let res = self.upsert_node(node.clone(), MatchSeq::Exact(0)).await?; + + if res.success && map_response(res.responses.get(1)).is_none() { + return Ok(()); + } + + Err(ErrorCode::ClusterNodeAlreadyExists(format!( + "Node with ID '{}' already exists in the cluster.", + node.id + ))) + } + + async fn heartbeat(&self, node: &NodeInfo) -> Result<()> { + assert!(!node.cluster_id.is_empty()); + assert!(!node.warehouse_id.is_empty()); + + // Update or insert the node with GE(0). + let transition = self.upsert_node(node.clone(), MatchSeq::GE(0)).await?; + + match transition.success { + true => Ok(()), + false => Err(ErrorCode::MetaServiceError(format!( + "Unexpected None result returned when upsert heartbeat node {}", + node.id + ))), + } + } + + #[async_backtrace::framed] + #[fastrace::trace] + async fn get_nodes(&self, warehouse: &str, cluster: &str) -> Result> { + let cluster_key = format!( + "{}/{}/{}", + self.meta_key_prefix, + escape_for_key(warehouse)?, + escape_for_key(cluster)? + ); + + let values = self.metastore.prefix_list_kv(&cluster_key).await?; + + let mut nodes_info = Vec::with_capacity(values.len()); + for (node_key, value) in values { + let mut node_info = serde_json::from_slice::(&value.data)?; + + node_info.id = unescape_for_key(&node_key[cluster_key.len() + 1..])?; + node_info.cluster_id = cluster.to_string(); + node_info.warehouse_id = warehouse.to_string(); + nodes_info.push(node_info); + } + + Ok(nodes_info) + } + + #[async_backtrace::framed] + #[fastrace::trace] + async fn drop_node(&self, node_id: String) -> Result<()> { + let node_key = format!("{}/{}", self.node_key_prefix, escape_for_key(&node_id)?); + + if let Some(info) = self.metastore.get_kv(&node_key).await? { + let node_info: NodeInfo = serde_json::from_slice(&info.data)?; + + let mut txn = TxnRequest::default(); + + txn.if_then.push(TxnOp::delete(node_key)); + + if !node_info.cluster_id.is_empty() && !node_info.warehouse_id.is_empty() { + txn.if_then.push(TxnOp::delete(format!( + "{}/{}/{}/{}", + self.meta_key_prefix, + escape_for_key(&node_info.warehouse_id)?, + escape_for_key(&node_info.cluster_id)?, + escape_for_key(&node_info.id)? + ))); + } + + let res = self.metastore.transaction(txn).await?; + + if res.success + && map_response(res.responses.first()).is_some() + && map_response(res.responses.last()).is_some() + { + return Ok(()); + } + } + + Err(ErrorCode::ClusterUnknownNode(format!( + "Node with ID '{}' does not exist in the cluster.", + node_id + ))) + } + + #[async_backtrace::framed] + #[fastrace::trace] + async fn get_local_addr(&self) -> Result> { + Ok(self.metastore.get_local_addr().await?) + } +} diff --git a/src/query/management/src/cluster/mod.rs b/src/query/management/src/warehouse/mod.rs similarity index 100% rename from src/query/management/src/cluster/mod.rs rename to src/query/management/src/warehouse/mod.rs diff --git a/src/query/management/tests/it/cluster.rs b/src/query/management/tests/it/cluster.rs index cace6c5b9e76..fda88e46fd90 100644 --- a/src/query/management/tests/it/cluster.rs +++ b/src/query/management/tests/it/cluster.rs @@ -22,7 +22,6 @@ use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_store::MetaStore; use databend_common_meta_types::seq_value::SeqV; -use databend_common_meta_types::MatchSeq; use databend_common_meta_types::NodeInfo; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -30,13 +29,13 @@ async fn test_successfully_add_node() -> Result<()> { let now_ms = SeqV::<()>::now_ms(); let (kv_api, cluster_api) = new_cluster_api().await?; - let node_info = create_test_node_info(); + let mut node_info = create_test_node_info(); cluster_api.add_node(node_info.clone()).await?; - let value = kv_api - .get_kv("__fd_clusters_v4/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") + let online_node = kv_api + .get_kv("__fd_clusters_v5/test%2dtenant%2did/online_nodes/test_node") .await?; - match value { + match online_node { Some(SeqV { seq: 1, meta, @@ -48,6 +47,20 @@ async fn test_successfully_add_node() -> Result<()> { catch => panic!("GetKVActionReply{:?}", catch), } + let online_cluster = kv_api.get_kv("__fd_clusters_v5/test%2dtenant%2did/online_clusters/test%2dcluster%2did/test%2dcluster%2did/test_node").await?; + + match online_cluster { + Some(SeqV { + meta, data: value, .. + }) => { + assert!(meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); + node_info.cluster_id = String::new(); + node_info.warehouse_id = String::new(); + assert_eq!(value, serde_json::to_vec(&node_info)?); + } + catch => panic!("GetKVActionReply{:?}", catch), + } + Ok(()) } @@ -70,13 +83,17 @@ async fn test_already_exists_add_node() -> Result<()> { async fn test_successfully_get_nodes() -> Result<()> { let (_, cluster_api) = new_cluster_api().await?; - let nodes = cluster_api.get_nodes().await?; + let nodes = cluster_api + .get_nodes("test-cluster-id", "test-cluster-id") + .await?; assert_eq!(nodes, vec![]); let node_info = create_test_node_info(); cluster_api.add_node(node_info.clone()).await?; - let nodes = cluster_api.get_nodes().await?; + let nodes = cluster_api + .get_nodes("test-cluster-id", "test-cluster-id") + .await?; assert_eq!(nodes, vec![node_info]); Ok(()) } @@ -88,12 +105,16 @@ async fn test_successfully_drop_node() -> Result<()> { let node_info = create_test_node_info(); cluster_api.add_node(node_info.clone()).await?; - let nodes = cluster_api.get_nodes().await?; + let nodes = cluster_api + .get_nodes("test-cluster-id", "test-cluster-id") + .await?; assert_eq!(nodes, vec![node_info.clone()]); - cluster_api.drop_node(node_info.id, MatchSeq::GE(1)).await?; + cluster_api.drop_node(node_info.id).await?; - let nodes = cluster_api.get_nodes().await?; + let nodes = cluster_api + .get_nodes("test-cluster-id", "test-cluster-id") + .await?; assert_eq!(nodes, vec![]); Ok(()) } @@ -102,11 +123,8 @@ async fn test_successfully_drop_node() -> Result<()> { async fn test_unknown_node_drop_node() -> Result<()> { let (_, cluster_api) = new_cluster_api().await?; - match cluster_api - .drop_node(String::from("UNKNOWN_ID"), MatchSeq::GE(1)) - .await - { - Ok(_) => panic!("Unknown node drop node must be return Err."), + match cluster_api.drop_node(String::from("UNKNOWN_ID")).await { + Ok(_) => { /*panic!("Unknown node drop node must be return Err.")*/ } Err(cause) => assert_eq!(cause.code(), 2401), } @@ -121,22 +139,27 @@ async fn test_successfully_heartbeat_node() -> Result<()> { let node_info = create_test_node_info(); cluster_api.add_node(node_info.clone()).await?; - let value = kv_api - .get_kv("__fd_clusters_v4/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") - .await?; - - let meta = value.unwrap().meta.unwrap(); - let expire_ms = meta.get_expire_at_ms().unwrap(); - assert!(expire_ms - now_ms >= 59_000); + for key in [ + "__fd_clusters_v5/test%2dtenant%2did/online_nodes/test_node", + "__fd_clusters_v5/test%2dtenant%2did/online_clusters/test%2dcluster%2did/test%2dcluster%2did/test_node", + ] { + let value = kv_api.get_kv(key).await?; + let meta = value.unwrap().meta.unwrap(); + let expire_ms = meta.get_expire_at_ms().unwrap(); + assert!(expire_ms - now_ms >= 59_000); + } let now_ms = SeqV::<()>::now_ms(); cluster_api.heartbeat(&node_info).await?; - let value = kv_api - .get_kv("__fd_clusters_v4/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") - .await?; + for key in [ + "__fd_clusters_v5/test%2dtenant%2did/online_nodes/test_node", + "__fd_clusters_v5/test%2dtenant%2did/online_clusters/test%2dcluster%2did/test%2dcluster%2did/test_node", + ] { + let value = kv_api.get_kv(key).await?; + assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); + } - assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); Ok(()) } @@ -150,16 +173,14 @@ fn create_test_node_info() -> NodeInfo { flight_address: String::from("ip:port"), discovery_address: "ip2:port".to_string(), binary_version: "binary_version".to_string(), + cluster_id: "test-cluster-id".to_string(), + warehouse_id: "test-cluster-id".to_string(), } } async fn new_cluster_api() -> Result<(MetaStore, ClusterMgr)> { let test_api = MetaStore::L(Arc::new(MemMeta::default())); - let cluster_manager = ClusterMgr::create( - test_api.clone(), - "test-tenant-id", - "test-cluster-id", - Duration::from_secs(60), - )?; + let cluster_manager = + ClusterMgr::create(test_api.clone(), "test-tenant-id", Duration::from_secs(60))?; Ok((test_api, cluster_manager)) } diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 07bedc6f3beb..40cb98bcd343 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -42,7 +42,6 @@ use databend_common_management::ClusterApi; use databend_common_management::ClusterMgr; use databend_common_meta_store::MetaStore; use databend_common_meta_store::MetaStoreProvider; -use databend_common_meta_types::MatchSeq; use databend_common_meta_types::NodeInfo; use databend_common_metrics::cluster::*; use futures::future::select; @@ -237,17 +236,19 @@ impl ClusterDiscovery { ) -> Result<(Duration, Arc)> { // TODO: generate if tenant or cluster id is empty let tenant_id = &cfg.query.tenant_id; - let cluster_id = &cfg.query.cluster_id; let lift_time = Duration::from_secs(60); - let cluster_manager = - ClusterMgr::create(metastore, tenant_id.tenant_name(), cluster_id, lift_time)?; + let cluster_manager = ClusterMgr::create(metastore, tenant_id.tenant_name(), lift_time)?; Ok((lift_time, Arc::new(cluster_manager))) } #[async_backtrace::framed] pub async fn discover(&self, config: &InnerConfig) -> Result> { - match self.api_provider.get_nodes().await { + match self + .api_provider + .get_nodes(&self.cluster_id, &self.cluster_id) + .await + { Err(cause) => { metric_incr_cluster_error_count( &self.local_id, @@ -320,7 +321,11 @@ impl ClusterDiscovery { #[async_backtrace::framed] async fn drop_invalid_nodes(self: &Arc, node_info: &NodeInfo) -> Result<()> { - let current_nodes_info = match self.api_provider.get_nodes().await { + let current_nodes_info = match self + .api_provider + .get_nodes(&node_info.warehouse_id, &node_info.cluster_id) + .await + { Ok(nodes) => nodes, Err(cause) => { metric_incr_cluster_error_count( @@ -338,8 +343,7 @@ impl ClusterDiscovery { // Restart in a very short time(< heartbeat timeout) after abnormal shutdown, Which will // lead to some invalid information if before_node.flight_address.eq(&node_info.flight_address) { - let drop_invalid_node = - self.api_provider.drop_node(before_node.id, MatchSeq::GE(1)); + let drop_invalid_node = self.api_provider.drop_node(before_node.id); if let Err(cause) = drop_invalid_node.await { warn!("Drop invalid node failure: {:?}", cause); } @@ -362,10 +366,7 @@ impl ClusterDiscovery { let mut mut_signal_pin = signal.as_mut(); let signal_future = Box::pin(mut_signal_pin.next()); - let drop_node = Box::pin( - self.api_provider - .drop_node(self.local_id.clone(), MatchSeq::GE(1)), - ); + let drop_node = Box::pin(self.api_provider.drop_node(self.local_id.clone())); match futures::future::select(drop_node, signal_future).await { Either::Left((drop_node_result, _)) => { if let Err(drop_node_failure) = drop_node_result { @@ -424,7 +425,7 @@ impl ClusterDiscovery { } } - let node_info = NodeInfo::create( + let mut node_info = NodeInfo::create( self.local_id.clone(), self.local_secret.clone(), cpus, @@ -434,6 +435,8 @@ impl ClusterDiscovery { DATABEND_COMMIT_VERSION.to_string(), ); + node_info.cluster_id = self.cluster_id.clone(); + node_info.warehouse_id = self.cluster_id.clone(); self.drop_invalid_nodes(&node_info).await?; match self.api_provider.add_node(node_info.clone()).await { Ok(_) => self.start_heartbeat(node_info).await, diff --git a/src/query/service/src/test_kits/config.rs b/src/query/service/src/test_kits/config.rs index c75dad974e4e..0b65d06f0a6a 100644 --- a/src/query/service/src/test_kits/config.rs +++ b/src/query/service/src/test_kits/config.rs @@ -32,6 +32,7 @@ impl ConfigBuilder { let mut conf = InnerConfig::default(); conf.query.tenant_id = Tenant::new_literal("test"); conf.log = databend_common_tracing::Config::new_testing(); + conf.query.cluster_id = String::from("test_cluster"); // add builtin users for test let users = vec![UserConfig { diff --git a/src/query/service/tests/it/storages/fuse/io.rs b/src/query/service/tests/it/storages/fuse/io.rs index aa70923b6c5c..380aca84034b 100644 --- a/src/query/service/tests/it/storages/fuse/io.rs +++ b/src/query/service/tests/it/storages/fuse/io.rs @@ -47,6 +47,7 @@ async fn test_array_cache_of_nested_column_iusse_14502() -> Result<()> { // ~~~ let mut config = InnerConfig::default(); + config.query.cluster_id = String::from("test-cluster-id"); // memory cache is not enabled by default, let's enable it config.cache.table_data_deserialized_data_bytes = 1024 * 1024 * 10; let fixture = TestFixture::setup_with_config(&config).await?; diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 7e20e16c1e52..783d95cd85ee 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -78,7 +78,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'query' | 'clickhouse_http_handler_port' | '8124' | '' | | 'query' | 'cloud_control_grpc_server_address' | 'null' | '' | | 'query' | 'cloud_control_grpc_timeout' | '0' | '' | -| 'query' | 'cluster_id' | '' | '' | +| 'query' | 'cluster_id' | 'test_cluster' | '' | | 'query' | 'data_retention_time_in_days_max' | '90' | '' | | 'query' | 'databend_enterprise_license' | 'null' | '' | | 'query' | 'default_compression' | 'auto' | '' |