diff --git a/Cargo.toml b/Cargo.toml index e33543752..e70843764 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,10 +20,12 @@ serde_derive = "1.0.0" serde_json = "1.0.0" [dev-dependencies] +backoff = "0.1.5" chrono = "0.4.0" clap = "2.18.0" env_logger = "0.3.0" rand = "0.3.15" +regex = "1.1.6" tokio = "0.1.7" [features] diff --git a/docker-compose.yaml b/docker-compose.yaml index 84b0a5ae5..7525cf1d0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,6 +11,8 @@ services: volumes: - .:/mount command: ./run_tests.sh + environment: + - KAFKA_VERSION=2.2.0 kafka: image: confluentinc/cp-kafka:5.2.1 diff --git a/rdkafka-sys/build.rs b/rdkafka-sys/build.rs index 2f3e9ec0e..5a427e8ad 100644 --- a/rdkafka-sys/build.rs +++ b/rdkafka-sys/build.rs @@ -73,6 +73,9 @@ fn main() { .rustified_enum("rd_kafka_conf_res_t") .rustified_enum("rd_kafka_resp_err_t") .rustified_enum("rd_kafka_timestamp_type_t") + .rustified_enum("rd_kafka_admin_op_t") + .rustified_enum("rd_kafka_ResourceType_t") + .rustified_enum("rd_kafka_ConfigSource_t") .generate() .expect("failed to generate bindings"); diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 08d76499e..ad2e2cb1d 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -55,6 +55,30 @@ pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info; /// Native rdkafka group member information pub type RDKafkaHeaders = bindings::rd_kafka_headers_t; +/// Native rdkafka queue +pub type RDKafkaQueue = bindings::rd_kafka_queue_t; + +// Native rdkafka new topic object +pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t; + +// Native rdkafka delete topic object +pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t; + +// Native rdkafka new partitions object +pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t; + +// Native rdkafka config resource +pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t; + +// Native rdkafka event +pub type RDKafkaEvent = bindings::rd_kafka_event_t; + +// Native rdkafka admin options +pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t; + +// Native rdkafka topic result +pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; + // ENUMS /// Client types @@ -66,6 +90,15 @@ pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes; /// Response error pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr; +/// Admin operation +pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp; + +/// Config resource type +pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType; + +/// Config source +pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource; + /// Errors enum /// Error from the underlying rdkafka library. diff --git a/src/admin.rs b/src/admin.rs new file mode 100644 index 000000000..ca9012a87 --- /dev/null +++ b/src/admin.rs @@ -0,0 +1,1241 @@ +//! Admin client. +//! +//! The main object is the [`AdminClient`] struct. +//! +//! [`AdminClient`]: struct.AdminClient.html + +use crate::rdsys; +use crate::rdsys::types::*; + +use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue}; +use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; +use crate::error::{IsError, KafkaError, KafkaResult}; +use crate::util::{cstr_to_owned, timeout_to_ms, AsCArray, ErrBuf, IntoOpaque, WrappedCPointer}; + +use futures::future::{self, Either}; +use futures::{Async, Canceled, Complete, Future, Oneshot, Poll}; + +use std::collections::HashMap; +use std::ffi::{CStr, CString}; +use std::mem; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +// +// ********** ADMIN CLIENT ********** +// + +/// A client for the Kafka admin API. +/// +/// `AdminClient` provides programmatic access to managing a Kafka cluster, +/// notably manipulating topics, partitions, and configuration paramaters. +pub struct AdminClient { + client: Client, + queue: Arc, + should_stop: Arc, + handle: Option>, +} + +impl AdminClient { + /// Creates new topics according to the provided `NewTopic` specifications. + /// + /// Note that while the API supports creating multiple topics at once, it + /// is not transactional. Creation of some topics may succeed while others + /// fail. Be sure to check the result of each individual operation. + pub fn create_topics<'a, I>( + &self, + topics: I, + opts: &AdminOptions, + ) -> impl Future, Error = KafkaError> + where + I: IntoIterator>, + { + match self.create_topics_inner(topics, opts) { + Ok(rx) => Either::A(CreateTopicsFuture { rx }), + Err(err) => Either::B(future::err(err)), + } + } + + fn create_topics_inner<'a, I>(&self, topics: I, opts: &AdminOptions) -> KafkaResult> + where + I: IntoIterator>, + { + let mut native_topics = Vec::new(); + let mut err_buf = ErrBuf::new(); + for t in topics { + native_topics.push(t.to_native(&mut err_buf)?); + } + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_CreateTopics( + self.client.native_ptr(), + native_topics.as_c_array(), + native_topics.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + + /// Deletes the named topics. + /// + /// Note that while the API supports deleting multiple topics at once, it is + /// not transactional. Deletion of some topics may succeed while others + /// fail. Be sure to check the result of each individual operation. + pub fn delete_topics( + &self, + topic_names: &[&str], + opts: &AdminOptions, + ) -> impl Future, Error = KafkaError> { + match self.delete_topics_inner(topic_names, opts) { + Ok(rx) => Either::A(DeleteTopicsFuture { rx }), + Err(err) => Either::B(future::err(err)), + } + } + + fn delete_topics_inner(&self, topic_names: &[&str], opts: &AdminOptions) -> KafkaResult> { + let mut native_topics = Vec::new(); + let mut err_buf = ErrBuf::new(); + for tn in topic_names { + let tn_c = CString::new(*tn)?; + let native_topic = unsafe { NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())) }; + native_topics.push(native_topic); + } + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_DeleteTopics( + self.client.native_ptr(), + native_topics.as_c_array(), + native_topics.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + + /// Adds additional partitions to existing topics according to the provided + /// `NewPartitions` specifications. + /// + /// Note that while the API supports creating partitions for multiple topics + /// at once, it is not transactional. Creation of partitions for some topics + /// may succeed while others fail. Be sure to check the result of each + /// individual operation. + pub fn create_partitions<'a, I>( + &self, + partitions: I, + opts: &AdminOptions, + ) -> impl Future, Error = KafkaError> + where + I: IntoIterator>, + { + match self.create_partitions_inner(partitions, opts) { + Ok(rx) => Either::A(CreatePartitionsFuture { rx }), + Err(err) => Either::B(future::err(err)), + } + } + + fn create_partitions_inner<'a, I>(&self, partitions: I, opts: &AdminOptions) -> KafkaResult> + where + I: IntoIterator>, + { + let mut native_partitions = Vec::new(); + let mut err_buf = ErrBuf::new(); + for p in partitions { + native_partitions.push(p.to_native(&mut err_buf)?); + } + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_CreatePartitions( + self.client.native_ptr(), + native_partitions.as_c_array(), + native_partitions.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + + /// Retrieves the configuration parameters for the specified resources. + /// + /// Note that while the API supports describing multiple configurations at + /// once, it is not transactional. There is no guarantee that you will see + /// a consistent snapshot of the configuration across different resources. + pub fn describe_configs<'a, I>( + &self, + configs: I, + opts: &AdminOptions, + ) -> impl Future, Error = KafkaError> + where + I: IntoIterator>, + { + match self.describe_configs_inner(configs, opts) { + Ok(rx) => Either::A(DescribeConfigsFuture { rx }), + Err(err) => Either::B(future::err(err)), + } + } + + fn describe_configs_inner<'a, I>(&self, configs: I, opts: &AdminOptions) -> KafkaResult> + where + I: IntoIterator>, + { + let mut native_configs = Vec::new(); + let mut err_buf = ErrBuf::new(); + for c in configs { + let (name, typ) = match c { + ResourceSpecifier::Topic(name) => (CString::new(*name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC), + ResourceSpecifier::Group(name) => (CString::new(*name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP), + ResourceSpecifier::Broker(id) => ( + CString::new(format!("{}", id))?, + RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER, + ), + }; + native_configs.push(unsafe { + NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr())) + }); + } + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_DescribeConfigs( + self.client.native_ptr(), + native_configs.as_c_array(), + native_configs.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + + /// Sets configuration parameters for the specified resources. + /// + /// Note that while the API supports altering multiple resources at once, it + /// is not transactional. Alteration of some resources may succeed while + /// others fail. Be sure to check the result of each individual operation. + pub fn alter_configs<'a, I>( + &self, + configs: I, + opts: &AdminOptions, + ) -> impl Future, Error = KafkaError> + where + I: IntoIterator>, + { + match self.alter_configs_inner(configs, opts) { + Ok(rx) => Either::A(AlterConfigsFuture { rx }), + Err(err) => Either::B(future::err(err)), + } + } + + fn alter_configs_inner<'a, I>(&self, configs: I, opts: &AdminOptions) -> KafkaResult> + where + I: IntoIterator>, + { + let mut native_configs = Vec::new(); + let mut err_buf = ErrBuf::new(); + for c in configs { + native_configs.push(c.to_native(&mut err_buf)?); + } + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_AlterConfigs( + self.client.native_ptr(), + native_configs.as_c_array(), + native_configs.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } +} + +impl FromClientConfig for AdminClient { + fn from_config(config: &ClientConfig) -> KafkaResult> { + AdminClient::from_config_and_context(config, DefaultClientContext) + } +} + +impl FromClientConfigAndContext for AdminClient { + fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult> { + let native_config = config.create_native_config()?; + // librdkafka only provides consumer and producer types. We follow the + // example of the Python bindings in choosing to pretend to be a + // producer, as producer clients are allegedly more lightweight. [0] + // + // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493 + let client = Client::new(config, native_config, RDKafkaType::RD_KAFKA_PRODUCER, context)?; + let queue = Arc::new(client.new_native_queue()); + let should_stop = Arc::new(AtomicBool::new(false)); + let handle = start_poll_thread(queue.clone(), should_stop.clone()); + Ok(AdminClient { + client, + queue, + should_stop, + handle: Some(handle), + }) + } +} + +impl Drop for AdminClient { + fn drop(&mut self) { + trace!("Stopping polling"); + self.should_stop.store(true, Ordering::Relaxed); + trace!("Waiting for polling thread termination"); + match self.handle.take().unwrap().join() { + Ok(()) => trace!("Polling stopped"), + Err(e) => warn!("Failure while terminating thread: {:?}", e), + }; + } +} + +fn start_poll_thread(queue: Arc, should_stop: Arc) -> JoinHandle<()> { + thread::Builder::new() + .name("admin client polling thread".into()) + .spawn(move || { + trace!("Admin polling thread loop started"); + loop { + let event = queue.poll(Duration::from_millis(100)); + if event.is_null() { + if should_stop.load(Ordering::Relaxed) { + // We received nothing and the thread should stop, so + // break the loop. + break; + } + continue; + } + let event = unsafe { NativeEvent::from_ptr(event) }; + let tx: Box> = + unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) }; + let _ = tx.send(event); + } + trace!("Admin polling thread loop terminated"); + }) + .expect("Failed to start polling thread") +} + +struct NativeEvent { + ptr: *mut RDKafkaEvent, +} + +impl NativeEvent { + unsafe fn from_ptr(ptr: *mut RDKafkaEvent) -> NativeEvent { + NativeEvent { ptr } + } + + fn ptr(&self) -> *mut RDKafkaEvent { + self.ptr + } + + fn check_error(&self) -> KafkaResult<()> { + let err = unsafe { rdsys::rd_kafka_event_error(self.ptr) }; + if err.is_error() { + Err(KafkaError::AdminOp(err.into())) + } else { + Ok(()) + } + } +} + +impl Drop for NativeEvent { + fn drop(&mut self) { + trace!("Destroying event: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_event_destroy(self.ptr); + } + trace!("Event destroyed: {:?}", self.ptr); + } +} + +unsafe impl Sync for NativeEvent {} +unsafe impl Send for NativeEvent {} + +// +// ********** ADMIN OPTIONS ********** +// + +/// Options for an admin API request. +pub struct AdminOptions { + request_timeout: Option, + operation_timeout: Option, + validate_only: bool, + broker_id: Option, +} + +impl AdminOptions { + /// Creates a new `AdminOptions`. + pub fn new() -> AdminOptions { + AdminOptions { + request_timeout: None, + operation_timeout: None, + validate_only: false, + broker_id: None, + } + } + + /// Sets the overall request timeout, including broker lookup, request + /// transmission, operation time on broker, and response. + /// + /// Defaults to the `socket.timeout.ms` configuration parameter. + pub fn request_timeout>>(mut self, timeout: T) -> Self { + self.request_timeout = timeout.into(); + self + } + + /// Sets the broker's operation timeout, such as the timeout for + /// CreateTopics to complete the creation of topics on the controller before + /// returning a result to the application. + /// + /// If unset (the default), the API calls will return immediately after + /// triggering the operation. + /// + /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls + /// respect this option. + pub fn operation_timeout>>(mut self, timeout: T) -> Self { + self.operation_timeout = timeout.into(); + self + } + + /// Tells the broker to only validate the request, without performing the + /// requested operation. + /// + /// Defaults to false. + pub fn validate_only(mut self, validate_only: bool) -> Self { + self.validate_only = validate_only; + self + } + + /// Override what broker the admin request will be sent to. + /// + /// By default, a reasonable broker will be selected automatically. See the + /// librdkafka docs on `rd_kafka_AdminOptions_set_broker` for details. + pub fn broker_id>>(mut self, broker_id: T) -> Self { + self.broker_id = broker_id.into(); + self + } + + fn to_native( + &self, + client: *mut RDKafka, + err_buf: &mut ErrBuf, + ) -> KafkaResult<(NativeAdminOptions, Oneshot)> { + let native_opts = unsafe { + NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new( + client, + RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY, + )) + }; + + if let Some(timeout) = self.request_timeout { + let res = unsafe { + rdsys::rd_kafka_AdminOptions_set_request_timeout( + native_opts.ptr(), + timeout_to_ms(timeout), + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + + if let Some(timeout) = self.operation_timeout { + let res = unsafe { + rdsys::rd_kafka_AdminOptions_set_operation_timeout( + native_opts.ptr(), + timeout_to_ms(timeout), + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + + if self.validate_only { + let res = unsafe { + rdsys::rd_kafka_AdminOptions_set_validate_only( + native_opts.ptr(), + 1, // true + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + + if let Some(broker_id) = self.broker_id { + let res = unsafe { + rdsys::rd_kafka_AdminOptions_set_broker( + native_opts.ptr(), + broker_id, + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + + let (tx, rx) = futures::oneshot(); + let tx = Box::new(tx); + unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr, IntoOpaque::as_ptr(&tx)) }; + mem::forget(tx); + + Ok((native_opts, rx)) + } +} + +struct NativeAdminOptions { + ptr: *mut RDKafkaAdminOptions, +} + +impl NativeAdminOptions { + unsafe fn from_ptr(ptr: *mut RDKafkaAdminOptions) -> NativeAdminOptions { + NativeAdminOptions { ptr } + } + + fn ptr(&self) -> *mut RDKafkaAdminOptions { + self.ptr + } +} + +impl Drop for NativeAdminOptions { + fn drop(&mut self) { + trace!("Destroying admin options: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_AdminOptions_destroy(self.ptr); + } + trace!("Admin options destroyed: {:?}", self.ptr); + } +} + +fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> { + match res.into() { + RDKafkaError::NoError => Ok(()), + RDKafkaError::InvalidArgument => { + let msg = if err_buf.len() == 0 { + "invalid argument".into() + } else { + err_buf.to_string() + }; + Err(KafkaError::AdminOpCreation(msg)) + } + res => Err(KafkaError::AdminOpCreation(format!( + "setting admin options returned unexpected error code {}", + res + ))), + } +} + +// +// ********** RESPONSE HANDLING ********** +// + +/// The result of an individual CreateTopic, DeleteTopic, or +/// CreatePartition operation. +pub type TopicResult = Result; + +fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec { + let mut out = Vec::with_capacity(n); + for i in 0..n { + let topic = unsafe { *topics.offset(i as isize) }; + let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) }; + let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) }; + if err.is_error() { + out.push(Err((name, err.into()))); + } else { + out.push(Ok(name)); + } + } + out +} + +// +// Create topic handling +// + +/// Configuration for a CreateTopic operation. +#[derive(Debug)] +pub struct NewTopic<'a> { + /// The name of the new topic. + pub name: &'a str, + /// The initial number of partitions. + pub num_partitions: i32, + /// The initial replication configuration. + pub replication: TopicReplication<'a>, + /// The initial configuration parameters for the topic. + pub config: Vec<(&'a str, &'a str)>, +} + +impl<'a> NewTopic<'a> { + /// Creates a new `NewTopic`. + pub fn new(name: &'a str, num_partitions: i32, replication: TopicReplication<'a>) -> NewTopic<'a> { + NewTopic { + name, + num_partitions, + replication, + config: Vec::new(), + } + } + + /// Sets a new parameter in the initial topic configuration. + pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> { + self.config.push((key, value)); + self + } + + fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult { + let name = CString::new(self.name)?; + let repl = match self.replication { + TopicReplication::Fixed(n) => n, + TopicReplication::Variable(partitions) => { + if partitions.len() as i32 != self.num_partitions { + return Err(KafkaError::AdminOpCreation(format!( + "replication configuration for topic '{}' assigns {} partition(s), \ + which does not match the specified number of partitions ({})", + self.name, + partitions.len(), + self.num_partitions, + ))); + } + -1 + } + }; + let topic = unsafe { + rdsys::rd_kafka_NewTopic_new( + name.as_ptr(), + self.num_partitions, + repl, + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + if topic.is_null() { + return Err(KafkaError::AdminOpCreation(err_buf.to_string())); + } + // N.B.: we wrap topic immediately, so that it is destroyed via the + // NativeNewTopic's Drop implementation if replica assignment or config + // installation fails. + let topic = unsafe { NativeNewTopic::from_ptr(topic) }; + if let TopicReplication::Variable(assignment) = self.replication { + for (partition_id, broker_ids) in assignment.into_iter().enumerate() { + let res = unsafe { + rdsys::rd_kafka_NewTopic_set_replica_assignment( + topic.ptr(), + partition_id as i32, + broker_ids.as_ptr() as *mut i32, + broker_ids.len(), + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + } + for (key, val) in &self.config { + let key_c = CString::new(*key)?; + let val_c = CString::new(*val)?; + let res = unsafe { rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr()) }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + Ok(topic) + } +} + +/// An assignment of partitions to replicas. +/// +/// Each element in the outer slice corresponds to the partition with that +/// index. The inner slice specifies the broker IDs to which replicas of that +/// partition should be assigned. +pub type PartitionAssignment<'a> = &'a [&'a [i32]]; + +/// Replication configuration for a new topic. +#[derive(Debug)] +pub enum TopicReplication<'a> { + /// All partitions should use the same fixed replication factor. + Fixed(i32), + /// Each partition should use the replica assignment from + /// `PartitionAssignment`. + Variable(PartitionAssignment<'a>), +} + +#[repr(transparent)] +struct NativeNewTopic { + ptr: *mut RDKafkaNewTopic, +} + +impl NativeNewTopic { + unsafe fn from_ptr(ptr: *mut RDKafkaNewTopic) -> NativeNewTopic { + NativeNewTopic { ptr } + } +} + +impl WrappedCPointer for NativeNewTopic { + type Target = RDKafkaNewTopic; + + fn ptr(&self) -> *mut RDKafkaNewTopic { + self.ptr + } +} + +impl Drop for NativeNewTopic { + fn drop(&mut self) { + trace!("Destroying new topic: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_NewTopic_destroy(self.ptr); + } + trace!("New topic destroyed: {:?}", self.ptr); + } +} + +struct CreateTopicsFuture { + rx: Oneshot, +} + +impl Future for CreateTopicsFuture { + type Item = Vec; + type Error = KafkaError; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(event)) => { + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Err(KafkaError::AdminOpCreation(format!( + "create topics request received response of incorrect type ({})", + typ + ))); + } + let mut n = 0; + let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) }; + Ok(Async::Ready(build_topic_results(topics, n))) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(Canceled) => Err(KafkaError::Canceled), + } + } +} + +// +// Delete topic handling +// + +#[repr(transparent)] +struct NativeDeleteTopic { + ptr: *mut RDKafkaDeleteTopic, +} + +impl NativeDeleteTopic { + unsafe fn from_ptr(ptr: *mut RDKafkaDeleteTopic) -> NativeDeleteTopic { + NativeDeleteTopic { ptr } + } +} + +impl WrappedCPointer for NativeDeleteTopic { + type Target = RDKafkaDeleteTopic; + + fn ptr(&self) -> *mut RDKafkaDeleteTopic { + self.ptr + } +} + +impl Drop for NativeDeleteTopic { + fn drop(&mut self) { + trace!("Destroying delete topic: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_DeleteTopic_destroy(self.ptr); + } + trace!("Delete topic destroyed: {:?}", self.ptr); + } +} + +struct DeleteTopicsFuture { + rx: Oneshot, +} + +impl Future for DeleteTopicsFuture { + type Item = Vec; + type Error = KafkaError; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(event)) => { + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Err(KafkaError::AdminOpCreation(format!( + "delete topics request received response of incorrect type ({})", + typ + ))); + } + let mut n = 0; + let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) }; + Ok(Async::Ready(build_topic_results(topics, n))) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(Canceled) => Err(KafkaError::Canceled), + } + } +} + +// +// Create partitions handling +// + +/// Configuration for a CreatePartitions operation. +pub struct NewPartitions<'a> { + /// The name of the topic to which partitions should be added. + pub topic_name: &'a str, + /// The total number of partitions after the operation completes. + pub new_partition_count: usize, + /// The replica assignments for the new partitions. + pub assignment: Option>, +} + +impl<'a> NewPartitions<'a> { + /// Creates a new `NewPartitions`. + pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> { + NewPartitions { + topic_name, + new_partition_count, + assignment: None, + } + } + + /// Sets the partition replica assignment for the new partitions. Only + /// assignments for newly created replicas should be included. + pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions { + self.assignment = Some(assignment); + self + } + + fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult { + let name = CString::new(self.topic_name)?; + if let Some(assignment) = self.assignment { + // If assignment contains more than self.new_partition_count + // entries, we'll trip an assertion in librdkafka that crashes the + // process. Note that this check isn't a guarantee that the + // partition assignment is valid, since the assignment should only + // contain entries for the *new* partitions added, and not any + // existing partitions, but we can let the server handle that + // validation--we just need to make sure not to crash librdkafka. + if assignment.len() > self.new_partition_count { + return Err(KafkaError::AdminOpCreation(format!( + "partition assignment for topic '{}' assigns {} partition(s), \ + which is more than the requested total number of partitions ({})", + self.topic_name, + assignment.len(), + self.new_partition_count, + ))); + } + } + let partitions = unsafe { + rdsys::rd_kafka_NewPartitions_new( + name.as_ptr(), + self.new_partition_count, + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + if partitions.is_null() { + return Err(KafkaError::AdminOpCreation(err_buf.to_string())); + } + // N.B.: we wrap partition immediately, so that it is destroyed via + // NativeNewPartitions's Drop implementation if replica assignment or + // config installation fails. + let partitions = unsafe { NativeNewPartitions::from_ptr(partitions) }; + if let Some(assignment) = self.assignment { + for (partition_id, broker_ids) in assignment.into_iter().enumerate() { + let res = unsafe { + rdsys::rd_kafka_NewPartitions_set_replica_assignment( + partitions.ptr(), + partition_id as i32, + broker_ids.as_ptr() as *mut i32, + broker_ids.len(), + err_buf.as_mut_ptr(), + err_buf.len(), + ) + }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + } + Ok(partitions) + } +} + +#[repr(transparent)] +struct NativeNewPartitions { + ptr: *mut RDKafkaNewPartitions, +} + +impl NativeNewPartitions { + unsafe fn from_ptr(ptr: *mut RDKafkaNewPartitions) -> NativeNewPartitions { + NativeNewPartitions { ptr } + } +} + +impl WrappedCPointer for NativeNewPartitions { + type Target = RDKafkaNewPartitions; + + fn ptr(&self) -> *mut RDKafkaNewPartitions { + self.ptr + } +} + +impl Drop for NativeNewPartitions { + fn drop(&mut self) { + trace!("Destroying new partitions: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_NewPartitions_destroy(self.ptr); + } + trace!("New partitions destroyed: {:?}", self.ptr); + } +} + +struct CreatePartitionsFuture { + rx: Oneshot, +} + +impl Future for CreatePartitionsFuture { + type Item = Vec; + type Error = KafkaError; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(event)) => { + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Err(KafkaError::AdminOpCreation(format!( + "create partitions request received response of incorrect type ({})", + typ + ))); + } + let mut n = 0; + let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) }; + Ok(Async::Ready(build_topic_results(topics, n))) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(Canceled) => Err(KafkaError::Canceled), + } + } +} + +// +// Describe configs handling +// + +/// The result of an individual DescribeConfig operation. +pub type ConfigResourceResult = Result; + +/// Specification of a configurable resource. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum ResourceSpecifier<'a> { + /// A topic resource, identified by its name. + Topic(&'a str), + /// A group resource, identified by its ID. + Group(&'a str), + /// A broker resource, identified by its ID. + Broker(i32), +} + +/// A `ResourceSpecifier` that owns its data. +#[derive(Debug, Eq, PartialEq)] +pub enum OwnedResourceSpecifier { + /// A topic resource, identified by its name. + Topic(String), + /// A group resource, identified by its ID. + Group(String), + /// A broker resource, identified by its ID. + Broker(i32), +} + +/// The source of a configuration entry. +#[derive(Debug, Eq, PartialEq)] +pub enum ConfigSource { + /// Unknown. Note that Kafka brokers before v1.1.0 do not reliably provide + /// configuration source information. + Unknown, + /// A dynamic topic configuration. + DynamicTopic, + /// A dynamic broker configuration. + DynamicBroker, + /// The default dynamic broker configuration. + DynamicDefaultBroker, + /// The static broker configuration. + StaticBroker, + /// The hardcoded default configuration. + Default, +} + +/// An individual configuration parameter for a `ConfigResource`. +#[derive(Debug, Eq, PartialEq)] +pub struct ConfigEntry { + /// The name of the configuration parameter. + pub name: String, + /// The value of the configuration parameter. + pub value: Option, + /// The source of the configuration parameter. + pub source: ConfigSource, + /// Whether the configuration parameter is read only. + pub is_read_only: bool, + /// Whether the configuration parameter currently has the default value. + pub is_default: bool, + /// Whether the configuration parameter contains sensitive data. + pub is_sensitive: bool, +} + +/// A configurable resource and its current configuration values. +#[derive(Debug)] +pub struct ConfigResource { + /// Identifies the resource. + pub specifier: OwnedResourceSpecifier, + /// The current configuration parameters. + pub entries: Vec, +} + +impl ConfigResource { + /// Builds a `HashMap` of configuration entries, keyed by configuration + /// entry name. + pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> { + self.entries.iter().map(|e| (&*e.name, e)).collect() + } + + /// Searches the configuration entries to find the named parameter. + /// + /// For more efficient lookups, use `entry_map` to build a `HashMap` + /// instead. + pub fn get(&self, name: &str) -> Option<&ConfigEntry> { + self.entries.iter().find(|e| e.name == name) + } +} + +#[repr(transparent)] +struct NativeConfigResource { + ptr: *mut RDKafkaConfigResource, +} + +impl NativeConfigResource { + unsafe fn from_ptr(ptr: *mut RDKafkaConfigResource) -> NativeConfigResource { + NativeConfigResource { ptr } + } +} + +impl WrappedCPointer for NativeConfigResource { + type Target = RDKafkaConfigResource; + + fn ptr(&self) -> *mut RDKafkaConfigResource { + self.ptr + } +} + +impl Drop for NativeConfigResource { + fn drop(&mut self) { + trace!("Destroying config resource: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_ConfigResource_destroy(self.ptr); + } + trace!("Config resource destroyed: {:?}", self.ptr); + } +} + +fn extract_config_specifier(resource: *const RDKafkaConfigResource) -> KafkaResult { + let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) }; + match typ { + RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => { + let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) }; + Ok(OwnedResourceSpecifier::Topic(name)) + } + RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => { + let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) }; + Ok(OwnedResourceSpecifier::Group(name)) + } + RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => { + let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }.to_string_lossy(); + match name.parse::() { + Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)), + Err(_) => Err(KafkaError::AdminOpCreation(format!( + "bogus broker ID in kafka response: {}", + name + ))), + } + } + _ => Err(KafkaError::AdminOpCreation(format!( + "bogus resource type in kafka response: {:?}", + typ + ))), + } +} + +fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult { + match config_source { + RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown), + RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => Ok(ConfigSource::DynamicTopic), + RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => Ok(ConfigSource::DynamicBroker), + RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => { + Ok(ConfigSource::DynamicDefaultBroker) + } + RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => Ok(ConfigSource::StaticBroker), + RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default), + _ => Err(KafkaError::AdminOpCreation(format!( + "bogus config source type in kafka response: {:?}", + config_source, + ))), + } +} + +struct DescribeConfigsFuture { + rx: Oneshot, +} + +impl Future for DescribeConfigsFuture { + type Item = Vec; + type Error = KafkaError; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(event)) => { + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Err(KafkaError::AdminOpCreation(format!( + "describe configs request received response of incorrect type ({})", + typ + ))); + } + let mut n = 0; + let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) }; + let mut out = Vec::with_capacity(n); + for i in 0..n { + let resource = unsafe { *resources.offset(i as isize) }; + let specifier = extract_config_specifier(resource)?; + let mut entries_out = Vec::new(); + let mut n = 0; + let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) }; + for j in 0..n { + let entry = unsafe { *entries.offset(j as isize) }; + let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) }; + let value = unsafe { + let value = rdsys::rd_kafka_ConfigEntry_value(entry); + if value.is_null() { + None + } else { + Some(cstr_to_owned(value)) + } + }; + entries_out.push(ConfigEntry { + name, + value, + source: extract_config_source(unsafe { rdsys::rd_kafka_ConfigEntry_source(entry) })?, + is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0, + is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0, + is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0, + }); + } + out.push(Ok(ConfigResource { + specifier: specifier, + entries: entries_out, + })) + } + Ok(Async::Ready(out)) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(Canceled) => Err(KafkaError::Canceled), + } + } +} + +// +// Alter configs handling +// + +/// The result of an individual AlterConfig operation. +pub type AlterConfigsResult = Result; + +/// Configuration for an AlterConfig operation. +pub struct AlterConfig<'a> { + /// Identifies the resource to be altered. + pub specifier: ResourceSpecifier<'a>, + /// The configuration parameters to be updated. + pub entries: HashMap<&'a str, &'a str>, +} + +impl<'a> AlterConfig<'a> { + /// Creates a new `AlterConfig`. + pub fn new(specifier: ResourceSpecifier) -> AlterConfig { + AlterConfig { + specifier, + entries: HashMap::new(), + } + } + + /// Sets the configuration parameter named `key` to the specified `value`. + pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> { + self.entries.insert(key, value); + self + } + + fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult { + let (name, typ) = match self.specifier { + ResourceSpecifier::Topic(name) => (CString::new(name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC), + ResourceSpecifier::Group(name) => (CString::new(name)?, RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP), + ResourceSpecifier::Broker(id) => ( + CString::new(format!("{}", id))?, + RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER, + ), + }; + // N.B.: we wrap config immediately, so that it is destroyed via the + // NativeNewTopic's Drop implementation if config installation fails. + let config = unsafe { NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr())) }; + for (key, val) in &self.entries { + let key_c = CString::new(*key)?; + let val_c = CString::new(*val)?; + let res = + unsafe { rdsys::rd_kafka_ConfigResource_set_config(config.ptr(), key_c.as_ptr(), val_c.as_ptr()) }; + check_rdkafka_invalid_arg(res, err_buf)?; + } + Ok(config) + } +} + +struct AlterConfigsFuture { + rx: Oneshot, +} + +impl Future for AlterConfigsFuture { + type Item = Vec; + type Error = KafkaError; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(event)) => { + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Err(KafkaError::AdminOpCreation(format!( + "alter configs request received response of incorrect type ({})", + typ + ))); + } + let mut n = 0; + let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) }; + let mut out = Vec::with_capacity(n); + for i in 0..n { + let resource = unsafe { *resources.offset(i as isize) }; + let specifier = extract_config_specifier(resource)?; + out.push(Ok(specifier)); + } + Ok(Async::Ready(out)) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(Canceled) => Err(KafkaError::Canceled), + } + } +} diff --git a/src/client.rs b/src/client.rs index ac465930a..76eba6e5e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -233,6 +233,14 @@ impl Client { )} ) } + + /// Returns a NativeQueue from the current client. The NativeQueue shouldn't + /// outlive the client it was generated from. + pub(crate) fn new_native_queue(&self) -> NativeQueue { + unsafe { + NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())) + } + } } struct NativeTopic { @@ -269,6 +277,41 @@ impl Drop for NativeTopic { } } +pub(crate) struct NativeQueue { + ptr: *mut RDKafkaQueue +} + +// The library is completely thread safe, according to the documentation. +unsafe impl Sync for NativeQueue {} +unsafe impl Send for NativeQueue {} + +impl NativeQueue { + /// Wraps a pointer to an `RDKafkaQueue` object and returns a new + /// `NativeQueue`. + unsafe fn from_ptr(ptr: *mut RDKafkaQueue) -> NativeQueue { + NativeQueue { ptr } + } + + /// Returns the pointer to the librdkafka RDKafkaQueue structure. + pub fn ptr(&self) -> *mut RDKafkaQueue { + self.ptr + } + + pub fn poll>>(&self, t: T) -> *mut RDKafkaEvent { + unsafe { rdsys::rd_kafka_queue_poll(self.ptr, timeout_to_ms(t)) } + } +} + +impl Drop for NativeQueue { + fn drop(&mut self) { + trace!("Destroying queue: {:?}", self.ptr); + unsafe { + rdsys::rd_kafka_queue_destroy(self.ptr); + } + trace!("Queue destroyed: {:?}", self.ptr); + } +} + pub(crate) unsafe extern "C" fn native_log_cb( client: *const RDKafka, level: i32, fac: *const c_char, buf: *const c_char) { diff --git a/src/error.rs b/src/error.rs index 293132ceb..37a8a641d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -32,6 +32,12 @@ impl IsError for RDKafkaConfRes { /// Represents all Kafka errors. Check the underlying `RDKafkaError` to get details. #[derive(Clone, PartialEq, Eq)] pub enum KafkaError { + /// Creation of admin operation failed. + AdminOpCreation(String), + /// The admin operation itself failed. + AdminOp(RDKafkaError), + /// The client was dropped before the operation completed. + Canceled, /// Invalid client configuration. ClientConfig(RDKafkaConfRes, String, String, String), /// Client creation failed. @@ -67,6 +73,9 @@ pub enum KafkaError { impl fmt::Debug for KafkaError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { + KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err), + KafkaError::AdminOpCreation(ref err) => write!(f, "KafkaError (Admin operation creation error: {})", err), + KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"), KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "KafkaError (Client config error: {} {} {})", desc, key, value), KafkaError::ClientCreation(ref err) => write!(f, "KafkaError (Client creation error: {})", err), KafkaError::ConsumerCommit(err) => write!(f, "KafkaError (Consumer commit error: {})", err), @@ -89,6 +98,9 @@ impl fmt::Debug for KafkaError { impl fmt::Display for KafkaError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { + KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err), + KafkaError::AdminOpCreation(ref err) => write!(f, "Admin operation creation error: {}", err), + KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"), KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(f, "Client config error: {} {} {}", desc, key, value), KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err), KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err), @@ -111,6 +123,9 @@ impl fmt::Display for KafkaError { impl error::Error for KafkaError { fn description(&self) -> &str { match *self { + KafkaError::AdminOp(_) => "Admin operation error", + KafkaError::AdminOpCreation(_) => "Admin operation creation error", + KafkaError::Canceled => "Client dropped", KafkaError::ClientConfig(_, _, _, _) => "Client config error", KafkaError::ClientCreation(_) => "Client creation error", KafkaError::ConsumerCommit(_) => "Consumer commit error", @@ -132,6 +147,9 @@ impl error::Error for KafkaError { #[allow(clippy::match_same_arms)] fn cause(&self) -> Option<&error::Error> { match *self { + KafkaError::AdminOp(_) => None, + KafkaError::AdminOpCreation(_) => None, + KafkaError::Canceled => None, KafkaError::ClientConfig(_, _, _, _) => None, KafkaError::ClientCreation(_) => None, KafkaError::ConsumerCommit(ref err) => Some(err), diff --git a/src/lib.rs b/src/lib.rs index 563355729..e3f1bde98 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -224,6 +224,7 @@ extern crate rdkafka_sys as rdsys; pub use crate::rdsys::types as types; +pub mod admin; pub mod client; pub mod config; pub mod consumer; diff --git a/src/util.rs b/src/util.rs index feb0d7c94..a3eaffaa2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -141,6 +141,27 @@ impl Default for ErrBuf { } } +pub(crate) trait WrappedCPointer { + type Target; + + fn ptr(&self) -> *mut Self::Target; + + fn is_null(&self) -> bool { + self.ptr().is_null() + } +} + +/// Converts a container into a C array. +pub(crate) trait AsCArray { + fn as_c_array(&self) -> *mut *mut T::Target; +} + +impl AsCArray for Vec { + fn as_c_array(&self) -> *mut *mut T::Target { + self.as_ptr() as *mut *mut T::Target + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/test_admin.rs b/tests/test_admin.rs new file mode 100644 index 000000000..c3d7bc499 --- /dev/null +++ b/tests/test_admin.rs @@ -0,0 +1,390 @@ +//! Test administrative commands using the admin API. + +use backoff::{ExponentialBackoff, Operation}; + +use futures::Future; + +use std::time::Duration; + +use rdkafka::admin::{ + AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic, OwnedResourceSpecifier, + ResourceSpecifier, TopicReplication, +}; +use rdkafka::client::DefaultClientContext; +use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext}; +use rdkafka::error::{KafkaError, RDKafkaError}; +use rdkafka::metadata::Metadata; +use rdkafka::ClientConfig; + +mod utils; +use crate::utils::*; + +fn create_config() -> ClientConfig { + let mut config = ClientConfig::new(); + config.set("bootstrap.servers", get_bootstrap_server().as_str()); + config +} + +fn create_admin_client() -> AdminClient { + create_config().create().expect("admin client creation failed") +} + +fn fetch_metadata(topic: &str) -> Metadata { + let consumer: BaseConsumer = create_config().create().expect("consumer creation failed"); + let timeout = Some(Duration::from_secs(1)); + + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = Some(Duration::from_secs(5)); + (|| { + let metadata = consumer + .fetch_metadata(Some(topic), timeout) + .map_err(|e| e.to_string())?; + if metadata.topics().len() == 0 { + Err("metadata fetch returned no topics".to_string())? + } + let topic = &metadata.topics()[0]; + if topic.partitions().len() == 0 { + Err("metadata fetch returned a topic with no partitions".to_string())? + } + Ok(metadata) + }) + .retry(&mut backoff) + .unwrap() +} + +fn verify_delete(topic: &str) { + let consumer: BaseConsumer = create_config().create().expect("consumer creation failed"); + let timeout = Some(Duration::from_secs(1)); + + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = Some(Duration::from_secs(5)); + (|| { + // Asking about the topic specifically will recreate it (under the + // default Kafka configuration, at least) so we have to ask for the list + // of all topics and search through it. + let metadata = consumer.fetch_metadata(None, timeout).map_err(|e| e.to_string())?; + if let Some(_) = metadata.topics().iter().find(|t| t.name() == topic) { + Err(format!("topic {} still exists", topic))? + } + Ok(()) + }) + .retry(&mut backoff) + .unwrap() +} + +#[test] +fn test_topics() { + let admin_client = create_admin_client(); + let opts = AdminOptions::new().operation_timeout(Duration::from_secs(1)); + + // Verify that topics are created as specified, and that they can later + // be deleted. + { + let name1 = rand_test_topic(); + let name2 = rand_test_topic(); + + // Test both the builder API and the literal construction. + let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1)) + .set("max.message.bytes", "1234"); + let topic2 = NewTopic { + name: &name2, + num_partitions: 3, + replication: TopicReplication::Variable(&[&[0], &[0], &[0]]), + config: Vec::new(), + }; + + let res = admin_client + .create_topics(&[topic1, topic2], &opts) + .wait() + .expect("topic creation failed"); + assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); + + let metadata1 = fetch_metadata(&name1); + let metadata2 = fetch_metadata(&name2); + assert_eq!(1, metadata1.topics().len()); + assert_eq!(1, metadata2.topics().len()); + let metadata_topic1 = &metadata1.topics()[0]; + let metadata_topic2 = &metadata2.topics()[0]; + assert_eq!(&name1, metadata_topic1.name()); + assert_eq!(&name2, metadata_topic2.name()); + assert_eq!(1, metadata_topic1.partitions().len()); + assert_eq!(3, metadata_topic2.partitions().len()); + + let res = admin_client + .describe_configs( + &[ResourceSpecifier::Topic(&name1), ResourceSpecifier::Topic(&name2)], + &opts, + ) + .wait() + .expect("describe configs failed"); + let config1 = &res[0].as_ref().expect("describe configs failed on topic 1"); + let config2 = &res[1].as_ref().expect("describe configs failed on topic 2"); + let mut expected_entry1 = ConfigEntry { + name: "max.message.bytes".into(), + value: Some("1234".into()), + source: ConfigSource::DynamicTopic, + is_read_only: false, + is_default: false, + is_sensitive: false, + }; + let expected_entry2 = ConfigEntry { + name: "max.message.bytes".into(), + value: Some("1000012".into()), + source: ConfigSource::Default, + is_read_only: false, + is_default: true, + is_sensitive: false, + }; + if get_broker_version() < KafkaVersion(1, 1, 0, 0) { + expected_entry1.source = ConfigSource::Unknown; + } + assert_eq!(Some(&expected_entry1), config1.get("max.message.bytes")); + assert_eq!(Some(&expected_entry2), config2.get("max.message.bytes")); + let config_entries1 = config1.entry_map(); + let config_entries2 = config2.entry_map(); + assert_eq!(config1.entries.len(), config_entries1.len()); + assert_eq!(config2.entries.len(), config_entries2.len()); + assert_eq!(Some(&&expected_entry1), config_entries1.get("max.message.bytes")); + assert_eq!(Some(&&expected_entry2), config_entries2.get("max.message.bytes")); + + let partitions1 = NewPartitions::new(&name1, 5); + let res = admin_client + .create_partitions(&[partitions1], &opts) + .wait() + .expect("partition creation failed"); + assert_eq!(res, &[Ok(name1.clone())]); + + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = Some(Duration::from_secs(5)); + (|| { + let metadata = fetch_metadata(&name1); + let topic = &metadata.topics()[0]; + let n = topic.partitions().len(); + if n != 5 { + Err(format!("topic has {} partitions, but expected {}", n, 5))?; + } + Ok(()) + }) + .retry(&mut backoff) + .unwrap(); + + let res = admin_client + .delete_topics(&[&name1, &name2], &opts) + .wait() + .expect("topic deletion failed"); + assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); + verify_delete(&name1); + verify_delete(&name2); + } + + // Verify that incorrect replication configurations are ignored when + // creating topics. + { + let topic = NewTopic::new("ignored", 1, TopicReplication::Variable(&[&[0], &[0]])); + let res = admin_client.create_topics(&[topic], &opts).wait(); + assert_eq!( + Err(KafkaError::AdminOpCreation( + "replication configuration for topic 'ignored' assigns 2 partition(s), \ + which does not match the specified number of partitions (1)" + .into() + )), + res, + ) + } + + // Verify that incorrect replication configurations are ignored when + // creating partitions. + { + let name = rand_test_topic(); + let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1)); + + let res = admin_client + .create_topics(vec![&topic], &opts) + .wait() + .expect("topic creation failed"); + assert_eq!(res, &[Ok(name.clone())]); + let _ = fetch_metadata(&name); + + // This partition specification is obviously garbage, and so trips + // a client-side error. + let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0], &[0]]); + let res = admin_client.create_partitions(&[partitions], &opts).wait(); + assert_eq!( + res, + Err(KafkaError::AdminOpCreation(format!( + "partition assignment for topic '{}' assigns 3 partition(s), \ + which is more than the requested total number of partitions (2)", + name + ))) + ); + + // Only the server knows that this partition specification is garbage. + let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0]]); + let res = admin_client + .create_partitions(&[partitions], &opts) + .wait() + .expect("partition creation failed"); + assert_eq!(res, &[Err((name, RDKafkaError::InvalidReplicaAssignment))],); + } + + // Verify that deleting a non-existent topic fails. + { + let name = rand_test_topic(); + let res = admin_client + .delete_topics(&[&name], &opts) + .wait() + .expect("delete topics failed"); + assert_eq!(res, &[Err((name, RDKafkaError::UnknownTopicOrPartition))]); + } + + // Verify that mixed-success operations properly report the successful and + // failing operators. + { + let name1 = rand_test_topic(); + let name2 = rand_test_topic(); + + let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1)); + let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1)); + + let res = admin_client + .create_topics(vec![&topic1], &opts) + .wait() + .expect("topic creation failed"); + assert_eq!(res, &[Ok(name1.clone())]); + let _ = fetch_metadata(&name1); + + let res = admin_client + .create_topics(vec![&topic1, &topic2], &opts) + .wait() + .expect("topic creation failed"); + assert_eq!( + res, + &[ + Err((name1.clone(), RDKafkaError::TopicAlreadyExists)), + Ok(name2.clone()) + ] + ); + let _ = fetch_metadata(&name2); + + let res = admin_client + .delete_topics(&[&name1], &opts) + .wait() + .expect("topic deletion failed"); + assert_eq!(res, &[Ok(name1.clone())]); + verify_delete(&name1); + + let res = admin_client + .delete_topics(&[&name2, &name1], &opts) + .wait() + .expect("topic deletion failed"); + assert_eq!( + res, + &[ + Ok(name2.clone()), + Err((name1.clone(), RDKafkaError::UnknownTopicOrPartition)) + ] + ); + } +} + +#[test] +fn test_configs() { + let admin_client = create_admin_client(); + let opts = AdminOptions::new(); + let broker = ResourceSpecifier::Broker(0); + + let res = admin_client + .describe_configs(&[broker], &opts) + .wait() + .expect("describe configs failed"); + let config = &res[0].as_ref().expect("describe configs failed"); + let orig_val = config + .get("log.flush.interval.messages") + .expect("original config entry missing") + .value + .as_ref() + .expect("original value missing"); + + let config = AlterConfig::new(broker).set("log.flush.interval.messages", "1234"); + let res = admin_client + .alter_configs(&[config], &opts) + .wait() + .expect("alter configs failed"); + assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]); + + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = Some(Duration::from_secs(5)); + (|| { + let res = admin_client + .describe_configs(&[broker], &opts) + .wait() + .expect("describe configs failed"); + let config = &res[0].as_ref().expect("describe configs failed"); + let entry = config.get("log.flush.interval.messages"); + let expected_entry = if get_broker_version() < KafkaVersion(1, 1, 0, 0) { + // Pre-1.1, the AlterConfig operation will silently fail, and the + // config will remain unchanged, which I guess is worth testing. + ConfigEntry { + name: "log.flush.interval.messages".into(), + value: Some(orig_val.clone()), + source: ConfigSource::Default, + is_read_only: true, + is_default: true, + is_sensitive: false, + } + } else { + ConfigEntry { + name: "log.flush.interval.messages".into(), + value: Some("1234".into()), + source: ConfigSource::DynamicBroker, + is_read_only: false, + is_default: false, + is_sensitive: false, + } + }; + if entry != Some(&expected_entry) { + Err(format!("{:?} != {:?}", entry, Some(&expected_entry)))? + } + Ok(()) + }) + .retry(&mut backoff) + .unwrap(); + + let config = AlterConfig::new(broker).set("log.flush.interval.ms", &orig_val); + let res = admin_client + .alter_configs(&[config], &opts) + .wait() + .expect("alter configs failed"); + assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]); +} + +// Tests whether each admin operation properly reports an error if the entire +// request fails. The original implementations failed to check this, resulting +// in confusing situations where a failed admin request would return Ok([]). +#[test] +fn test_event_errors() { + // Configure an admin client to target a Kafka server that doesn't exist, + // then set an impossible timeout. This will ensure that every request fails + // with an OperationTimedOut error, assuming, of course, that the request + // passes client-side validation. + let admin_client = ClientConfig::new() + .set("bootstrap.servers", "noexist") + .create::>() + .expect("admin client creation failed"); + let opts = AdminOptions::new().request_timeout(Duration::from_nanos(1)); + + let res = admin_client.create_topics(&[], &opts).wait(); + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.create_partitions(&[], &opts).wait(); + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.delete_topics(&[], &opts).wait(); + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.describe_configs(&[], &opts).wait(); + assert_eq!(res.err(), Some(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); + + let res = admin_client.alter_configs(&[], &opts).wait(); + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaError::OperationTimedOut))); +} diff --git a/tests/utils.rs b/tests/utils.rs index 3aaa7646f..fdc555b11 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -1,10 +1,12 @@ #![allow(dead_code)] extern crate rdkafka; extern crate rand; +extern crate regex; extern crate futures; -use rand::Rng; use futures::*; +use rand::Rng; +use regex::Regex; use rdkafka::client::ClientContext; use rdkafka::config::ClientConfig; @@ -13,7 +15,7 @@ use rdkafka::message::ToBytes; use rdkafka::statistics::Statistics; use std::collections::HashMap; -use std::env; +use std::env::{self, VarError}; #[macro_export] macro_rules! map( @@ -46,6 +48,28 @@ pub fn get_bootstrap_server() -> String { env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost:9092".to_owned()) } +pub fn get_broker_version() -> KafkaVersion { + // librdkafka doesn't expose this directly, sadly. + match env::var("KAFKA_VERSION") { + Ok(v) => { + let regex = Regex::new(r"^(\d+)(?:\.(\d+))?(?:\.(\d+))?(?:\.(\d+))?$").unwrap(); + match regex.captures(&v) { + Some(captures) => { + let extract = |i| captures.get(i).map(|m| m.as_str().parse().unwrap()).unwrap_or(0); + KafkaVersion(extract(1), extract(2), extract(3), extract(4)) + }, + None => panic!("KAFKA_VERSION env var was not in expected [n[.n[.n[.n]]]] format") + } + }, + Err(VarError::NotUnicode(_)) => panic!("KAFKA_VERSION env var contained non-unicode characters"), + // If the environment variable is unset, assume we're running the latest version. + Err(VarError::NotPresent) => KafkaVersion(std::u32::MAX, std::u32::MAX, std::u32::MAX, std::u32::MAX), + } +} + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +pub struct KafkaVersion(pub u32, pub u32, pub u32, pub u32); + pub struct TestContext { _some_data: i64, // Add some data so that valgrind can check proper allocation }