Skip to content

Commit

Permalink
chore: apply formatter
Browse files Browse the repository at this point in the history
Signed-off-by: Florentin Dubois <florentin.dubois@clever-cloud.com>
  • Loading branch information
FlorentinDUBOIS committed Dec 18, 2024
1 parent ae8b4d5 commit e33ffba
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 23 deletions.
10 changes: 8 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,10 @@ impl<Exe: Executor> ConnectionSender<Exe> {
response
.await
.map_err(|oneshot::Canceled| {
error!("response has been canceled (key = {:?}), we are disconnected", k);
error!(
"response has been canceled (key = {:?}), we are disconnected",
k
);
error.set(ConnectionError::Disconnected);
ConnectionError::Disconnected
})
Expand All @@ -674,7 +677,10 @@ impl<Exe: Executor> ConnectionSender<Exe> {
let connection_id = self.connection_id;
let error = self.error.clone();
let delay_f = self.executor.delay(self.operation_timeout);
trace!("Create timeout futures with operation timeout at {:?}", self.operation_timeout);
trace!(
"Create timeout futures with operation timeout at {:?}",
self.operation_timeout
);
let fut = async move {
pin_mut!(response);
pin_mut!(delay_f);
Expand Down
4 changes: 1 addition & 3 deletions src/consumer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
// Checks the builder for inconsistencies
// returns a config and a list of topics with associated brokers
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
async fn validate(
self,
) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
async fn validate(self) -> Result<(ConsumerConfig, Vec<(String, BrokerAddress)>), Error> {
let ConsumerBuilder {
pulsar,
topics,
Expand Down
28 changes: 19 additions & 9 deletions src/consumer/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
})?;
}

// debug!("self.remaining_messages ({}) < self.batch_size.div_ceil(2) ({}) = {}", self.remaining_messages, self.batch_size.div_ceil(2), self.remaining_messages < self.batch_size.div_ceil(2));
// debug!("self.remaining_messages ({}) < self.batch_size.div_ceil(2) ({}) = {}",
// self.remaining_messages, self.batch_size.div_ceil(2), self.remaining_messages <
// self.batch_size.div_ceil(2));
if self.remaining_messages < (self.batch_size.div_ceil(2) as i64) {
match self
.connection
.sender()
.send_flow(self.id, (self.batch_size as i64 - self.remaining_messages) as u32)
.send_flow(
self.id,
(self.batch_size as i64 - self.remaining_messages) as u32,
)
.await
{
Ok(()) => {}
Expand All @@ -165,20 +170,22 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
// channel to be not full
Err(e) => {
error!("consumer engine: we got a unrecoverable connection error, {e}");
return Err(e.into())
},
return Err(e.into());
}
}

self.remaining_messages = self.batch_size as i64;
}

match Self::timeout(self.event_rx.next(), Duration::from_secs(1)).await {
Err(_) => {
// If you are reading this comment, you may have an issue where you have received a batched message
// that is greater that the batch size and then break the way that we send flow command message.
// If you are reading this comment, you may have an issue where you have
// received a batched message that is greater that the batch
// size and then break the way that we send flow command message.
//
// In that case, you could increase your batch size or patch this driver by adding the following line,
// if you are sure that you have at least 1 incoming message per second.
// In that case, you could increase your batch size or patch this driver by
// adding the following line, if you are sure that you have
// at least 1 incoming message per second.
//
// ```rust
// self.remaining_messages = 0;
Expand Down Expand Up @@ -221,7 +228,10 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
.payload
.as_ref()
.and_then(|payload| {
debug!("Consumer: received message payload, num_messages_in_batch = {:?}", payload.metadata.num_messages_in_batch);
debug!(
"Consumer: received message payload, num_messages_in_batch = {:?}",
payload.metadata.num_messages_in_batch
);
payload.metadata.num_messages_in_batch
})
.unwrap_or(1) as i64;
Expand Down
10 changes: 3 additions & 7 deletions src/consumer/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,8 @@ impl<T: DeserializeMessage, Exe: Executor> Stream for TopicConsumer<T, Exe> {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.messages.as_mut().poll_next(cx) {
Poll::Pending => {
Poll::Pending
},
Poll::Ready(None) => {
Poll::Ready(None)
},
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok((id, payload)))) => {
self.last_message_received = Some(Utc::now());
self.messages_received += 1;
Expand All @@ -337,7 +333,7 @@ impl<T: DeserializeMessage, Exe: Executor> Stream for TopicConsumer<T, Exe> {
Poll::Ready(Some(Err(e))) => {
error!("we are using in the single-consumer and we got an error, {e}");
Poll::Ready(Some(Err(e)))
},
}
}
}
}
4 changes: 2 additions & 2 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ impl<Exe: Executor> Producer<Exe> {
/// this function returns a `SendFuture` because the receipt can come long after
/// this function was called, for various reasons:
/// - the message was sent successfully but Pulsar did not send the receipt yet
/// - the producer is batching messages, so this function must return immediately,
/// and the receipt will come when the batched messages are actually sent
/// - the producer is batching messages, so this function must return immediately, and the
/// receipt will come when the batched messages are actually sent
///
/// Usage:
///
Expand Down

0 comments on commit e33ffba

Please sign in to comment.