Merge pull request #60 from influxdata/crepererum/controller_handling
fix: select cluster controller for certain actions
kodiakhq[bot] authored Jan 14, 2022
2 parents dd8ad43 + 48ced12 commit 377b1c1
Showing 9 changed files with 246 additions and 88 deletions.
8 changes: 6 additions & 2 deletions .circleci/config.yml
@@ -181,7 +181,9 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
KAFKA_CONNECT: "redpanda-0:9092"
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "redpanda-1:9092"
steps:
- checkout
- rust_components
@@ -244,7 +246,9 @@ jobs:
RUST_LOG: "trace"
# Run integration tests
TEST_INTEGRATION: 1
KAFKA_CONNECT: "kafka-0:9093"
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "kafka-1:9093"
steps:
- checkout
- rust_components
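The comments added to both CI jobs explain the point of the change: the tests now bootstrap from the second node, which is unlikely to be the cluster controller, so topic creation only passes if the client really resolves and targets the controller instead of getting lucky. A hypothetical integration-test sketch of what this exercises, using only calls visible in this diff (`Client::new_plain`, `controller_client`, `create_topic`); the `rskafka::client` import path, the tokio test attribute, and the topic name are assumptions for illustration:

```rust
use rskafka::client::Client;

#[tokio::test]
async fn create_topic_via_non_controller_bootstrap() {
    // Mirrors the CI setup: KAFKA_CONNECT points at a node that is probably
    // *not* the controller (e.g. "redpanda-1:9092" / "kafka-1:9093").
    let connect = std::env::var("KAFKA_CONNECT").unwrap();
    let client = Client::new_plain(vec![connect]).await.unwrap();

    // The controller client must discover the actual controller via metadata;
    // if it simply used the bootstrap broker, this create_topic would fail
    // with NOT_CONTROLLER.
    let controller_client = client.controller_client().await.unwrap();
    controller_client
        .create_topic("controller_routing_smoke_test", 2, 1)
        .await
        .unwrap();
}
```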
7 changes: 4 additions & 3 deletions README.md
@@ -37,7 +37,8 @@ let client = Client::new_plain(vec![connection]).await.unwrap();
// create a topic
let topic = "my_topic";
client.create_topic(
let controller_client = client.controller_client().await.unwrap();
controller_client.create_topic(
topic,
2, // partitions
1, // replication factor
@@ -90,7 +91,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9092 cargo test
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test
```

in another session.
@@ -106,7 +107,7 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9094 cargo test
```

in another session.
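The README change doubles as the migration guide: topic creation moves off `Client` onto the new `ControllerClient`. A minimal before/after sketch, assuming the `rskafka::client` paths implied by the module layout later in this diff:

```rust
use rskafka::client::{error::Error, Client};

async fn create_my_topic(client: &Client) -> Result<(), Error> {
    // Before this PR the request was sent through the Client to an arbitrary
    // cached broker and could fail with NOT_CONTROLLER if that broker was not
    // the controller:
    //
    //     client.create_topic("my_topic", 2, 1).await?;

    // After this PR the request goes through the controller client, which
    // resolves the controller id from metadata and retries when its cached
    // connection turns out to be broken or no longer the controller.
    let controller_client = client.controller_client().await?;
    controller_client.create_topic("my_topic", 2, 1).await
}
```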
2 changes: 1 addition & 1 deletion src/backoff.rs
@@ -14,7 +14,7 @@ pub struct BackoffConfig {
impl Default for BackoffConfig {
fn default() -> Self {
Self {
init_backoff: Duration::from_secs(1),
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(500),
base: 3.,
}
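The change here drops the initial retry delay from one second to 100 ms. A standalone illustration (plain std, not the crate's `Backoff` type, and ignoring any jitter the real implementation may apply) of the delay sequence these defaults imply:

```rust
use std::time::Duration;

fn main() {
    let init = Duration::from_millis(100); // init_backoff
    let max = Duration::from_secs(500);    // max_backoff
    let base = 3u32;                       // base

    let mut delay = init;
    for attempt in 1..=6 {
        println!("attempt {attempt}: back off for {delay:?}");
        delay = (delay * base).min(max);
    }
    // attempt 1: 100ms, 2: 300ms, 3: 900ms, 4: 2.7s, 5: 8.1s, 6: 24.3s
}
```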
170 changes: 170 additions & 0 deletions src/client/controller.rs
@@ -0,0 +1,170 @@
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::{debug, error, info};

use crate::{
backoff::{Backoff, BackoffConfig},
client::{Error, Result},
connection::{BrokerConnection, BrokerConnector},
messenger::RequestError,
protocol::{
error::Error as ProtocolError,
messages::{CreateTopicRequest, CreateTopicsRequest},
primitives::{Int16, Int32, NullableString, String_},
},
};

#[derive(Debug)]
pub struct ControllerClient {
brokers: Arc<BrokerConnector>,

backoff_config: BackoffConfig,

/// Current broker connection if any
current_broker: Mutex<Option<BrokerConnection>>,
}

impl ControllerClient {
pub(super) fn new(brokers: Arc<BrokerConnector>) -> Self {
Self {
brokers,
backoff_config: Default::default(),
current_broker: Mutex::new(None),
}
}

/// Create a topic
pub async fn create_topic(
&self,
name: impl Into<String> + Send,
num_partitions: i32,
replication_factor: i16,
) -> Result<()> {
let request = &CreateTopicsRequest {
topics: vec![CreateTopicRequest {
name: String_(name.into()),
num_partitions: Int32(num_partitions),
replication_factor: Int16(replication_factor),
assignments: vec![],
configs: vec![],
tagged_fields: None,
}],
// TODO: Expose as configuration parameter
timeout_ms: Int32(5_000),
validate_only: None,
tagged_fields: None,
};

self.maybe_retry("create_topic", || async move {
let broker = self.get_cached_controller_broker().await?;
let response = broker.request(request).await?;

if response.topics.len() != 1 {
return Err(Error::InvalidResponse(format!(
"Expected a single topic in response, got {}",
response.topics.len()
)));
}

let topic = response.topics.into_iter().next().unwrap();

match topic.error {
None => Ok(()),
Some(protocol_error) => match topic.error_message {
Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
_ => Err(Error::ServerError(protocol_error, Default::default())),
},
}
})
.await
}

/// Takes a `request_name` and a function yielding a fallible future
/// and handles certain classes of error
async fn maybe_retry<R, F, T>(&self, request_name: &str, f: R) -> Result<T>
where
R: (Fn() -> F) + Send + Sync,
F: std::future::Future<Output = Result<T>> + Send,
{
let mut backoff = Backoff::new(&self.backoff_config);

loop {
let error = match f().await {
Ok(v) => return Ok(v),
Err(e) => e,
};

match error {
// broken connection
Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
| Error::Connection(_) => self.invalidate_cached_controller_broker().await,

// our broker is actually not the controller
Error::ServerError(ProtocolError::NotController, _) => {
self.invalidate_cached_controller_broker().await;
}

// fatal
_ => {
error!(
e=%error,
request_name,
"request encountered fatal error",
);
return Err(error);
}
}

let backoff = backoff.next();
info!(
e=%error,
request_name,
backoff_secs=backoff.as_secs(),
"request encountered non-fatal error - backing off",
);
tokio::time::sleep(backoff).await;
}
}

/// Gets a cached [`BrokerConnection`] to any cluster controller.
async fn get_cached_controller_broker(&self) -> Result<BrokerConnection> {
let mut current_broker = self.current_broker.lock().await;
if let Some(broker) = &*current_broker {
return Ok(Arc::clone(broker));
}

info!("Creating new controller broker connection",);

let controller_id = self.get_controller_id().await?;
let broker = self.brokers.connect(controller_id).await?.ok_or_else(|| {
Error::InvalidResponse(format!(
"Controller {} not found in metadata response",
controller_id
))
})?;

*current_broker = Some(Arc::clone(&broker));
Ok(broker)
}

/// Invalidates the cached controller broker.
///
/// The next call to `[ControllerClient::get_cached_controller_broker]` will get a new connection
pub async fn invalidate_cached_controller_broker(&self) {
debug!("Invalidating cached controller broker");
self.current_broker.lock().await.take();
}

/// Retrieve the broker ID of the controller
async fn get_controller_id(&self) -> Result<i32> {
let metadata = self.brokers.request_metadata(None, Some(vec![])).await?;

let controller_id = metadata
.controller_id
.ok_or_else(|| Error::InvalidResponse("Leader is NULL".to_owned()))?
.0;

Ok(controller_id)
}
}
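Because `maybe_retry` already handles `NotController` and broken connections by invalidating the cached controller connection and backing off, callers mostly see either success or a genuinely fatal error. A caller-side sketch using only types visible in this diff (`ControllerClient`, `client::error::Error`); the helper name and topic parameters are illustrative:

```rust
use rskafka::client::{controller::ControllerClient, error::Error};

async fn ensure_topic(controller: &ControllerClient, name: &str) -> Result<(), Error> {
    match controller.create_topic(name, 2, 1).await {
        Ok(()) => {
            println!("created topic {name}");
            Ok(())
        }
        Err(e) => {
            // NotController and connection errors are retried inside
            // create_topic; what surfaces here are the fatal cases, e.g. the
            // broker rejecting the request (Error::ServerError with message).
            eprintln!("creating topic {name} failed: {e}");
            Err(e)
        }
    }
}
```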
58 changes: 9 additions & 49 deletions src/client/mod.rs
@@ -3,22 +3,20 @@ use std::sync::Arc;
use thiserror::Error;

use crate::{
client::partition::PartitionClient,
connection::BrokerConnector,
protocol::{
messages::{CreateTopicRequest, CreateTopicsRequest},
primitives::*,
},
client::partition::PartitionClient, connection::BrokerConnector, protocol::primitives::Boolean,
topic::Topic,
};

pub mod consumer;
pub mod controller;
pub mod error;
pub mod partition;
pub mod producer;

use error::{Error, Result};

use self::controller::ControllerClient;

#[derive(Debug, Error)]
pub enum ProduceError {
#[error(transparent)]
@@ -67,6 +65,11 @@ impl Client {
Ok(Self { brokers })
}

/// Returns a client for performing certain cluster-wide operations.
pub async fn controller_client(&self) -> Result<ControllerClient> {
Ok(ControllerClient::new(Arc::clone(&self.brokers)))
}

/// Returns a client for performing operations on a specific partition
pub async fn partition_client(
&self,
@@ -98,47 +101,4 @@ impl Client {
})
.collect())
}

/// Create a topic
pub async fn create_topic(
&self,
name: impl Into<String> + Send,
num_partitions: i32,
replication_factor: i16,
) -> Result<()> {
let broker = self.brokers.get_arbitrary_cached_broker().await?;
let response = broker
.request(CreateTopicsRequest {
topics: vec![CreateTopicRequest {
name: String_(name.into()),
num_partitions: Int32(num_partitions),
replication_factor: Int16(replication_factor),
assignments: vec![],
configs: vec![],
tagged_fields: None,
}],
// TODO: Expose as configuration parameter
timeout_ms: Int32(5_000),
validate_only: None,
tagged_fields: None,
})
.await?;

if response.topics.len() != 1 {
return Err(Error::InvalidResponse(format!(
"Expected a single topic in response, got {}",
response.topics.len()
)));
}

let topic = response.topics.into_iter().next().unwrap();

match topic.error {
None => Ok(()),
Some(protocol_error) => match topic.error_message {
Some(NullableString(Some(msg))) => Err(Error::ServerError(protocol_error, msg)),
_ => Err(Error::ServerError(protocol_error, Default::default())),
},
}
}
}
27 changes: 16 additions & 11 deletions src/connection.rs
@@ -1,6 +1,6 @@
use rand::prelude::*;
use std::sync::Arc;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use thiserror::Error;
use tokio::io::BufStream;
@@ -50,8 +50,10 @@ pub struct BrokerConnector {
/// Discovered brokers in the cluster, including bootstrap brokers
topology: BrokerTopology,

/// The current cached broker
current_broker: Mutex<Option<BrokerConnection>>,
/// The cached arbitrary broker.
///
/// This one is used for metadata queries.
cached_arbitrary_broker: Mutex<Option<BrokerConnection>>,

/// The backoff configuration on error
backoff_config: BackoffConfig,
@@ -72,7 +74,7 @@ impl BrokerConnector {
Self {
bootstrap_brokers,
topology: Default::default(),
current_broker: Mutex::new(None),
cached_arbitrary_broker: Mutex::new(None),
backoff_config: Default::default(),
tls_config,
max_message_size,
@@ -113,7 +115,7 @@ impl BrokerConnector {
// Retrieve the broker within the loop, in case it is invalidated
let broker = match broker_override.as_ref() {
Some(b) => Arc::clone(b),
None => self.get_arbitrary_cached_broker().await?,
None => self.get_cached_arbitrary_broker().await?,
};

let error = match broker.request(&request).await {
@@ -142,15 +144,18 @@ impl BrokerConnector {
tokio::time::sleep(backoff).await;
};

// Since the metadata request contains information about the cluster state, use it to update our view.
self.topology.update(&response.brokers);

Ok(response)
}

/// Invalidates the current cached broker
/// Invalidates the cached arbitrary broker.
///
/// The next call to `[BrokerConnector::get_arbitrary_cached_broker]` will get a new connection
/// The next call to `[BrokerConnector::get_cached_arbitrary_broker]` will get a new connection
pub async fn invalidate_cached_arbitrary_broker(&self) {
self.current_broker.lock().await.take();
debug!("Invalidating cached arbitrary broker");
self.cached_arbitrary_broker.lock().await.take();
}

/// Returns a new connection to the broker with the provided id
@@ -179,8 +184,8 @@ impl BrokerConnector {
}

/// Gets a cached [`BrokerConnection`] to any broker
pub async fn get_arbitrary_cached_broker(&self) -> Result<BrokerConnection> {
let mut current_broker = self.current_broker.lock().await;
pub async fn get_cached_arbitrary_broker(&self) -> Result<BrokerConnection> {
let mut current_broker = self.cached_arbitrary_broker.lock().await;
if let Some(broker) = &*current_broker {
return Ok(Arc::clone(broker));
}
Expand Down Expand Up @@ -230,7 +235,7 @@ impl std::fmt::Debug for BrokerConnector {
f.debug_struct("BrokerConnector")
.field("bootstrap_brokers", &self.bootstrap_brokers)
.field("topology", &self.topology)
.field("current_broker", &self.current_broker)
.field("cached_arbitrary_broker", &self.cached_arbitrary_broker)
.field("backoff_config", &self.backoff_config)
.field("tls_config", &tls_config)
.field("max_message_size", &self.max_message_size)
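The rename from `current_broker` to `cached_arbitrary_broker` makes the split explicit: `BrokerConnector` caches one arbitrary connection for metadata requests, while `ControllerClient` keeps its own cached connection to the controller, and each can be invalidated independently. Both follow the same cache-or-connect pattern; a simplified standalone sketch of it (`Conn` and `CachedConn` are stand-ins, not crate types):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

/// Stand-in for the crate's `BrokerConnection`.
struct Conn;

/// The cache-or-connect / invalidate pattern shared by the cached arbitrary
/// broker and the cached controller broker.
struct CachedConn {
    slot: Mutex<Option<Arc<Conn>>>,
}

impl CachedConn {
    /// Returns the cached connection, dialing a new one only if the cache is empty.
    async fn get_or_connect<F>(&self, connect: F) -> Arc<Conn>
    where
        F: std::future::Future<Output = Arc<Conn>>,
    {
        let mut slot = self.slot.lock().await;
        if let Some(conn) = &*slot {
            return Arc::clone(conn);
        }
        let conn = connect.await;
        *slot = Some(Arc::clone(&conn));
        conn
    }

    /// Drops the cached connection; the next `get_or_connect` dials afresh.
    async fn invalidate(&self) {
        self.slot.lock().await.take();
    }
}
```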
