Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): add discovery nodes api #16353

Merged
merged 5 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/meta/types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct NodeInfo {
pub cpu_nums: u64,
pub version: u32,
pub flight_address: String,
pub discovery_address: String,
pub binary_version: String,
}

Expand All @@ -88,6 +89,7 @@ impl NodeInfo {
secret: String,
cpu_nums: u64,
flight_address: String,
discovery_address: String,
binary_version: String,
) -> NodeInfo {
NodeInfo {
Expand All @@ -96,6 +98,7 @@ impl NodeInfo {
cpu_nums,
version: 0,
flight_address,
discovery_address,
binary_version,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/types/tests/it/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn test_node_info_ip_port() -> anyhow::Result<()> {
cpu_nums: 1,
version: 1,
flight_address: "1.2.3.4:123".to_string(),
discovery_address: "4.5.6.7:456".to_string(),
binary_version: "v0.8-binary-version".to_string(),
};

Expand Down
5 changes: 5 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,9 @@ pub struct QueryConfig {
#[clap(long, value_name = "VALUE", default_value = "127.0.0.1:9090")]
pub flight_api_address: String,

#[clap(long, value_name = "VALUE", default_value = "")]
pub discovery_address: String,

#[clap(long, value_name = "VALUE", default_value = "127.0.0.1:8080")]
pub admin_api_address: String,

Expand Down Expand Up @@ -1714,6 +1717,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
http_handler_port: self.http_handler_port,
http_handler_result_timeout_secs: self.http_handler_result_timeout_secs,
flight_api_address: self.flight_api_address,
discovery_address: self.discovery_address,
flight_sql_handler_host: self.flight_sql_handler_host,
flight_sql_handler_port: self.flight_sql_handler_port,
admin_api_address: self.admin_api_address,
Expand Down Expand Up @@ -1805,6 +1809,7 @@ impl From<InnerQueryConfig> for QueryConfig {
flight_api_address: inner.flight_api_address,
flight_sql_handler_host: inner.flight_sql_handler_host,
flight_sql_handler_port: inner.flight_sql_handler_port,
discovery_address: inner.discovery_address,
admin_api_address: inner.admin_api_address,
metric_api_address: inner.metric_api_address,
http_handler_tls_server_cert: inner.http_handler_tls_server_cert,
Expand Down
2 changes: 2 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub struct QueryConfig {
pub http_handler_port: u16,
pub http_handler_result_timeout_secs: u64,
pub flight_api_address: String,
pub discovery_address: String,
pub flight_sql_handler_host: String,
pub flight_sql_handler_port: u16,
pub admin_api_address: String,
Expand Down Expand Up @@ -267,6 +268,7 @@ impl Default for QueryConfig {
flight_api_address: "127.0.0.1:9090".to_string(),
flight_sql_handler_host: "127.0.0.1".to_string(),
flight_sql_handler_port: 8900,
discovery_address: "".to_string(),
admin_api_address: "127.0.0.1:8080".to_string(),
metric_api_address: "127.0.0.1:7070".to_string(),
api_tls_server_cert: "".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/management/src/cluster/cluster_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use databend_common_meta_types::Operation;

use crate::cluster::ClusterApi;

pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v2";
pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v3";

pub struct ClusterMgr {
metastore: MetaStore,
Expand Down
7 changes: 4 additions & 3 deletions src/query/management/tests/it/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn test_successfully_add_node() -> Result<()> {
let node_info = create_test_node_info();
cluster_api.add_node(node_info.clone()).await?;
let value = kv_api
.get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

match value {
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn test_successfully_heartbeat_node() -> Result<()> {
cluster_api.add_node(node_info.clone()).await?;

let value = kv_api
.get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

let meta = value.unwrap().meta.unwrap();
Expand All @@ -133,7 +133,7 @@ async fn test_successfully_heartbeat_node() -> Result<()> {
cluster_api.heartbeat(&node_info, MatchSeq::GE(1)).await?;

let value = kv_api
.get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000);
Expand All @@ -147,6 +147,7 @@ fn create_test_node_info() -> NodeInfo {
cpu_nums: 0,
version: 0,
flight_address: String::from("ip:port"),
discovery_address: "ip2:port".to_string(),
binary_version: "binary_version".to_string(),
}
}
Expand Down
41 changes: 27 additions & 14 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,21 +336,33 @@ impl ClusterDiscovery {
pub async fn register_to_metastore(self: &Arc<Self>, cfg: &InnerConfig) -> Result<()> {
let cpus = cfg.query.num_cpus;
let mut address = cfg.query.flight_api_address.clone();
let mut discovery_address = match cfg.query.discovery_address.is_empty() {
true => format!(
"{}:{}",
cfg.query.http_handler_host, cfg.query.http_handler_port
),
false => cfg.query.discovery_address.clone(),
};

if let Ok(socket_addr) = SocketAddr::from_str(&address) {
let ip_addr = socket_addr.ip();
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
if let Some(local_addr) = self.api_provider.get_local_addr().await? {
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
warn!(
"Detected loopback or unspecified address as cluster flight endpoint. \
We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \
If there are proxies between nodes, you can specify endpoint with --flight-api-address.",
address, new_addr
);

address = new_addr;
for (lookup_ip, typ) in [
(&mut address, "flight-api-address"),
(&mut discovery_address, "discovery-address"),
] {
if let Ok(socket_addr) = SocketAddr::from_str(lookup_ip) {
let ip_addr = socket_addr.ip();
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
if let Some(local_addr) = self.api_provider.get_local_addr().await? {
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
warn!(
"Detected loopback or unspecified address as {} endpoint. \
We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \
If there are proxies between nodes, you can specify endpoint with --{}.",
typ, lookup_ip, new_addr, typ
);

*lookup_ip = new_addr;
}
}
}
}
Expand All @@ -360,6 +372,7 @@ impl ClusterDiscovery {
self.local_secret.clone(),
cpus,
address,
discovery_address,
DATABEND_COMMIT_VERSION.to_string(),
);

Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/servers/http/http_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use poem::EndpointExt;
use poem::IntoEndpoint;
use poem::Route;

use super::v1::discovery_nodes;
use super::v1::logout_handler;
use super::v1::upload_to_stage;
use crate::servers::http::middleware::json_response;
Expand Down Expand Up @@ -136,6 +137,13 @@ impl HttpHandler {
self.kind,
EndpointKind::StartQuery,
)),
)
.at(
"/discovery_nodes",
get(discovery_nodes).with(HTTPSessionMiddleware::create(
self.kind,
EndpointKind::StartQuery,
)),
);

let ep_clickhouse =
Expand Down
52 changes: 52 additions & 0 deletions src/query/service/src/servers/http/v1/discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_config::GlobalConfig;
use poem::error::InternalServerError;
use poem::error::Result as PoemResult;
use poem::web::Json;
use poem::Request;

use crate::clusters::ClusterDiscovery;
use crate::clusters::ClusterHelper;
use crate::servers::http::v1::HttpQueryContext;

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct DiscoveryNode {
pub address: String,
}

#[poem::handler]
#[async_backtrace::framed]
pub async fn discovery_nodes(
_: &HttpQueryContext,
_req: &Request,
) -> PoemResult<Json<Vec<DiscoveryNode>>> {
let config = GlobalConfig::instance();
let cluster = ClusterDiscovery::instance()
.discover(&config)
.await
.map_err(InternalServerError)?;

let nodes = cluster.get_nodes();
let mut discovery_nodes = Vec::with_capacity(nodes.len());

for node in nodes {
discovery_nodes.push(DiscoveryNode {
address: node.discovery_address.clone(),
});
}

Ok(Json(discovery_nodes))
}
2 changes: 2 additions & 0 deletions src/query/service/src/servers/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod discovery;
mod http_query_handlers;
mod query;
mod session;
mod stage;
pub mod string_block;
mod suggestions;

pub use discovery::discovery_nodes;
pub use http_query_handlers::make_final_uri;
pub use http_query_handlers::make_page_uri;
pub use http_query_handlers::make_state_uri;
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl ClusterDescriptor {
"".to_string(),
0,
addr.into(),
"".to_string(),
DATABEND_COMMIT_VERSION.to_string(),
)));
ClusterDescriptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'default_compression' | 'auto' | '' |
| 'query' | 'default_storage_format' | 'auto' | '' |
| 'query' | 'disable_system_table_load' | 'false' | '' |
| 'query' | 'discovery_address' | '' | '' |
| 'query' | 'enable_meta_data_upgrade_json_to_pb_from_v307' | 'false' | '' |
| 'query' | 'enable_udf_server' | 'false' | '' |
| 'query' | 'flight_api_address' | '127.0.0.1:9090' | '' |
Expand Down
Loading