Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support automatic DNS lookup for kafka bootstrap servers #3379

Merged
merged 14 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint {:?}", broker_endpoint))]
ResolveKafkaEndpoint {
broker_endpoint: String,
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Endpoint ip not found for broker endpoint: {:?}", broker_endpoint))]
EndpointIpNotFound { broker_endpoint: String },

#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
Expand Down Expand Up @@ -425,6 +435,8 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| EndpointIpNotFound { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use tokio::net;

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, Result,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, EndpointIpNotFoundSnafu,
InvalidNumTopicsSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
Expand Down Expand Up @@ -108,6 +109,26 @@ impl TopicManager {
Ok(())
}

async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result<String> {
let ips: Vec<_> = net::lookup_host(broker_endpoint)
.await
.with_context(|_| ResolveKafkaEndpointSnafu {
broker_endpoint: broker_endpoint.to_string(),
})?
.into_iter()
// Not sure if we should filter out ipv6 addresses
.filter(|addr| addr.is_ipv4())
.collect();
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
if ips.is_empty() {
return (|| {
EndpointIpNotFoundSnafu {
broker_endpoint: broker_endpoint.to_string(),
}
.fail()
})();
}
Ok(ips[0].to_string())
}
/// Tries to create topics specified by indexes in `to_be_created`.
async fn try_create_topics(&self, topics: &[String], to_be_created: &[usize]) -> Result<()> {
// Builds an kafka controller client for creating topics.
Expand All @@ -117,7 +138,11 @@ impl TopicManager {
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
let client = ClientBuilder::new(self.config.broker_endpoints.clone())
let mut broker_endpoints = Vec::with_capacity(self.config.broker_endpoints.len());
for endpoint in &self.config.broker_endpoints {
broker_endpoints.push(Self::resolve_broker_endpoint(endpoint).await?);
}
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
.build()
.await
Expand Down
10 changes: 10 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint {:?}", broker_endpoint))]
ResolveKafkaEndpoint {
broker_endpoint: String,
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Failed to find ip for broker endpoint: {:?}", broker_endpoint))]
EndpointIpNotFound { broker_endpoint: String },
niebayes marked this conversation as resolved.
Show resolved Hide resolved

#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
Expand Down
32 changes: 30 additions & 2 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::net;
use tokio::sync::RwLock;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, EndpointIpNotFoundSnafu,
ResolveKafkaEndpointSnafu, Result,
};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
Expand Down Expand Up @@ -80,7 +84,11 @@ impl ClientManager {
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
let client = ClientBuilder::new(config.broker_endpoints.clone())
let mut broker_endpoints = Vec::with_capacity(config.broker_endpoints.len());
for endpoint in &config.broker_endpoints {
broker_endpoints.push(Self::resolve_broker_endpoint(endpoint).await?);
}
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
.build()
.await
Expand All @@ -94,6 +102,26 @@ impl ClientManager {
client_pool: RwLock::new(HashMap::new()),
})
}
async fn resolve_broker_endpoint(broker_endpoint: &str) -> Result<String> {
let ips: Vec<_> = net::lookup_host(broker_endpoint)
.await
.with_context(|_| ResolveKafkaEndpointSnafu {
broker_endpoint: broker_endpoint.to_string(),
})?
.into_iter()
// Not sure if we should filter out ipv6 addresses
.filter(|addr| addr.is_ipv4())
.collect();
if ips.is_empty() {
return (|| {
EndpointIpNotFoundSnafu {
broker_endpoint: broker_endpoint.to_string(),
}
.fail()
})();
}
Ok(ips[0].to_string())
}

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
Expand Down
Loading