Skip to content

Commit

Permalink
BroadcastSinkError
Browse files Browse the repository at this point in the history
  • Loading branch information
pragmaxim committed Jul 1, 2024
1 parent 9433759 commit dfd267f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
6 changes: 3 additions & 3 deletions benches/broadcast_sink_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use broadcast_sink::{Consumer, StreamBroadcastSinkExt};
use broadcast_sink::{BroadcastSinkError, Consumer, StreamBroadcastSinkExt};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures::stream::{self, Stream};
use std::sync::{Arc, RwLock};
Expand All @@ -21,7 +21,7 @@ impl MultiplyX {
}

impl Consumer<u64> for MultiplyX {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
fn consume(&mut self, _: &u64) -> Result<(), BroadcastSinkError> {
let mut x = self.state.x.write().unwrap();
*x *= 5;
println!("Consumer 1 processed item");
Expand All @@ -40,7 +40,7 @@ impl MultiplyY {
}

impl Consumer<u64> for MultiplyY {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
fn consume(&mut self, _: &u64) -> Result<(), BroadcastSinkError> {
let mut y = self.state.y.write().unwrap();
*y *= 10;
println!("Consumer 2 processed item");
Expand Down
13 changes: 9 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ use tokio::sync::{broadcast, Barrier, Mutex};
use tokio::task;
use tokio_stream::wrappers::BroadcastStream;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BroadcastSinkError {
message: String,
}

pub trait Consumer<T>: Send + Sync {
fn consume(&mut self, item: &T) -> Result<(), &'static str>;
fn consume(&mut self, item: &T) -> Result<(), BroadcastSinkError>;
}

pin_project! {
Expand Down Expand Up @@ -63,7 +68,7 @@ where
while let Some(Ok(item)) = stream.next().await {
let mut consumer = consumer.lock().await;
if let Err(e) = consumer.consume(&item) {
error!("BroadcastSink consumer error occurred: {:?}", e);
error!("BroadcastSink consumer error occurred: {:?}", e.message);
}
barrier_clone.wait().await;
active_count_clone.fetch_sub(1, Ordering::SeqCst);
Expand Down Expand Up @@ -148,7 +153,7 @@ mod tests {
}

impl Consumer<u64> for MultiplyX {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
fn consume(&mut self, _: &u64) -> Result<(), BroadcastSinkError> {
let mut x = self.state.x.write().unwrap();
*x *= 5;
println!("Consumer X processed item");
Expand All @@ -167,7 +172,7 @@ mod tests {
}

impl Consumer<u64> for MultiplyY {
fn consume(&mut self, _: &u64) -> Result<(), &'static str> {
fn consume(&mut self, _: &u64) -> Result<(), BroadcastSinkError> {
let mut y = self.state.y.write().unwrap();
*y *= 10;
println!("Consumer Y processed item");
Expand Down

0 comments on commit dfd267f

Please sign in to comment.