Feature Request: Add delivery attempt count to RabbitMQ DLQ messages #2939
We currently don’t set any such header and fully rely on whatever the broker gives us in that `x-death` header.
As mentioned above, RabbitMQ 3.13 has stopped incrementing the count parameter in the x-death header. So there is no way for an application to determine how many times a message has landed in the DLQ.
OK. I see now. So, reopening the respective Spring AMQP issue: spring-projects/spring-amqp#2688. Not sure, though, what you mean about fixing the Rabbit Binder, but probably that falls just to the docs, as we are discussing in that issue. Thanks
@samragu, see the comment from the RabbitMQ team member: spring-projects/spring-amqp#2688 (comment). Either way, something has to be fixed. What is good is that it is only docs on the Spring Cloud Stream side.
I wanted to talk about a couple of issues that popped up because of something I did. I've been using this code within a Consumer bean implementation to make N attempts to handle a Message<*>:

```kotlin
import org.springframework.amqp.ImmediateAcknowledgeAmqpException
import org.springframework.messaging.Message
import org.springframework.stereotype.Service

@Service
class EventHandleService {

    fun handleEvent(
        message: Message<out Any>,
        maxAttempts: Int,
        handler: () -> Unit
    ) {
        try {
            handler.invoke()
        } catch (e: Exception) {
            if (maxAttempts == 1 || getCurrentAttempt(message) > maxAttempts - 1) {
                // Drops the message out of the queue despite the failure.
                throw ImmediateAcknowledgeAmqpException("Failed after max attempts exceeded")
            } else {
                // Rethrowing the exception must put the message into the DLQ.
                throw e
            }
        }
    }

    private fun getCurrentAttempt(message: Message<out Any>): Long {
        // Pre-3.13, RabbitMQ incremented "count" in the first x-death entry
        // each time the message was dead-lettered.
        val xDeath = message.headers["x-death"]
        val currentAttempt = if (xDeath == null) {
            0L
        } else {
            ((xDeath as List<*>)[0] as Map<*, *>)["count"] as Long
        }
        return currentAttempt + 1
    }
}
```

We're counting on the "x-death" header, as the docs say, expecting either Spring or RabbitMQ to bump its counter whenever handling fails. I've been testing this against a real RabbitMQ setup using Testcontainers, and everything was going smoothly until RabbitMQ got updated to version 3.13.x. Suddenly, if a message failed once, it got stuck in a never-ending loop. After digging around, I noticed that the counter in the "x-death" header was being updated seemingly at random. I'm not sure if I'm misinterpreting the documentation or if my implementation of the retry mechanism is flawed. Please let me know if the issue lies with my approach.
@dvovney can you please create a GitHub repo with a minimal reproducible example so that I can run your code with a single command on my local machine? In your GitHub repo's README, please also include the Java version and Erlang/OTP version you're using.
@dvovney, as you said yourself, RabbitMQ starting with version 3.13 no longer increments that counter. @ansd, correct me, please, if I have missed anything. Thanks
@artembilan thank you for the feedback! Looking forward to the updated Spring Cloud Stream docs.
Just to be clear, you are proposing to add a new header (for example, spring-retry-count) with a delivery count for the message, incremented each time the message is delivered to the DLQ, correct? Also, it would be nice if applications could intercept the message before Spring delivers it to the DLQ, so that things like the delivery count can be updated by the application itself within the message, as opposed to Spring updating a header.
No, I'm not proposing to add such a header, since it turned out there are no DLX retry components in the framework. Since you have raised this issue, it would be great to see the code showing how you deal with such a retry-over-the-broker scenario. Thanks
The code you referenced above is how many applications are handling retry, and as you know, that mechanism is broken since RabbitMQ 3.13. If you are going to remove that referenced code from the documentation, the problem still remains. Applications throw uncaught exceptions into the Spring framework; the framework catches those exceptions, sets the x-exception-stacktrace message header, and routes the message to the DLQ. Applications have no way of setting additional headers or updating the message itself in those scenarios. When such messages are routed from the DLQ back to their normal queue and redelivered, applications cannot determine how many times the messages have been retried, so they are stuck in an infinite loop.
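For context, at the plain Spring AMQP level (this hook is not necessarily exposed through the Cloud Stream binder, so treat it as a sketch), RepublishMessageRecoverer does let an application stamp its own headers before the message goes to the DLX. The "app-retry-count" header name here is ours:

```kotlin
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer

// Counts attempts in a custom "app-retry-count" header (name is ours) each
// time the recoverer republishes a failed message to the DLX.
class CountingRecoverer(
    template: RabbitTemplate,
    errorExchange: String,
    errorRoutingKey: String
) : RepublishMessageRecoverer(template, errorExchange, errorRoutingKey) {

    override fun additionalHeaders(message: Message, cause: Throwable): Map<String, Any> {
        // Non-"x-" client headers survive dead-lettering, unlike x-death since 3.13.
        val previous = message.messageProperties.headers["app-retry-count"] as? Int ?: 0
        return mapOf("app-retry-count" to previous + 1)
    }
}
```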
Yeah... Sorry, that was the wrong sample code. All I read about this... @ansd,
Exactly, that's why we can close this issue. @samragu @dvovney, as I wrote above: if you think something doesn't work as expected, provide a GitHub repo with reproduction steps and open an issue in https://github.com/rabbitmq/rabbitmq-server/issues
@ansd, here is a sample to confirm that we have a problem with the x-death count: the spring-cloud-stream-rabbit-dlq project in https://github.com/artembilan/sandbox. When I change the Docker image to RabbitMQ 3.12, the test passes. Feel free to reach me offline to discuss details. Thanks
Thanks for the repro steps @artembilan. Just for the non-Java people, this is how to run this code: I added

...

to https://github.com/artembilan/sandbox/blob/master/spring-cloud-stream-rabbit-dlq/src/test/resources/application.properties. Thereafter, the test can be executed via ...

I could repro that the test case succeeds in 3.12 and fails in 3.13. After adding some debug statements to the RabbitMQ server code, I could see that the Java client indeed publishes messages with the x-death header. In other words, the following statement is not correct: ...
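For orientation, a typical Rabbit binder DLQ-retry setup in application.properties looks roughly like the following sketch; the binding name and values are illustrative, and the property names come from the Rabbit binder documentation (not necessarily what the sample repo uses):

```properties
# Provision a DLQ alongside the main queue (binding name is illustrative).
spring.cloud.stream.rabbit.bindings.consume-in-0.consumer.auto-bind-dlq=true
# Re-publish failures (with x-exception-stacktrace) instead of rejecting them.
spring.cloud.stream.rabbit.bindings.consume-in-0.consumer.republish-to-dlq=true
# Expire DLQ'd messages and dead-letter them back to the main queue for a retry.
spring.cloud.stream.rabbit.bindings.consume-in-0.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.consume-in-0.consumer.dlq-dead-letter-exchange=
```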
I see. Thanks. So, I was wrong: that x-death header is indeed re-published from the client. So, that's the one which has to be fixed for the mentioned RabbitMQ server issue. Another thought: how about we manually increment that x-death count ourselves when re-publishing?
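For concreteness, the manual-increment idea would look something like this sketch; as the next comment explains, it does not work on RabbitMQ 3.13+, which ignores client-set x-* headers:

```kotlin
import org.springframework.amqp.core.Message

// Sketch: bump the "count" field in the first x-death entry before
// re-publishing. The broker no longer honors this since 3.13.
fun bumpXDeathCount(message: Message) {
    @Suppress("UNCHECKED_CAST")
    val xDeath = message.messageProperties.headers["x-death"]
            as? MutableList<MutableMap<String, Any>> ?: return
    xDeath.firstOrNull()?.let { entry ->
        entry["count"] = (entry["count"] as? Long ?: 0L) + 1
    }
}
```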
I don't think that's possible, since the broker no longer interprets client-set x-* headers. So, a custom header would be needed instead.
... alternatively, the Java client could consume the message from the DLQ and re-publish it to the original queue itself, maintaining its own counter.
Thanks for confirmation, @ansd! So, here is a workaround for the current Spring Cloud Stream version: ...

And it is going to work exactly as you explained, and as I expect, with native Spring AMQP logic. We may fix this issue with the respective docs.
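A minimal sketch of such a consume-from-DLQ-and-re-publish workaround, assuming a main queue events, a DLQ events.dlq, and a hypothetical retry-count header (all names ours):

```kotlin
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.stereotype.Component

@Component
class DlqRetryForwarder(private val rabbitTemplate: RabbitTemplate) {

    // Consume from the DLQ instead of relying on a DLQ-TTL round-trip.
    @RabbitListener(queues = ["events.dlq"])
    fun requeue(failed: Message) {
        val headers = failed.messageProperties.headers
        // Track attempts in a plain header: the broker preserves non-"x-"
        // client headers across dead-lettering, unlike x-death since 3.13.
        val attempts = (headers["retry-count"] as? Int ?: 0) + 1
        if (attempts > MAX_ATTEMPTS) {
            return // give up: returning acks the message and drops it
        }
        headers["retry-count"] = attempts
        // Re-publish to the original queue via the default exchange.
        rabbitTemplate.send("", "events", failed)
    }

    companion object {
        const val MAX_ATTEMPTS = 3
    }
}
```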
Well, looking into this one more time, and given the fact that the current ...
…ver retries

Fixes: spring-cloud#2939

RabbitMQ 4.0 does not deal with client-side `x-*` headers. Therefore, `x-death.count` is not incremented anymore when a message is re-published from the client back to the broker.

* Spring AMQP 3.2 has introduced an `AmqpHeaders.RETRY_COUNT` custom header. Use `messageProperties.incrementRetryCount()` in the `RabbitMessageChannelBinder` when we re-publish a message back to the broker for server-side retries
* Fix docs respectively
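On the consumer side, with that change in place, the attempt count would come from the retry-count header instead of x-death. A sketch, assuming Spring AMQP 3.2+ and that the header is mapped into the Message headers (the helper function is ours):

```kotlin
import org.springframework.amqp.support.AmqpHeaders
import org.springframework.messaging.Message

// Sketch: derive the current attempt from the framework-maintained retry
// count rather than from x-death. The header is absent on the first delivery.
fun currentAttempt(message: Message<out Any>): Long =
    (message.headers[AmqpHeaders.RETRY_COUNT] as? Long ?: 0L) + 1
```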
RabbitMQ sets an x-death message header with a count parameter that is incremented each time a message is delivered to the DLQ. Messages from the DLQ go back to the normal queue after the TTL expires, and we relied on the count parameter to retry a message only so many times. Starting with RabbitMQ 3.13, RabbitMQ has discontinued support for this behavior, treating all x-* headers as server-owned headers. See rabbitmq/rabbitmq-server#10709 for details. Can SCSt set a similar header and increment its value each time the message is delivered to the DLQ? The exception stack trace is already being set on these messages, which is very useful for troubleshooting.
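For reference, the x-death header that the sample code earlier in the thread parses is a list of entries shaped roughly like this (field names per the RabbitMQ dead-lettering docs; values made up):

```kotlin
// Illustration only: the approximate shape of one x-death entry.
val xDeathEntry: Map<String, Any> = mapOf(
    "count" to 3L,                      // times dead-lettered this way (pre-3.13 semantics)
    "reason" to "expired",              // rejected | expired | maxlen | delivery_limit
    "queue" to "events",                // queue the message was dead-lettered from
    "exchange" to "",                   // exchange it was originally published to
    "routing-keys" to listOf("events")
)
```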