diff --git a/src/client.rs b/src/client.rs index b4bf59785..030513648 100644 --- a/src/client.rs +++ b/src/client.rs @@ -9,6 +9,7 @@ use std::os::raw::c_void; use std::ptr; use std::string::ToString; use std::time::Duration; +use std::os::raw::c_char; use serde_json; @@ -123,7 +124,7 @@ impl Client { unsafe { rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::)) }; let client_ptr = unsafe { - rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), errstr.as_ptr() as *mut i8, errstr.len()) + rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), errstr.as_ptr() as *mut c_char, errstr.len()) }; trace!("Create new librdkafka client {:p}", client_ptr); @@ -271,7 +272,7 @@ impl Drop for NativeTopic { pub(crate) unsafe extern "C" fn native_log_cb( client: *const RDKafka, level: i32, - fac: *const i8, buf: *const i8) { + fac: *const c_char, buf: *const c_char) { let fac = CStr::from_ptr(fac).to_string_lossy(); let log_message = CStr::from_ptr(buf).to_string_lossy(); @@ -281,7 +282,7 @@ pub(crate) unsafe extern "C" fn native_log_cb( } pub(crate) unsafe extern "C" fn native_stats_cb( - _conf: *mut RDKafka, json: *mut i8, json_len: usize, + _conf: *mut RDKafka, json: *mut c_char, json_len: usize, opaque: *mut c_void) -> i32 { let context = Box::from_raw(opaque as *mut C); @@ -302,7 +303,7 @@ pub(crate) unsafe extern "C" fn native_stats_cb( } pub(crate) unsafe extern "C" fn native_error_cb( - _client: *mut RDKafka, err: i32, reason: *const i8, + _client: *mut RDKafka, err: i32, reason: *const c_char, opaque: *mut c_void) { let err = rdsys::primitive_to_rd_kafka_resp_err_t(err) .expect("global error not an rd_kafka_resp_err_t"); diff --git a/src/config.rs b/src/config.rs index 5c9c7743f..6ab2c0e08 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,6 +29,7 @@ use crate::util::bytes_cstr_to_owned; use std::collections::HashMap; use std::ffi::CString; use std::mem; +use std::os::raw::c_char; const ERR_LEN: usize = 256; @@ -151,7 +152,7 @@ impl ClientConfig { let value_c = CString::new(value.to_string())?; let ret = unsafe { rdsys::rd_kafka_conf_set(conf, key_c.as_ptr(), value_c.as_ptr(), - errstr.as_ptr() as *mut i8, errstr.len()) + errstr.as_ptr() as *mut c_char, errstr.len()) }; if ret.is_error() { let descr = unsafe { bytes_cstr_to_owned(&errstr) }; diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index 101ef2bdc..f009745d3 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -176,7 +176,7 @@ impl Consumer for BaseConsumer { } let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) }; if ret_code.is_error() { - let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) }; + let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code) as *const i8) }; return Err(KafkaError::Subscription(error)); }; Ok(()) @@ -189,7 +189,7 @@ impl Consumer for BaseConsumer { fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> { let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) }; if ret_code.is_error() { - let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) }; + let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code) as *const i8) }; return Err(KafkaError::Subscription(error)); }; Ok(()) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index b9500778b..b3164e648 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -51,7 +51,7 @@ pub trait ConsumerContext: ClientContext { } RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke, _ => { - let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err)) }; + let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err) as *const i8) }; error!("Error rebalancing: {}", error); Rebalance::Error(error) } diff --git a/src/util.rs b/src/util.rs index aa32a0a19..f3d86203e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -6,6 +6,7 @@ use std::os::raw::c_void; use std::ptr; use std::slice; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::os::raw::c_char; /// Return a tuple representing the version of `librdkafka` in /// hexadecimal and string format. @@ -102,12 +103,12 @@ impl IntoOpaque for Box { // TODO: check if the implementation returns a copy of the data and update the documentation /// Converts a byte array representing a C string into a String. pub unsafe fn bytes_cstr_to_owned(bytes_cstr: &[i8]) -> String { - CStr::from_ptr(bytes_cstr.as_ptr()).to_string_lossy().into_owned() + CStr::from_ptr(bytes_cstr.as_ptr() as *const c_char).to_string_lossy().into_owned() } /// Converts a C string into a String. pub unsafe fn cstr_to_owned(cstr: *const i8) -> String { - CStr::from_ptr(cstr).to_string_lossy().into_owned() + CStr::from_ptr(cstr as *const c_char).to_string_lossy().into_owned() } #[cfg(test)]