diff --git a/src/connection.rs b/src/connection.rs index 13eaf08..c2a63a6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -351,7 +351,7 @@ impl ConnectionSender { match ( self.registrations .unbounded_send(Register::Ping { resolver }), - self.tx.try_send(messages::ping())?, + self.tx.send(messages::ping()).await?, ) { (Ok(_), ()) => { let delay_f = self.executor.delay(self.operation_timeout); @@ -526,35 +526,42 @@ impl ConnectionSender { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn send_flow(&self, consumer_id: u64, message_permits: u32) -> Result<(), ConnectionError> { + pub async fn send_flow( + &self, + consumer_id: u64, + message_permits: u32, + ) -> Result<(), ConnectionError> { self.tx - .try_send(messages::flow(consumer_id, message_permits))?; + .send(messages::flow(consumer_id, message_permits)) + .await?; Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn send_ack( + pub async fn send_ack( &self, consumer_id: u64, message_ids: Vec, cumulative: bool, ) -> Result<(), ConnectionError> { self.tx - .try_send(messages::ack(consumer_id, message_ids, cumulative))?; + .send(messages::ack(consumer_id, message_ids, cumulative)) + .await?; Ok(()) } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn send_redeliver_unacknowleged_messages( + pub async fn send_redeliver_unacknowleged_messages( &self, consumer_id: u64, message_ids: Vec, ) -> Result<(), ConnectionError> { self.tx - .try_send(messages::redeliver_unacknowleged_messages( + .send(messages::redeliver_unacknowleged_messages( consumer_id, message_ids, - ))?; + )) + .await?; Ok(()) } diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs index 0742a9e..cc1d7b2 100644 --- a/src/consumer/engine.rs +++ b/src/consumer/engine.rs @@ -151,19 +151,18 @@ impl ConsumerEngine { .connection .sender() .send_flow(self.id, self.batch_size - self.remaining_messages) + .await { Ok(()) => {} Err(ConnectionError::Disconnected) => { self.reconnect().await?; self.connection .sender() - .send_flow(self.id, self.batch_size - self.remaining_messages)?; - } - Err(ConnectionError::SlowDown) => { - self.connection - .sender() - .send_flow(self.id, self.batch_size - self.remaining_messages)?; + .send_flow(self.id, self.batch_size - self.remaining_messages) + .await?; } + // we don't need to handle the SlowDown error, since send_flow waits on the + // channel to be not full Err(e) => return Err(e.into()), } self.remaining_messages = self.batch_size; @@ -178,7 +177,7 @@ impl ConsumerEngine { } } Ok(Some(EngineEvent::EngineMessage(msg))) => { - let continue_loop = self.handle_ack_opt(msg); + let continue_loop = self.handle_ack_opt(msg).await; if !continue_loop { return Ok(()); } @@ -225,14 +224,14 @@ impl ConsumerEngine { } } - fn handle_ack_opt(&mut self, ack_opt: Option>) -> bool { + async fn handle_ack_opt(&mut self, ack_opt: Option>) -> bool { match ack_opt { None => { trace!("ack channel was closed"); false } Some(EngineMessage::Ack(message_id, cumulative)) => { - self.ack(message_id, cumulative); + self.ack(message_id, cumulative).await; true } Some(EngineMessage::Nack(message_id)) => { @@ -240,6 +239,7 @@ impl ConsumerEngine { .connection .sender() .send_redeliver_unacknowleged_messages(self.id, vec![message_id.clone()]) + .await { error!( "could not ask for redelivery for message {:?}: {:?}", @@ -265,6 +265,7 @@ impl ConsumerEngine { .connection .sender() .send_redeliver_unacknowleged_messages(self.id, ids) + .await { error!("could not ask for redelivery: {:?}", e); } else { @@ -288,13 +289,14 @@ impl ConsumerEngine { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn ack(&mut self, message_id: MessageIdData, cumulative: bool) { + async fn ack(&mut self, message_id: MessageIdData, cumulative: bool) { // FIXME: this does not handle cumulative acks self.unacked_messages.remove(&message_id); let res = self .connection .sender() - .send_ack(self.id, vec![message_id], cumulative); + .send_ack(self.id, vec![message_id], cumulative) + .await; if res.is_err() { error!("ack error: {:?}", res); } @@ -510,7 +512,7 @@ impl ConsumerEngine { Error::Custom("DLQ send error".to_string()) })?; - self.ack(message_id, false); + self.ack(message_id, false).await; } _ => self.send_to_consumer(message_id, payload).await?, } diff --git a/src/error.rs b/src/error.rs index b9df7e9..02c7632 100644 --- a/src/error.rs +++ b/src/error.rs @@ -156,6 +156,13 @@ impl From for ConnectionError { } } +impl From> for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(_err: async_channel::SendError) -> Self { + ConnectionError::Disconnected + } +} + impl From> for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: async_channel::TrySendError) -> Self { diff --git a/src/retry_op.rs b/src/retry_op.rs index 74f3f52..76244d8 100644 --- a/src/retry_op.rs +++ b/src/retry_op.rs @@ -140,6 +140,7 @@ pub async fn retry_subscribe_consumer( connection .sender() .send_flow(consumer_id, batch_size) + .await .map_err(|err| { error!("TopicConsumer::send_flow({topic}) error: {err:?}"); Error::Consumer(ConsumerError::Connection(err))