Allow to log errors if "Retrying Complete Batches" happen #2395

Closed
marcin-waldowski-rewe-digital opened this issue Sep 16, 2022 · 5 comments · Fixed by #2405

@marcin-waldowski-rewe-digital

Expected Behavior

Allow DefaultErrorHandler (configured with FixedBackOff.UNLIMITED_ATTEMPTS) to be extended or configured so that transient errors can be logged when "Retrying Complete Batches" happens (see the spring-kafka documentation).

Current Behavior

Spring-kafka logs an error only when retries are exhausted. If an application has to retry forever on a transient error (for example, when an external application that is needed to process each message is unavailable), no errors are logged at all.
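To see why nothing is logged, here is a stdlib-only Java model (not spring-kafka code) of an error handler whose only logging happens in the recoverer, which runs after the back off is exhausted. `ExhaustionOnlyRetry` and its methods are hypothetical; `UNLIMITED_ATTEMPTS` mirrors the value of `FixedBackOff.UNLIMITED_ATTEMPTS` (`Long.MAX_VALUE`), and the back-off sleep is omitted to keep the sketch fast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: failures are reported only through the recoverer,
// i.e. only once the retries are exhausted.
final class ExhaustionOnlyRetry {

    // Same value as Spring's FixedBackOff.UNLIMITED_ATTEMPTS.
    static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;

    /**
     * Runs the task until it succeeds or maxRetries retries have been used.
     * Returns true if the task eventually succeeded.
     */
    static boolean run(Runnable task, long maxRetries, Consumer<Exception> recoverer) {
        long retries = 0;
        while (true) {
            try {
                task.run();
                return true;
            } catch (RuntimeException ex) {
                if (retries >= maxRetries) {
                    recoverer.accept(ex); // the only place where the failure is logged
                    return false;
                }
                retries++;
                // a real back off would sleep here before the next attempt
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        int[] calls = {0};
        // Transient failure: the "external system" is down for 5 attempts, then recovers.
        Runnable flaky = () -> {
            if (calls[0]++ < 5) {
                throw new IllegalStateException("external service unavailable");
            }
        };
        boolean ok = run(flaky, UNLIMITED_ATTEMPTS, ex -> log.add("ERROR " + ex.getMessage()));
        System.out.println(ok + " " + log.size()); // prints "true 0": five failures, no log line
    }
}
```

With a bounded retry count the recoverer eventually runs and logs; with unlimited attempts a transient outage is either survived silently or retried forever without a single log line.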

There are two workarounds for this problem:

  • catch, log and re-throw errors in the method annotated with @KafkaListener,
  • extend DefaultErrorHandler and log errors inside the overridden methods.

The second approach works perfectly, except when "Retrying Complete Batches" takes place: in that case, the DefaultErrorHandler methods are not called at all.

Context

This issue stops my team from upgrading from spring-kafka 2.7.* to 2.8.*.
I would prefer to use the second solution described above without also using the first one.
I did not find any alternative way to achieve the same goal.
The only workaround I found is to use the second method and, additionally, log non-BatchListenerFailedException errors in the method annotated with @KafkaListener.
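A minimal sketch of that workaround, modelled with the JDK only: the listener logs and rethrows every exception except a `BatchListenerFailedException`, because, as described above, only that exception takes the path that calls the overridden DefaultErrorHandler methods. The `BatchListenerFailedException` class below is a hypothetical stand-in for the spring-kafka class of the same name, and treating `IllegalArgumentException` as a per-record ("bad record") failure is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for spring-kafka's BatchListenerFailedException,
// which tells the error handler the index of the failed record.
class BatchListenerFailedException extends RuntimeException {
    final int index;

    BatchListenerFailedException(String message, Throwable cause, int index) {
        super(message, cause);
        this.index = index;
    }
}

// Sketch of the listener-side workaround: log only the errors that will
// lead to "Retrying Complete Batches", then rethrow so retries still happen.
final class LoggingBatchListener {
    private final Consumer<String> processor; // per-record business logic (assumed)
    final List<String> errorLog = new ArrayList<>();

    LoggingBatchListener(Consumer<String> processor) {
        this.processor = processor;
    }

    void processBatch(List<String> batch) {
        try {
            for (int i = 0; i < batch.size(); i++) {
                try {
                    processor.accept(batch.get(i));
                } catch (IllegalArgumentException badRecord) {
                    // Assumed "bad record": report its index instead of failing the whole batch.
                    throw new BatchListenerFailedException("bad record", badRecord, i);
                }
            }
        } catch (BatchListenerFailedException e) {
            throw e; // the error handler sees this one and can log it itself
        } catch (RuntimeException e) {
            errorLog.add("Transient error, complete batch will be retried: " + e);
            throw e; // rethrow so the container still retries the complete batch
        }
    }
}
```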

@garyrussell
Contributor

This issue stops my team from upgrading from spring-kafka 2.7.* to 2.8.*.

Are you saying it worked in 2.7.x?

@marcin-waldowski-rewe-digital
Author

marcin-waldowski-rewe-digital commented Sep 19, 2022

Yes, it worked in 2.7.x, but instead of extending DefaultErrorHandler we extended SeekToCurrentErrorHandler for record listeners and RecoveringBatchErrorHandler for batch listeners (DefaultErrorHandler did not exist in 2.7.x). Also, "Retrying Complete Batches" was introduced in 2.8.x (I cannot find that phrase in the 2.7.x documentation).

In 2.7.x we were able to achieve our goal, which is to log transient errors (other than BatchListenerFailedException) while retrying forever in a batch listener. In 2.8.x we found this possible only with the workaround described in my previous post.

@garyrussell
Contributor

Just for background...

The equivalent of the fallback behavior (when not throwing a BatchListenerFailedException) was provided by the RetryingBatchErrorHandler (this predated the RecoveringBatchErrorHandler). I don't think it logged either.

The RecoveringBatchErrorHandler logic is in the DefaultErrorHandler for batch listeners and it falls back to the FallbackBatchErrorHandler (which is the RetryingBatchErrorHandler, but renamed in 2.8/2.9).

The preferred mechanism is to throw the BatchListenerFailedException; otherwise you end up retrying previously successful records over and over again. Of course, if you are doing a batch DB insert, then that's exactly what you want.
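The cost of the fallback can be quantified with a small stdlib-only model (hypothetical names, not spring-kafka code). `completeBatchRetry` redelivers the whole batch on every failure, as the fallback does, while `retryFromFailedIndex` treats the records before the failed index as committed and redelivers from that index, which is roughly what throwing a `BatchListenerFailedException` with the record's index achieves. Recovery after exhausted retries is not modelled.

```java
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical model comparing the two batch retry paths; it counts how many
// times records are handed to the listener until the batch fully succeeds.
final class BatchRetryModel {

    /** Fallback path: every failure redelivers the complete batch. */
    static int completeBatchRetry(List<String> batch, BiPredicate<String, Integer> succeeds) {
        int deliveries = 0;
        for (int attempt = 1; ; attempt++) {
            boolean allOk = true;
            for (String record : batch) {
                deliveries++;
                if (!succeeds.test(record, attempt)) {
                    allOk = false;
                    break;
                }
            }
            if (allOk) {
                return deliveries;
            }
        }
    }

    /** Index path: records before the failed one are committed; redeliver from it. */
    static int retryFromFailedIndex(List<String> batch, BiPredicate<String, Integer> succeeds) {
        int deliveries = 0;
        int start = 0; // everything before 'start' counts as committed
        for (int attempt = 1; start < batch.size(); attempt++) {
            int i = start;
            while (i < batch.size()) {
                deliveries++;
                if (!succeeds.test(batch.get(i), attempt)) {
                    break;
                }
                i++;
            }
            start = i;
        }
        return deliveries;
    }
}
```

For a batch `a, b, c, d` where only `c` fails on its first two attempts, `completeBatchRetry` makes 10 deliveries and `retryFromFailedIndex` makes 6; the extra four are redeliveries of `a` and `b`, which had already succeeded. Complete-batch retry remains the right choice when the batch is processed atomically, such as a single batch DB insert.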

I have no issues providing a hook for you to log transient errors in this case, I just wanted to understand.

@marcin-waldowski-rewe-digital
Copy link
Author

marcin-waldowski-rewe-digital commented Sep 20, 2022

Thank you.

Sure! I guess you doubt that RecoveringBatchErrorHandler in 2.7.x can log errors in this case, so I tested it again with spring-kafka 2.7.12.

In our RetryBatchErrorHandler class, which extends RecoveringBatchErrorHandler, I overrode the handle method as follows:

@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {

    try {
        super.handle(thrownException, records, consumer, container);
    } catch (KafkaException ke) {

        boolean transientException = getClassifier().classify(ke);
        String exceptionTypeInfo = transientException ? "Transient error! " : "Message dropped! Non-transient error! ";
        String offsetInfo = createPartitionsOffsetsInfo(records);
        throw new KafkaException(exceptionTypeInfo + offsetInfo, thrownException);
    }
}

With this error handler I can catch the exception from throw new IllegalStateException("Boom!"); here is the stack trace:

18:22:03.302 [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Error handler threw an exception
org.springframework.kafka.KafkaException: Transient error! my_topic:0@94-94; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.mypackage.MyConsumer.processBatch(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>>)' threw exception; nested exception is java.lang.IllegalStateException: Boom!; nested exception is java.lang.IllegalStateException: Boom!
	at com.mypackage.exception.RetryBatchErrorHandler.handle(RetryBatchErrorHandler.java:102)
	at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2022)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1857)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1723)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.mypackage.MyConsumer.processBatch(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>>)' threw exception; nested exception is java.lang.IllegalStateException: Boom!; nested exception is java.lang.IllegalStateException: Boom!
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2383)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2015)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1985)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1928)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1840)
	... 8 common frames omitted
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1995)
Caused by: java.lang.IllegalStateException: Boom!
	at com.mypackage.MyConsumer.processBatch(MyConsumer.java:134)
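The `createPartitionsOffsetsInfo` helper used in the handler above is not shown in the issue. The following is a hypothetical, stdlib-only reconstruction that produces the `my_topic:0@94-94` format seen in the log (topic:partition@firstOffset-lastOffset, one entry per partition), together with a simplified cause-chain classifier in the spirit of `getClassifier().classify(...)`. The `Rec` type and the `NOT_RETRYABLE` set are assumptions; spring-kafka's default classifier uses its own list of non-retryable exceptions and its own traversal rules.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

// Hypothetical reconstruction of the log message "Transient error! my_topic:0@94-94".
final class ErrorMessageSketch {

    // Minimal stand-in for a consumer record: only its coordinates are needed.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Assumed non-retryable types, for illustration only.
    static final Set<Class<? extends Throwable>> NOT_RETRYABLE =
            Set.of(IllegalArgumentException.class, ClassCastException.class);

    /** Simplified classifier: false if any exception in the cause chain is non-retryable. */
    static boolean isTransient(Throwable ex) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> type : NOT_RETRYABLE) {
                if (type.isInstance(t)) {
                    return false;
                }
            }
        }
        return true;
    }

    /** "topic:partition@firstOffset-lastOffset" per partition, comma-separated. */
    static String partitionsOffsetsInfo(List<Rec> records) {
        Map<String, long[]> ranges = new LinkedHashMap<>(); // keeps first-seen partition order
        for (Rec r : records) {
            ranges.merge(r.topic + ":" + r.partition, new long[] {r.offset, r.offset},
                    (a, b) -> new long[] {Math.min(a[0], b[0]), Math.max(a[1], b[1])});
        }
        StringJoiner joiner = new StringJoiner(", ");
        ranges.forEach((tp, range) -> joiner.add(tp + "@" + range[0] + "-" + range[1]));
        return joiner.toString();
    }

    static String message(Throwable ex, List<Rec> records) {
        String prefix = isTransient(ex) ? "Transient error! " : "Message dropped! Non-transient error! ";
        return prefix + partitionsOffsetsInfo(records);
    }
}
```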

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Sep 21, 2022
Resolves spring-projects#2395

Previously, `RetryListener` only supported record listeners; add methods for
retrying batches.

**cherry-pick to 2.9.x, 2.8.x**
artembilan pushed a commit that referenced this issue Sep 21, 2022
Resolves #2395

Previously, `RetryListener` only supported record listeners; add methods for
retrying batches.

**cherry-pick to 2.9.x, 2.8.x**
artembilan pushed a commit that referenced this issue Sep 21, 2022
Resolves #2395

Previously, `RetryListener` only supported record listeners; add methods for
retrying batches.

**cherry-pick to 2.9.x, 2.8.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java
artembilan pushed a commit that referenced this issue Sep 21, 2022
Resolves #2395

Previously, `RetryListener` only supported record listeners; add methods for
retrying batches.

**cherry-pick to 2.9.x, 2.8.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java
@marcin-waldowski-rewe-digital
Copy link
Author

@garyrussell I built spring-kafka from the 2.8.x branch (5df2568) and managed to log the error using the new method in RetryListener. Thanks!
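The shape of the fix can be modelled without Kafka: a retry listener interface with a separate batch callback, and a complete-batch retrier that calls it after every failed attempt, even when it retries forever. All names below are hypothetical and stdlib-only. In the fixed releases, the corresponding spring-kafka hook is, as far as I know, the batch overload `RetryListener.failedDelivery(ConsumerRecords<?, ?>, Exception, int)`, registered through `DefaultErrorHandler.setRetryListeners(...)`; check the Javadoc of your exact version before relying on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical listener with record and batch callbacks; in this model both
// default to no-ops so implementations override only what they need.
interface BatchAwareRetryListener {
    default void failedDelivery(String record, Exception ex, int deliveryAttempt) { }

    default void failedDelivery(List<String> batch, Exception ex, int deliveryAttempt) { }
}

// Retries the complete batch until it succeeds, notifying listeners after each failure.
final class CompleteBatchRetrier {
    private final List<BatchAwareRetryListener> listeners = new ArrayList<>();

    void addListener(BatchAwareRetryListener listener) {
        listeners.add(listener);
    }

    void retryForever(List<String> batch, Consumer<List<String>> batchListener) {
        for (int attempt = 1; ; attempt++) {
            try {
                batchListener.accept(batch);
                return;
            } catch (RuntimeException ex) {
                // The hook that was missing before: called on every failed attempt,
                // not only after retries are exhausted.
                for (BatchAwareRetryListener l : listeners) {
                    l.failedDelivery(batch, ex, attempt);
                }
            }
        }
    }
}
```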
