Skip to content

Commit

Permalink
fix: introduce ControllerClient for cluster-wide operations
Browse files Browse the repository at this point in the history
We must not use some arbitrary broker for adding/removing topics.

All brokers can read though (w/ eventual consistency).
  • Loading branch information
crepererum committed Jan 14, 2022
1 parent 9eab27c commit 7ac1d80
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 83 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ let client = Client::new_plain(vec![connection]).await.unwrap();
// create a topic
let topic = "my_topic";
client.create_topic(
let controller_client = client.controller_client().await.unwrap();
controller_client.create_topic(
topic,
2, // partitions
1, // replication factor
Expand Down
169 changes: 169 additions & 0 deletions src/client/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::{debug, error, info};

use crate::{
backoff::{Backoff, BackoffConfig},
client::{Error, Result},
connection::{BrokerConnection, BrokerConnector},
messenger::RequestError,
protocol::{
error::Error as ProtocolError,
messages::{CreateTopicRequest, CreateTopicsRequest},
primitives::{Int16, Int32, NullableString, String_},
},
};

#[derive(Debug)]
pub struct ControllerClient {
brokers: Arc<BrokerConnector>,

backoff_config: BackoffConfig,

/// Current broker connection if any
current_broker: Mutex<Option<BrokerConnection>>,
}

impl ControllerClient {
pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
Self {
brokers,
backoff_config: Default::default(),
current_broker: Mutex::new(None),
}
}

/// Create a topic
pub async fn create_topic(
&self,
name: impl Into<String> + Send,
num_partitions: i32,
replication_factor: i16,
) -> Result<()> {
let request = &CreateTopicsRequest {
topics: vec![CreateTopicRequest {
name: String_(name.into()),
num_partitions: Int32(num_partitions),
replication_factor: Int16(replication_factor),
assignments: vec![],
configs: vec![],
tagged_fields: None,
}],
// TODO: Expose as configuration parameter
timeout_ms: Int32(5_000),
validate_only: None,
tagged_fields: None,
};

self.maybe_retry("create_topic", || async move {
let broker = self.get_cached_controller_broker().await?;
let response = broker.request(request).await?;

if response.topics.len() != 1 {
return Err(Error::InvalidResponse(format!(
"Expected a single topic in response, got {}",
response.topics.len()
)));
}

let topic = response.topics.into_iter().next().unwrap();

match topic.error {
None => Ok(()),
Some(protocol_error) => match topic.error_message {
Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
_ => Err(Error::ServerError(protocol_error, Default::default())),
},
}
})
.await
}

/// Takes a `request_name` and a function yielding a fallible future
/// and handles certain classes of error
async fn maybe_retry<R, F, T>(&self, request_name: &str, f: R) -> Result<T>
where
R: (Fn() -> F) + Send + Sync,
F: std::future::Future<Output = Result<T>> + Send,
{
let mut backoff = Backoff::new(&self.backoff_config);

loop {
let error = match f().await {
Ok(v) => return Ok(v),
Err(e) => e,
};

match error {
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
| Error::Connection(_) => self.invalidate_cached_controller_broker().await,
Error::ServerError(ProtocolError::LeaderNotAvailable, _) => {}
Error::ServerError(ProtocolError::OffsetNotAvailable, _) => {}
Error::ServerError(ProtocolError::NotController, _) => {
self.invalidate_cached_controller_broker().await;
}
_ => {
error!(
e=%error,
request_name,
"request encountered fatal error",
);
return Err(error);
}
}

let backoff = backoff.next();
info!(
e=%error,
request_name,
backoff_secs=backoff.as_secs(),
"request encountered non-fatal error - backing off",
);
tokio::time::sleep(backoff).await;
}
}

/// Gets a cached [`BrokerConnection`] to any cluster controller.
async fn get_cached_controller_broker(&self) -> Result<BrokerConnection> {
let mut current_broker = self.current_broker.lock().await;
if let Some(broker) = &*current_broker {
return Ok(Arc::clone(broker));
}

info!("Creating new controller broker connection",);

let controller_id = self
.get_controller_id(self.brokers.get_cached_arbitrary_broker().await?)
.await?;
let broker = self.brokers.connect(controller_id).await?.ok_or_else(|| {
Error::InvalidResponse(format!(
"Controller {} not found in metadata response",
controller_id
))
})?;

*current_broker = Some(Arc::clone(&broker));
Ok(broker)
}

/// Invalidates the cached controller broker.
///
/// The next call to `[ContollerClient::get_cached_controller_broker]` will get a new connection
pub async fn invalidate_cached_controller_broker(&self) {
debug!("Invalidating cached controller broker");
self.current_broker.lock().await.take();
}

/// Retrieve the broker ID of the controller
async fn get_controller_id(&self, broker: BrokerConnection) -> Result<i32> {
let metadata = self.brokers.request_metadata(broker, Some(vec![])).await?;

let controller_id = metadata
.controller_id
.ok_or_else(|| Error::InvalidResponse("Leader is NULL".to_owned()))?
.0;

Ok(controller_id)
}
}
60 changes: 10 additions & 50 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@ use std::sync::Arc;
use thiserror::Error;

use crate::{
client::partition::PartitionClient,
connection::BrokerConnector,
protocol::{
messages::{CreateTopicRequest, CreateTopicsRequest},
primitives::*,
},
client::partition::PartitionClient, connection::BrokerConnector, protocol::primitives::Boolean,
topic::Topic,
};

pub mod consumer;
pub mod controller;
pub mod error;
pub mod partition;
pub mod producer;

use error::{Error, Result};

use self::controller::ControllerClient;

#[derive(Debug, Error)]
pub enum ProduceError {
#[error(transparent)]
Expand Down Expand Up @@ -67,6 +65,11 @@ impl Client {
Ok(Self { brokers })
}

/// Returns a client for performing certain cluster-wide operations.
pub async fn controller_client(&self) -> Result<ControllerClient> {
Ok(ControllerClient::new(Arc::clone(&self.brokers)))
}

/// Returns a client for performing operations on a specific partition
pub async fn partition_client(
&self,
Expand All @@ -84,7 +87,7 @@ impl Client {
pub async fn list_topics(&self) -> Result<Vec<Topic>> {
let response = self
.brokers
.request_metadata(self.brokers.get_arbitrary_cached_broker().await?, None)
.request_metadata(self.brokers.get_cached_arbitrary_broker().await?, None)
.await?;

Ok(response
Expand All @@ -101,47 +104,4 @@ impl Client {
})
.collect())
}

/// Create a topic
pub async fn create_topic(
&self,
name: impl Into<String> + Send,
num_partitions: i32,
replication_factor: i16,
) -> Result<()> {
let broker = self.brokers.get_arbitrary_cached_broker().await?;
let response = broker
.request(CreateTopicsRequest {
topics: vec![CreateTopicRequest {
name: String_(name.into()),
num_partitions: Int32(num_partitions),
replication_factor: Int16(replication_factor),
assignments: vec![],
configs: vec![],
tagged_fields: None,
}],
// TODO: Expose as configuration parameter
timeout_ms: Int32(5_000),
validate_only: None,
tagged_fields: None,
})
.await?;

if response.topics.len() != 1 {
return Err(Error::InvalidResponse(format!(
"Expected a single topic in response, got {}",
response.topics.len()
)));
}

let topic = response.topics.into_iter().next().unwrap();

match topic.error {
None => Ok(()),
Some(protocol_error) => match topic.error_message {
Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
_ => Err(Error::ServerError(protocol_error, Default::default())),
},
}
}
}
2 changes: 1 addition & 1 deletion src/client/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ impl PartitionClient {
);

let leader = self
.get_leader(self.brokers.get_arbitrary_cached_broker().await?)
.get_leader(self.brokers.get_cached_arbitrary_broker().await?)
.await?;
let broker = self.brokers.connect(leader).await?.ok_or_else(|| {
Error::InvalidResponse(format!(
Expand Down
28 changes: 16 additions & 12 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rand::prelude::*;
use std::sync::Arc;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use thiserror::Error;
use tokio::io::BufStream;
Expand Down Expand Up @@ -50,8 +50,10 @@ pub struct BrokerConnector {
/// Discovered brokers in the cluster, including bootstrap brokers
topology: BrokerTopology,

/// The current cached broker
current_broker: Mutex<Option<BrokerConnection>>,
/// The cached arbitrary broker.
///
/// This one is used for metadata queries.
cached_arbitrary_broker: Mutex<Option<BrokerConnection>>,

/// The backoff configuration on error
backoff_config: BackoffConfig,
Expand All @@ -72,7 +74,7 @@ impl BrokerConnector {
Self {
bootstrap_brokers,
topology: Default::default(),
current_broker: Mutex::new(None),
cached_arbitrary_broker: Mutex::new(None),
backoff_config: Default::default(),
tls_config,
max_message_size,
Expand All @@ -82,7 +84,7 @@ impl BrokerConnector {
/// Fetch and cache broker metadata
pub async fn refresh_metadata(&self) -> Result<()> {
self.request_metadata(
self.get_arbitrary_cached_broker().await?,
self.get_cached_arbitrary_broker().await?,
// Not interested in topic metadata
Some(vec![]),
)
Expand Down Expand Up @@ -133,16 +135,18 @@ impl BrokerConnector {
tokio::time::sleep(backoff).await;
};

// Since the metadata request contains information about the cluster state, use it to update our view.
self.topology.update(&response.brokers);

Ok(response)
}

/// Invalidates the current cached broker
/// Invalidates the cached arbitrary broker.
///
/// The next call to `[BrokerPool::get_cached_broker]` will get a new connection
#[allow(dead_code)]
/// The next call to `[BrokerConnector::get_cached_arbitrary_broker]` will get a new connection
pub async fn invalidate_cached_arbitrary_broker(&self) {
self.current_broker.lock().await.take();
debug!("Invalidating cached arbitrary broker");
self.cached_arbitrary_broker.lock().await.take();
}

/// Returns a new connection to the broker with the provided id
Expand Down Expand Up @@ -171,8 +175,8 @@ impl BrokerConnector {
}

/// Gets a cached [`BrokerConnection`] to any broker
pub async fn get_arbitrary_cached_broker(&self) -> Result<BrokerConnection> {
let mut current_broker = self.current_broker.lock().await;
pub async fn get_cached_arbitrary_broker(&self) -> Result<BrokerConnection> {
let mut current_broker = self.cached_arbitrary_broker.lock().await;
if let Some(broker) = &*current_broker {
return Ok(Arc::clone(broker));
}
Expand Down Expand Up @@ -222,7 +226,7 @@ impl std::fmt::Debug for BrokerConnector {
f.debug_struct("BrokerConnector")
.field("bootstrap_brokers", &self.bootstrap_brokers)
.field("topology", &self.topology)
.field("current_broker", &self.current_broker)
.field("cached_arbitrary_broker", &self.cached_arbitrary_broker)
.field("backoff_config", &self.backoff_config)
.field("tls_config", &tls_config)
.field("max_message_size", &self.max_message_size)
Expand Down
Loading

0 comments on commit 7ac1d80

Please sign in to comment.