[libbeat] Fix Kafka output "circuit breaker is open" errors #23484

Merged · 6 commits · Jan 15, 2021
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -230,6 +230,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `nested` subfield handling in generated Elasticsearch templates. {issue}23178[23178] {pull}23183[23183]
- Fix CPU usage metrics on VMs with dynamic CPU config {pull}23154[23154]
- Fix panic due to unhandled DeletedFinalStateUnknown in k8s OnDelete {pull}23419[23419]
- Fix error loop with runaway CPU use when the Kafka output encounters some connection errors {pull}23484[23484]

*Auditbeat*

1 change: 1 addition & 0 deletions go.mod
@@ -57,6 +57,7 @@ require (
github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/dustin/go-humanize v1.0.0
github.com/eapache/go-resiliency v1.2.0
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
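The new go-resiliency dependency is pulled in only for its breaker package, whose ErrBreakerOpen sentinel the error worker below matches on. As a rough, self-contained sketch of how that circuit breaker behaves once it trips (not Beats or Sarama code; the thresholds here are illustrative, though the PR comments note Sarama hard-codes a 10-second window):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/eapache/go-resiliency/breaker"
)

func main() {
	// Trip after 3 consecutive errors, close again after 1 success, and stay
	// open for 10 seconds once tripped (illustrative values only).
	b := breaker.New(3, 1, 10*time.Second)

	brokerDown := errors.New("broker unreachable")
	for i := 0; i < 5; i++ {
		err := b.Run(func() error { return brokerDown })
		if err == breaker.ErrBreakerOpen {
			// Once the breaker is open, Run rejects work immediately without
			// invoking the callback at all; this is the state the Kafka
			// output keeps seeing as "circuit breaker is open" errors.
			fmt.Println("attempt", i, "rejected: circuit breaker is open")
			continue
		}
		fmt.Println("attempt", i, "failed:", err)
	}
}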
96 changes: 95 additions & 1 deletion libbeat/outputs/kafka/client.go
@@ -24,8 +24,10 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
"github.com/eapache/go-resiliency/breaker"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/transport"
@@ -47,6 +49,7 @@ type client struct {
codec codec.Codec
config sarama.Config
mux sync.Mutex
done chan struct{}

producer sarama.AsyncProducer

@@ -85,6 +88,7 @@ func newKafkaClient(
index: strings.ToLower(index),
codec: writer,
config: *cfg,
done: make(chan struct{}),
}
return c, nil
}
@@ -121,6 +125,7 @@ func (c *client) Close() error {
return nil
}

close(c.done)
c.producer.AsyncClose()
c.wg.Wait()
c.producer = nil
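The new done channel gives Close a way to interrupt anything blocked inside the output's worker goroutines; it is used further down to cut the circuit-breaker sleep short. A generic sketch of that shutdown pattern, with hypothetical names rather than the actual client fields:

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for the Kafka client, not Beats code.
type worker struct {
	done chan struct{}
	wg   sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{done: make(chan struct{})}
	w.wg.Add(1)
	go w.loop()
	return w
}

func (w *worker) loop() {
	defer w.wg.Done()
	for {
		select {
		case <-time.After(10 * time.Second):
			// Long wait (or work) happens here.
		case <-w.done:
			// Close was called; bail out instead of finishing the wait.
			return
		}
	}
}

func (w *worker) Close() {
	close(w.done) // wakes every goroutine selecting on done
	w.wg.Wait()   // then wait for them to exit
}

func main() {
	w := newWorker()
	time.Sleep(time.Second)
	w.Close() // returns promptly even though loop is mid-wait
	fmt.Println("closed cleanly")
}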
@@ -237,12 +242,92 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
}

func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
breakerOpen := false
defer c.wg.Done()
defer c.log.Debug("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg.Metadata.(*message)
msg.ref.fail(msg, errMsg.Err)

if errMsg.Err == breaker.ErrBreakerOpen {
// ErrBreakerOpen is a very special case in Sarama. It happens only when
// there have been repeated critical (broker / topic-level) errors, and it
// puts Sarama into a state where it immediately rejects all input
// for 10 seconds, ignoring retry / backoff settings.
// With this output's current design (in which Publish passes through to
// Sarama's input channel with no further synchronization), retrying
// these failed values causes an infinite retry loop that degrades
// the entire system.
// "Nice" approaches and why we haven't used them:
// - Use exposed API to navigate this state and its effect on retries.
// * Unfortunately, Sarama's circuit breaker and its errors are
// hard-coded and undocumented. We'd like to address this in the
// future.
// - If a batch fails with a circuit breaker error, delay before
// retrying it.
// * This would fix the most urgent performance issues, but requires
// extra bookkeeping because the Kafka output handles each batch
// independently. It results in potentially many batches / 10s of
// thousands of events being loaded and attempted, even though we
// know there's a fatal error early in the first batch. It also
// makes it hard to know when each batch should be retried.
// - In the Kafka Publish method, add a blocking first-pass intake step
// that can gate on error conditions, rather than handing off data
// to Sarama immediately.
// * This would fix the issue but would require a lot of work and
// testing, and we need a fix for the release now. It's also a
// fairly elaborate workaround for something that might be
// easier to fix in the library itself.
//
// Instead, we have applied the following fix, which is not very "nice"
// but satisfies all other important constraints:
// - When we receive a circuit breaker error, sleep for 10 seconds
// (Sarama's hard-coded timeout) on the _error worker thread_.
//
// This works because connection-level errors that can trigger the
// circuit breaker are on the critical path for input processing, and
// thus blocking on the error channel applies back-pressure to the
// input channel. This means that if there are any more errors while the
// error worker is asleep, any call to Publish will block until we
// start reading again.
//
// Reasons this solution is preferred:
// - It responds immediately to Sarama's global error state, rather than
// trying to detect it independently in each batch or adding more
// cumbersome synchronization to the output
// - It gives the minimal delay that is consistent with Sarama's
// internal behavior
// - It requires only a few lines of code and no design changes
//
// That said, this is still relying on undocumented library internals
// for correct behavior, which isn't ideal, but the error itself is an
// undocumented library internal, so this is de facto necessary for now.
// We'd like to have a more official / permanent fix merged into Sarama
// itself in the future.

// The "breakerOpen" flag keeps us from sleeping the first time we see
// a circuit breaker error, because it might be an old error still
// sitting in the channel from 10 seconds ago. So we only end up
// sleeping every _other_ reported breaker error.
if breakerOpen {

Review comment:

Can there be multiple breaker errors in the channel? If so, we might consider adding a timestamp in fail and ignoring errors based on the timestamp.

Contributor Author:

I considered that, but no, the channel is synchronous, and there's no way to tell when the error was "really" sent except to read from it continually. The timestamp of a breaker error would need to be tracked in the client struct itself rather than, say, msgRef, which would require more synchronization when updating it, since it's shared between all batches. And if we didn't want to block the error channel anyway at the end of a batch, we'd need to spawn a new goroutine to dispatch delayed retries, and in the meantime there would be nothing stopping Publish from continuing to spam new data / errors...

The upshot is, we can't just ignore the errors (because reading them at all is what enables the infinite loop), and adding a timestamp doesn't help much since, invariably, the only thing we ever want to do is wait precisely 10 seconds before allowing the next input, starting as soon as we become aware of the error. I tried timestamp-based approaches while diagnosing this, and ended up with worse performance and more complicated code.

// Immediately log the error that presumably caused this state,
// since the error reporting on this batch will be delayed.
if msg.ref.err != nil {
c.log.Errorf("Kafka (topic=%v): %v", msg.topic, msg.ref.err)
}
select {
case <-time.After(10 * time.Second):

@kvch (Contributor), Jan 13, 2021:

Isn't using time.After going to cause memory issues? According to its documentation:

The underlying Timer is not recovered by the garbage collector until the timer fires. If efficiency is a concern, use NewTimer instead and call Timer.Stop if the timer is no longer needed.

I assume no, but I am wondering if you have considered this.

Contributor Author:

That isn't a concern here since this can happen at most once globally every 10sec, after multiple other failures, and heap-allocating a timer at that granularity is negligible.

// Sarama's circuit breaker is hard-coded to reject all inputs
// for 10sec.
case <-msg.ref.client.done:
// Allow early bailout if the output itself is closing.
}
breakerOpen = false
} else {
breakerOpen = true
}
}
}
}
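On the time.After question raised in the review thread above: a NewTimer-based variant of the same wait might look roughly like the hypothetical helper below. It is not part of the PR; the merged code keeps time.After because, as noted in the reply, this path runs at most once per ten seconds, so the abandoned-timer cost is negligible.

package main

import (
	"fmt"
	"time"
)

// waitForBreakerWindow blocks for the given window unless done is closed
// first, using NewTimer so an early exit releases the timer promptly.
// Hypothetical helper for illustration only.
func waitForBreakerWindow(window time.Duration, done <-chan struct{}) {
	timer := time.NewTimer(window)
	defer timer.Stop() // harmless if the timer has already fired
	select {
	case <-timer.C:
		// Window elapsed; the caller resumes draining the error channel.
	case <-done:
		// Output is closing; bail out early.
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(500 * time.Millisecond)
		close(done) // simulate Close() being called mid-wait
	}()

	start := time.Now()
	waitForBreakerWindow(10*time.Second, done)
	fmt.Println("wait ended after", time.Since(start).Round(100*time.Millisecond))
}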

@@ -262,9 +347,18 @@ func (r *msgRef) fail(msg *message, err error) {
len(msg.key)+len(msg.value))
r.client.observer.Dropped(1)

case breaker.ErrBreakerOpen:
// Add this message to the failed list, but don't overwrite r.err since
// all the breaker error means is "there were a lot of other errors".
r.failed = append(r.failed, msg.data)

default:
r.failed = append(r.failed, msg.data)
if r.err == nil {
// Don't overwrite an existing error. This way, at the end of the batch,
// we report the first error that we saw rather than the last one.
r.err = err
}
}
r.dec()
}
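A stripped-down illustration of the bookkeeping fail now does (hypothetical types; the real msgRef also tracks counters, the observer, and per-message metadata): breaker errors re-queue the message for retry but never become the batch's reported error, and among real errors only the first one seen is kept.

package main

import (
	"errors"
	"fmt"

	"github.com/eapache/go-resiliency/breaker"
)

// batchRef is a hypothetical stand-in for msgRef.
type batchRef struct {
	failed []string // payloads to retry
	err    error    // first real error seen in the batch
}

func (r *batchRef) fail(payload string, err error) {
	switch err {
	case breaker.ErrBreakerOpen:
		// The breaker error only means "there were a lot of other errors";
		// retry the message but don't let it mask the underlying cause.
		r.failed = append(r.failed, payload)
	default:
		r.failed = append(r.failed, payload)
		if r.err == nil {
			r.err = err // first error wins; later ones are not reported
		}
	}
}

func main() {
	r := &batchRef{}
	r.fail("event-1", errors.New("leader not available"))
	r.fail("event-2", breaker.ErrBreakerOpen)
	r.fail("event-3", errors.New("request timed out"))
	fmt.Printf("retrying %d events, reported error: %v\n", len(r.failed), r.err)
	// Output: retrying 3 events, reported error: leader not available
}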
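Finally, to make the back-pressure argument from the errorWorker comment concrete: the channels involved are synchronous, so while the error worker sleeps, whatever feeds the pipeline eventually blocks on a send. A simplified standalone sketch of that mechanism, using plain Go channels rather than Sarama's actual plumbing:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int) // unbuffered, i.e. synchronous

	// The "worker": reads one value, then sleeps, mimicking the 10s wait.
	go func() {
		for v := range ch {
			fmt.Println("worker consumed", v)
			time.Sleep(2 * time.Second)
		}
	}()

	// The "producer": each send blocks until the worker is reading again,
	// which is how the sleeping error worker throttles calls to Publish.
	for i := 0; i < 3; i++ {
		start := time.Now()
		ch <- i
		fmt.Printf("send %d unblocked after %v\n", i, time.Since(start).Round(time.Second))
	}
	close(ch)
}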