Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Block the sending of control messages #319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl<Exe: Executor> ConnectionSender<Exe> {
match (
self.registrations
.unbounded_send(Register::Ping { resolver }),
self.tx.try_send(messages::ping())?,
self.tx.send(messages::ping()).await?,
) {
(Ok(_), ()) => {
let delay_f = self.executor.delay(self.operation_timeout);
Expand Down Expand Up @@ -526,35 +526,42 @@ impl<Exe: Executor> ConnectionSender<Exe> {
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn send_flow(&self, consumer_id: u64, message_permits: u32) -> Result<(), ConnectionError> {
pub async fn send_flow(
&self,
consumer_id: u64,
message_permits: u32,
) -> Result<(), ConnectionError> {
self.tx
.try_send(messages::flow(consumer_id, message_permits))?;
.send(messages::flow(consumer_id, message_permits))
.await?;
Ok(())
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn send_ack(
pub async fn send_ack(
&self,
consumer_id: u64,
message_ids: Vec<proto::MessageIdData>,
cumulative: bool,
) -> Result<(), ConnectionError> {
self.tx
.try_send(messages::ack(consumer_id, message_ids, cumulative))?;
.send(messages::ack(consumer_id, message_ids, cumulative))
.await?;
Ok(())
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn send_redeliver_unacknowleged_messages(
pub async fn send_redeliver_unacknowleged_messages(
&self,
consumer_id: u64,
message_ids: Vec<proto::MessageIdData>,
) -> Result<(), ConnectionError> {
self.tx
.try_send(messages::redeliver_unacknowleged_messages(
.send(messages::redeliver_unacknowleged_messages(
consumer_id,
message_ids,
))?;
))
.await?;
Ok(())
}

Expand Down
26 changes: 14 additions & 12 deletions src/consumer/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,18 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)
.await
{
Ok(()) => {}
Err(ConnectionError::Disconnected) => {
self.reconnect().await?;
self.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)?;
}
Err(ConnectionError::SlowDown) => {
self.connection
.sender()
.send_flow(self.id, self.batch_size - self.remaining_messages)?;
.send_flow(self.id, self.batch_size - self.remaining_messages)
.await?;
}
// we don't need to handle the SlowDown error, since send_flow waits on the
// channel to be not full
Err(e) => return Err(e.into()),
}
self.remaining_messages = self.batch_size;
Expand All @@ -178,7 +177,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
}
}
Ok(Some(EngineEvent::EngineMessage(msg))) => {
let continue_loop = self.handle_ack_opt(msg);
let continue_loop = self.handle_ack_opt(msg).await;
if !continue_loop {
return Ok(());
}
Expand Down Expand Up @@ -225,21 +224,22 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
}
}

fn handle_ack_opt(&mut self, ack_opt: Option<EngineMessage<Exe>>) -> bool {
async fn handle_ack_opt(&mut self, ack_opt: Option<EngineMessage<Exe>>) -> bool {
match ack_opt {
None => {
trace!("ack channel was closed");
false
}
Some(EngineMessage::Ack(message_id, cumulative)) => {
self.ack(message_id, cumulative);
self.ack(message_id, cumulative).await;
true
}
Some(EngineMessage::Nack(message_id)) => {
if let Err(e) = self
.connection
.sender()
.send_redeliver_unacknowleged_messages(self.id, vec![message_id.clone()])
.await
{
error!(
"could not ask for redelivery for message {:?}: {:?}",
Expand All @@ -265,6 +265,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
.connection
.sender()
.send_redeliver_unacknowleged_messages(self.id, ids)
.await
{
error!("could not ask for redelivery: {:?}", e);
} else {
Expand All @@ -288,13 +289,14 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn ack(&mut self, message_id: MessageIdData, cumulative: bool) {
async fn ack(&mut self, message_id: MessageIdData, cumulative: bool) {
// FIXME: this does not handle cumulative acks
self.unacked_messages.remove(&message_id);
let res = self
.connection
.sender()
.send_ack(self.id, vec![message_id], cumulative);
.send_ack(self.id, vec![message_id], cumulative)
.await;
if res.is_err() {
error!("ack error: {:?}", res);
}
Expand Down Expand Up @@ -510,7 +512,7 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
Error::Custom("DLQ send error".to_string())
})?;

self.ack(message_id, false);
self.ack(message_id, false).await;
}
_ => self.send_to_consumer(message_id, payload).await?,
}
Expand Down
7 changes: 7 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ impl From<AuthenticationError> for ConnectionError {
}
}

impl<T> From<async_channel::SendError<T>> for ConnectionError {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn from(_err: async_channel::SendError<T>) -> Self {
ConnectionError::Disconnected
}
}

impl<T> From<async_channel::TrySendError<T>> for ConnectionError {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn from(err: async_channel::TrySendError<T>) -> Self {
Expand Down
1 change: 1 addition & 0 deletions src/retry_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub async fn retry_subscribe_consumer<Exe: Executor>(
connection
.sender()
.send_flow(consumer_id, batch_size)
.await
.map_err(|err| {
error!("TopicConsumer::send_flow({topic}) error: {err:?}");
Error::Consumer(ConsumerError::Connection(err))
Expand Down
Loading