[libbeat] Fix Kafka output "circuit breaker is open" errors #23484
Conversation
Pinging @elastic/integrations (Team:Integrations)
💚 Build Succeeded
💚 Flaky test report: Tests succeeded.
// a circuit breaker error, because it might be an old error still
// sitting in the channel from 10 seconds ago. So we only end up
// sleeping every _other_ reported breaker error.
if breakerOpen {
Can there be multiple breaker errors in the channel? If so, we might consider adding a timestamp in fail and ignoring errors based on the timestamp.
I considered that, but no, the channel is synchronous, and there's no way to tell when the error was "really" sent except to read from it continually. The timestamp of a breaker error would need to be tracked in the client struct itself rather than, say, msgRef, which would require more synchronization when updating it, since it's shared between all batches. And if we didn't want to block the error channel anyway at the end of a batch, we'd need to spawn a new goroutine to dispatch delayed retries, and in the meantime there would be nothing stopping Publish from continuing to spam new data / errors...
The upshot is, we can't just ignore the errors (because reading them at all is what enables the infinite loop), and adding a timestamp doesn't help much since, invariably, the only thing we ever want to do is wait precisely 10 seconds before allowing the next input, starting as soon as we become aware of the error. I tried timestamp-based approaches while diagnosing this, and ended up with worse performance and more complicated code.
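For context, here is a simplified sketch (not the exact libbeat code) of the pattern under discussion: the error loop toggles a flag so it only sleeps on every other breaker error it reads, since the first one it sees may be stale. `errChan` and `isBreakerError` are hypothetical stand-ins.

```go
import "time"

// drainErrors sketches the back-pressure pattern: errChan stands in for the
// producer's error feedback channel, isBreakerError for a check against
// Sarama's "circuit breaker is open" error.
func drainErrors(errChan <-chan error, isBreakerError func(error) bool) {
	breakerOpen := false
	for err := range errChan {
		if isBreakerError(err) {
			if breakerOpen {
				// Second consecutive breaker error: Sarama is failing fast
				// right now, so block this goroutine to apply back pressure
				// before reading (and thereby triggering) more errors.
				time.Sleep(10 * time.Second)
				breakerOpen = false
			} else {
				// Might be stale from a previous breaker window: remember it,
				// but don't sleep yet.
				breakerOpen = true
			}
		} else {
			breakerOpen = false
		}
	}
}
```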
libbeat/outputs/kafka/client.go
Outdated
if msg.ref.err != nil {
    c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err)
}
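// Note: sleeping here blocks the error-reporting goroutine, which is what
// applies back pressure to Sarama (see the PR description below).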
time.Sleep(10 * time.Second)
In case sleeps accumulate, we would like some mechanism to return early if the input is stopped. Right now we only return once the error channel is closed.
e.g. https://github.com/elastic/go-concert/blob/master/timed/timed.go#L39
done
c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err)
}
select {
case <-time.After(10 * time.Second):
Isn't using time.After going to cause memory issues? According to its documentation:
The underlying Timer is not recovered by the garbage collector until the timer fires. If efficiency is a concern, use NewTimer instead and call Timer.Stop if the timer is no longer needed.
I assume no, but I am wondering if you have considered this.
That isn't a concern here since this can happen at most once globally every 10sec, after multiple other failures, and heap-allocating a timer at that granularity is negligible.
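For reference, a minimal sketch of the NewTimer alternative the quoted documentation describes, in case early reclamation ever mattered; the names here are illustrative, not libbeat's actual code:

```go
import "time"

// waitOrCancel blocks for d, or returns early once done is closed.
func waitOrCancel(d time.Duration, done <-chan struct{}) {
	timer := time.NewTimer(d)
	defer timer.Stop() // reclaims the timer if done fires before it does
	select {
	case <-timer.C: // backoff elapsed
	case <-done: // caller is shutting down
	}
}
```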
Pinging @elastic/agent (Team:Agent)
This change will make our lives much easier. Thank you so much for working on it!
@faec I just installed Filebeat 7.11.2 (I need a couple of months before upgrading to 8.x) to avoid this issue. However, it seems that while Kafka (or the target topic) isn't available, each new log entry still triggers a warning, and all those events are skipped once Kafka (or the target topic) becomes available again. P.S.: I followed up in this issue because the Elastic community doesn't answer any Kafka-related questions, nor did I get an answer on GitHub to my original issue until I found this one. Edit: ACKs are set to "-1"
What does this PR do?
This is a near-fix for #22437. It doesn't strictly respect exponential backoff configuration during a connection error (since the whole nature of the bug is that Sarama in some contexts ignores exponential backoff configuration), but it brings our error reporting and backoff behavior in line with Sarama's and prevents the CPU explosion we were seeing on connection-level Sarama errors.
The particular approach of applying back pressure to Sarama is a little questionable: sleeping on the error reporting thread when we detect that Sarama's circuit breaker has gone off. Most of this PR is an extended comment explaining why that works and why I settled on that approach. Ideally in the future we can get Sarama's error handling behavior better defined / documented, so we can use a more official / supported API mechanism in a future release.
Checklist
- I have made corresponding changes to the documentation
- I have made corresponding changes to the default configuration files
- I have added tests that prove my fix is effective or that my feature works
- I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc

How to test this PR locally
Run Filebeat with any input and the following output configuration (and no Kafka server):
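(The configuration block was elided in this capture; what follows is a minimal sketch of a Kafka output pointing at a broker that isn't running, with hypothetical host and topic values.)

```yaml
output.kafka:
  # No broker is listening on this address; that's the point of the test.
  hosts: ["localhost:9092"]
  topic: "filebeat"  # hypothetical topic name
```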
Before this PR is applied, this produces an infinite loop of error messages, consuming as much CPU as the system allows. With the PR applied, this merely produces the following error at ~10s intervals:
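(The log line itself was also elided; this is a plausible reconstruction from the Errorf format in the diff above and the breaker error named in the PR title, with the timestamp and log prefix omitted and a hypothetical topic name.)

```
Kafka (topic=filebeat): circuit breaker is open
```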