Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support automatic DNS lookup for kafka bootstrap servers #3379

Merged
merged 14 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
Expand Down Expand Up @@ -425,6 +428,7 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, Result,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
Expand Down Expand Up @@ -117,7 +117,16 @@ impl TopicManager {
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
let client = ClientBuilder::new(self.config.broker_endpoints.clone())
let broker_endpoints =
futures::future::try_join_all(self.config.broker_endpoints.iter().map(
|endpoint| async move {
common_wal::resolve_to_ipv4(endpoint)
.await
.context(ResolveKafkaEndpointSnafu)
},
))
.await?;
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
.build()
.await
Expand Down
5 changes: 5 additions & 0 deletions src/common/wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ testing = []
workspace = true

[dependencies]
async-trait.workspace = true
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_with.workspace = true
snafu.workspace = true
tokio.workspace = true

[dev-dependencies]
serde_json.workspace = true
Expand Down
33 changes: 33 additions & 0 deletions src/common/wal/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_macro::stack_trace_debug;
use snafu::Snafu;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to resolve endpoint {:?}", broker_endpoint))]
ResolveEndpoint {
broker_endpoint: String,
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))]
EndpointIPV4NotFound { broker_endpoint: String },
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
}

pub type Result<T> = std::result::Result<T, Error>;
45 changes: 45 additions & 0 deletions src/common/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]
use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::net;

pub mod config;
pub mod error;
pub mod options;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand All @@ -30,3 +35,43 @@ pub enum TopicSelectorType {
#[default]
RoundRobin,
}

pub async fn resolve_to_ipv4(broker_endpoint: &str) -> Result<String> {
net::lookup_host(broker_endpoint)
.await
.context(ResolveEndpointSnafu { broker_endpoint })?
// only IPv4 addresses are valid
.find_map(|addr| addr.is_ipv4().then_some(addr.to_string()))
.context(EndpointIPV4NotFoundSnafu { broker_endpoint })
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use super::*;
use crate::error::Error;

// test for resolve_broker_endpoint
#[tokio::test]
async fn test_valid_host() {
let host = "localhost:9092";
let got = resolve_to_ipv4(host).await;
assert_eq!(got.unwrap(), "127.0.0.1:9092");
}

#[tokio::test]
async fn test_valid_host_ipv6() {
// the host is valid, it is an IPv6 address, but we only accept IPv4 addresses
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
let host = "::1:9092";
let got = resolve_to_ipv4(host).await;
assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
}

#[tokio::test]
async fn test_invalid_host() {
let host = "non-exist-host:9092";
let got = resolve_to_ipv4(host).await;
assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
}
}
3 changes: 3 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
Expand Down
14 changes: 12 additions & 2 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::RwLock;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result,
};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
Expand Down Expand Up @@ -80,7 +82,15 @@ impl ClientManager {
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
let client = ClientBuilder::new(config.broker_endpoints.clone())
let broker_endpoints = futures::future::try_join_all(config.broker_endpoints.iter().map(
|endpoint| async move {
common_wal::resolve_to_ipv4(endpoint)
.await
.context(ResolveKafkaEndpointSnafu)
},
))
.await?;
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
.build()
.await
Expand Down