Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk_indexer: track all response status codes #177

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,21 +348,30 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
}

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
a.addCount(int64(n),
nil,
a.addCount(int64(n), nil,
a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", "Timeout")),
)
}

var errTooMany errorTooManyRequests
// 429 may be returned as errors from the bulk indexer.
if errors.As(err, &errTooMany) {
a.addCount(int64(n),
&a.tooManyRequests,
a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", "TooMany")),
)
// Bulk indexing may fail with different status codes.
var errFailed errorFlushFailed
if errors.As(err, &errFailed) {
var legacy *int64
var status string
switch {
case errFailed.tooMany:
legacy, status = &a.tooManyRequests, "TooMany"
case errFailed.clientError:
legacy, status = &a.docsFailedClient, "FailedClient"
case errFailed.serverError:
legacy, status = &a.docsFailedServer, "FailedServer"
}
if status != "" {
a.addCount(int64(n), legacy, a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", status)),
)
}
}
return err
}
Expand Down Expand Up @@ -408,35 +417,28 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
}
if resp.RetriedDocs > 0 {
// docs are scheduled to be retried but not yet failed due to retry limit
a.addCount(resp.RetriedDocs,
nil,
a.metrics.docsRetried,
)
a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried)
}
if docsIndexed > 0 {
a.addCount(docsIndexed,
&a.docsIndexed,
a.addCount(docsIndexed, &a.docsIndexed,
a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", "Success")),
)
}
if tooManyRequests > 0 {
a.addCount(tooManyRequests,
&a.tooManyRequests,
a.addCount(tooManyRequests, &a.tooManyRequests,
a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", "TooMany")),
)
}
if clientFailed > 0 {
a.addCount(clientFailed,
&a.docsFailedClient,
a.addCount(clientFailed, &a.docsFailedClient,
a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", "FailedClient")),
)
}
if serverFailed > 0 {
a.addCount(serverFailed,
&a.docsFailedServer,
a.addCount(serverFailed, &a.docsFailedServer,
a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", "FailedServer")),
)
Expand Down
120 changes: 62 additions & 58 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,67 +671,71 @@ func TestAppenderFlushBytes(t *testing.T) {
}
}

func TestAppenderServerError(t *testing.T) {
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
bytesTotal += r.ContentLength
_, _, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal += stat.UncompressedBytes
w.WriteHeader(http.StatusInternalServerError)
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})
require.NoError(t, err)
defer indexer.Close(context.Background())

addMinimalDoc(t, indexer, "logs-foo-testing")
func TestAppenderFlushRequestError(t *testing.T) {
// This test ensures that the appender correctly categorizes and quantifies
// failed requests with different failure scenarios. Since a bulk request
// contains N documents, the appender should increment the categorized
// failure by the same number of documents in the request.
test := func(t *testing.T, sc int, errMsg string) {
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
bytesTotal += r.ContentLength
_, _, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal += stat.UncompressedBytes
w.WriteHeader(sc)
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})
require.NoError(t, err)
defer indexer.Close(context.Background())

// Closing the indexer flushes enqueued documents.
err = indexer.Close(context.Background())
require.EqualError(t, err, "flush failed: [500 Internal Server Error] ")
stats := indexer.Stats()
assert.Equal(t, docappender.Stats{
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}, stats)
}
// Send 3 docs, ensure that metrics are always in the unit of documents, not requests.
docs := 3
for i := 0; i < docs; i++ {
addMinimalDoc(t, indexer, "logs-foo-testing")
}

func TestAppenderServerErrorTooManyRequests(t *testing.T) {
var bytesTotal int64
var bytesUncompressedTotal int64
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
// Set the r.ContentLength rather than sum it since 429s will be
// retried by the go-elasticsearch transport.
bytesTotal = r.ContentLength
_, _, stat := docappendertest.DecodeBulkRequestWithStats(r)
bytesUncompressedTotal = stat.UncompressedBytes
w.WriteHeader(http.StatusTooManyRequests)
// Closing the indexer flushes enqueued documents.
err = indexer.Close(context.Background())
require.EqualError(t, err, errMsg)
stats := indexer.Stats()
wantStats := docappender.Stats{
Added: int64(docs),
Active: 0,
BulkRequests: 1, // Single bulk request
Failed: int64(docs),
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}
switch {
case sc == 429:
wantStats.TooManyRequests = int64(docs)
case sc >= 500:
wantStats.FailedServer = int64(docs)
case sc >= 400 && sc != 429:
wantStats.FailedClient = int64(docs)
}
assert.Equal(t, wantStats, stats)
}
t.Run("400", func(t *testing.T) {
test(t, http.StatusBadRequest, "flush failed (400): [400 Bad Request] ")
})
t.Run("403", func(t *testing.T) {
test(t, http.StatusForbidden, "flush failed (403): [403 Forbidden] ")
})
t.Run("429", func(t *testing.T) {
test(t, http.StatusTooManyRequests, "flush failed (429): [429 Too Many Requests] ")
})
t.Run("500", func(t *testing.T) {
test(t, http.StatusInternalServerError, "flush failed (500): [500 Internal Server Error] ")
})
t.Run("503", func(t *testing.T) {
test(t, http.StatusServiceUnavailable, "flush failed (503): [503 Service Unavailable] ")
})
t.Run("504", func(t *testing.T) {
test(t, http.StatusGatewayTimeout, "flush failed (504): [504 Gateway Timeout] ")
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})
require.NoError(t, err)
defer indexer.Close(context.Background())

addMinimalDoc(t, indexer, "logs-foo-testing")

// Closing the indexer flushes enqueued documents.
err = indexer.Close(context.Background())
require.EqualError(t, err, "flush failed: [429 Too Many Requests] ")
stats := indexer.Stats()
assert.Equal(t, docappender.Stats{
Added: 1,
Active: 0,
BulkRequests: 1,
Failed: 1,
TooManyRequests: 1,
AvailableBulkRequests: 10,
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}, stats)
}

func TestAppenderIndexFailedLogging(t *testing.T) {
Expand Down
27 changes: 20 additions & 7 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,19 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
b.bytesUncompFlushed = bytesUncompFlushed
var resp BulkIndexerResponseStat
if res.IsError() {
if res.StatusCode == http.StatusTooManyRequests {
return resp, errorTooManyRequests{res: res}
e := errorFlushFailed{resp: res.String(), statusCode: res.StatusCode}
if res.StatusCode >= 400 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The go-elasticsearch client treats StatusCode > 299 as an error here. I'm not sure how relevant handling of 3xx responses is, but right now any 3xx status will fall through this branch without being categorized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we have the switch case here as well? I find it more readable than nested if. Here we'll get the added benefit of fewer return statements.

if res.StatusCode == http.StatusTooManyRequests {
e.tooMany = true
return resp, e
}
if res.StatusCode >= 500 {
e.serverError = true
return resp, e
}
e.clientError = true
}
return resp, fmt.Errorf("flush failed: %s", res.String())
return resp, e
}

if err := jsoniter.NewDecoder(res.Body).Decode(&resp); err != nil {
Expand Down Expand Up @@ -530,10 +539,14 @@ func indexnth(s []byte, nth int, sep rune) int {
})
}

type errorTooManyRequests struct {
res *esapi.Response
type errorFlushFailed struct {
resp string
statusCode int
tooMany bool
clientError bool
serverError bool
Comment on lines +539 to +544

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍🏼

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one other option would be to have a typed error for each case, and then wrapping them into an error returned on the specific branch, but we can always change that later

}

func (e errorTooManyRequests) Error() string {
return fmt.Sprintf("flush failed: %s", e.res.String())
// Error implements the error interface, reporting the HTTP status code
// of the failed flush followed by the raw Elasticsearch response body.
func (e errorFlushFailed) Error() string {
	prefix := fmt.Sprintf("flush failed (%d): ", e.statusCode)
	return prefix + e.resp
}