Async kafka client in pure Rust.
- Multiple async runtime (
tokio
,async-std
, etc.) - All versions of kafka are supported
- Compression (
gzip
,snappy
,lz4
)
- Producer
- Consumer
- Streams
- Connect
- Admin client
[dependencies]
kafkas = { git = "https://github.com/iamazy/kafkas", branch = "main" }
To get started using kafkas:
- Producer
#[tokio::main]
async fn main() -> Result<(), Box<Error>> {
let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?;
let producer = Producer::new(client, ProducerOptions::default()).await?;
let (mut tx, mut rx) = futures::channel::mpsc::unbounded();
tokio::task::spawn(Box::pin(async move {
while let Some(fut) = rx.next().await {
if let Err(e) = fut.await {
error!("{e}");
}
}
}));
let topic = topic_name("kafka");
for _ in 0..10000_0000 {
let record = TestData::new("hello kafka");
let ret = producer.send(&topic, record).await?;
let _ = tx.send(ret).await;
}
}
- Consumer
#[tokio::main]
async fn main() -> Result<(), Box<Error>> {
let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?;
let mut consumer_options = ConsumerOptions::new("default");
consumer_options.auto_commit_enabled = false;
let mut consumer = Consumer::new(kafka_client, consumer_options).await?;
let consume_stream = consumer.subscribe::<&str, ConsumerRecord>(vec!["kafka"]).await?;
pin_mut!(consume_stream);
while let Some(records) = consume_stream.next().await {
for record in records {
if let Some(value) = record.value {
println!("{:?} - {}", String::from_utf8(value.to_vec())?, record.offset);
}
}
// needed only when `auto_commit_enabled` is false
consumer.commit_async().await?;
}
}
Examples can be found in examples
.
The rust version used for kafkas
development is 1.65
.
- kafka-protocol-rs : Rust implementation of the Kafka wire protocol.
- pulsar-rs : Rust Client library for Apache Pulsar
- rskafka : A minimal Rust client for Apache Kafka