fix(blooms): Do not restart builders when planner disconnects (backport k227) #14922

Merged 1 commit on Nov 14, 2024
79 changes: 62 additions & 17 deletions pkg/bloombuild/builder/builder.go
@@ -3,7 +3,9 @@ package builder
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"time"

@@ -136,16 +138,39 @@ func (b *Builder) running(ctx context.Context) error {
retries := backoff.New(ctx, b.cfg.BackoffConfig)
for retries.Ongoing() {
err := b.connectAndBuild(ctx)
- if err == nil || errors.Is(err, context.Canceled) {
- break
if err != nil {
err := standardizeRPCError(err)

// When the builder is shutting down, we will get a context canceled error
if errors.Is(err, context.Canceled) && b.State() != services.Running {
level.Debug(b.logger).Log("msg", "builder is shutting down")
break
}

// If the planner disconnects while we are sending/receiving a message, we get an EOF error.
// In this case we should reset the backoff and retry
if errors.Is(err, io.EOF) {
level.Error(b.logger).Log("msg", "planner disconnected. Resetting backoff and retrying", "err", err)
retries.Reset()
continue
}

// Otherwise (e.g. failed to connect to the planner), we should retry
code := status.Code(err)
level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "retry", retries.NumRetries(), "maxRetries", b.cfg.BackoffConfig.MaxRetries, "code", code.String(), "err", err)
retries.Wait()
continue
}

level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err)
retries.Wait()
// We shouldn't get here. If we do, we better restart the builder.
// Adding a log line for visibility
level.Error(b.logger).Log("msg", "unexpected end of connectAndBuild. Restarting builder")
break
}

if err := retries.Err(); err != nil {
if errors.Is(err, context.Canceled) {
// Edge case when the builder is shutting down while we check for retries
return nil
}
return fmt.Errorf("failed to connect and build: %w", err)
@@ -154,6 +179,31 @@ func (b *Builder) running(ctx context.Context) error {
return nil
}

// standardizeRPCError converts some gRPC errors we want to handle differently to standard errors.
// 1. codes.Canceled -> context.Canceled
// 2. codes.Unavailable with EOF -> io.EOF
// 3. All other errors are returned as is.
func standardizeRPCError(err error) error {
if err == nil {
return nil
}

if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Canceled:
// Happens when the builder is shutting down, and we are sending/receiving a message
return context.Canceled
case codes.Unavailable:
// We want to handle this case as a retryable error that resets the backoff
if i := strings.LastIndex(st.Message(), "EOF"); i != -1 {
return io.EOF
}
}
}

return err
}

func (b *Builder) plannerAddress() string {
if b.ringWatcher == nil {
return b.cfg.PlannerAddress
@@ -173,8 +223,7 @@ func (b *Builder) connectAndBuild(ctx context.Context) error {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

- // nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
- conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
conn, err := grpc.NewClient(b.plannerAddress(), opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}
@@ -202,21 +251,17 @@ func (b *Builder) connectAndBuild(ctx context.Context) error {
}

func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
ctx := c.Context()

// Send ready message to planner
if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil {
return fmt.Errorf("failed to send ready message to planner: %w", err)
}

- for b.State() == services.Running {
// When the planner connection closes, an EOF or "planner shutting down" error is returned.
// When the builder is shutting down, a gRPC context canceled error is returned.
// Will break when planner<->builder connection is closed or when the builder is shutting down.
for ctx.Err() == nil {
protoTask, err := c.Recv()
if err != nil {
- if status.Code(err) == codes.Canceled {
- level.Debug(b.logger).Log("msg", "builder loop context canceled")
- return nil
- }

return fmt.Errorf("failed to receive task from planner: %w", err)
}

@@ -235,7 +280,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
continue
}

- newMetas, err := b.processTask(c.Context(), task)
newMetas, err := b.processTask(ctx, task)
if err != nil {
err = fmt.Errorf("failed to process task: %w", err)
}
@@ -249,8 +294,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
b.metrics.processingTask.Set(0)
}

level.Debug(b.logger).Log("msg", "builder loop stopped")
return nil
level.Debug(b.logger).Log("msg", "builder loop stopped", "ctx_err", ctx.Err())
return ctx.Err()
}

func (b *Builder) logTaskCompleted(
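To make the new error handling concrete, below is a minimal, self-contained sketch of a test exercising the standardizeRPCError mapping introduced in this diff. It is not part of the PR; the test name, the messages passed to status.Error, and the assumption that it sits in a _test.go file inside the builder package are all illustrative.

package builder

import (
	"context"
	"errors"
	"io"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Sketch of a test for the three conversions documented on standardizeRPCError:
// codes.Canceled -> context.Canceled, codes.Unavailable with an EOF message -> io.EOF,
// and every other error returned unchanged.
func TestStandardizeRPCError_Sketch(t *testing.T) {
	// A gRPC Canceled status (builder shutting down mid send/receive) is normalized
	// to the standard context.Canceled error.
	if got := standardizeRPCError(status.Error(codes.Canceled, "context canceled")); !errors.Is(got, context.Canceled) {
		t.Fatalf("expected context.Canceled, got %v", got)
	}

	// An Unavailable status whose message ends in EOF (planner went away mid-stream)
	// is normalized to io.EOF, which running() treats as "reset backoff and retry".
	if got := standardizeRPCError(status.Error(codes.Unavailable, "error reading from server: EOF")); !errors.Is(got, io.EOF) {
		t.Fatalf("expected io.EOF, got %v", got)
	}

	// Any other error passes through untouched.
	plain := errors.New("failed to dial bloom planner")
	if got := standardizeRPCError(plain); !errors.Is(got, plain) {
		t.Fatalf("expected the original error back, got %v", got)
	}

	// nil stays nil.
	if got := standardizeRPCError(nil); got != nil {
		t.Fatalf("expected nil, got %v", got)
	}
}

Mapping gRPC statuses onto the standard sentinel errors keeps the backoff logic in running() expressed with plain errors.Is checks, so the retry loop stays free of gRPC-specific status handling.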