From 766938525f3e94d3e94f8adf2f7230a38593de09 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 6 Nov 2024 14:57:33 +0100 Subject: [PATCH] fix(blooms): Do not restart builders when planner disconnects (#14783) --- pkg/bloombuild/builder/builder.go | 79 ++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 17 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 7f0ac4b65b4ce..1b0a81a247258 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -3,7 +3,9 @@ package builder import ( "context" "fmt" + "io" "os" + "strings" "sync" "time" @@ -136,16 +138,39 @@ func (b *Builder) running(ctx context.Context) error { retries := backoff.New(ctx, b.cfg.BackoffConfig) for retries.Ongoing() { err := b.connectAndBuild(ctx) - if err == nil || errors.Is(err, context.Canceled) { - break + if err != nil { + err := standardizeRPCError(err) + + // When the builder is shutting down, we will get a context canceled error + if errors.Is(err, context.Canceled) && b.State() != services.Running { + level.Debug(b.logger).Log("msg", "builder is shutting down") + break + } + + // If the planner disconnects while we are sending/receive a message we get an EOF error. + // In this case we should reset the backoff and retry + if errors.Is(err, io.EOF) { + level.Error(b.logger).Log("msg", "planner disconnected. Resetting backoff and retrying", "err", err) + retries.Reset() + continue + } + + // Otherwise (e.g. failed to connect to the builder), we should retry + code := status.Code(err) + level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "retry", retries.NumRetries(), "maxRetries", b.cfg.BackoffConfig.MaxRetries, "code", code.String(), "err", err) + retries.Wait() + continue } - level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err) - retries.Wait() + // We shouldn't get here. If we do, we better restart the builder. + // Adding a log line for visibility + level.Error(b.logger).Log("msg", "unexpected end of connectAndBuild. Restarting builder") + break } if err := retries.Err(); err != nil { if errors.Is(err, context.Canceled) { + // Edge case when the builder is shutting down while we check for retries return nil } return fmt.Errorf("failed to connect and build: %w", err) @@ -154,6 +179,31 @@ func (b *Builder) running(ctx context.Context) error { return nil } +// standardizeRPCError converts some gRPC errors we want to handle differently to standard errors. +// 1. codes.Canceled -> context.Canceled +// 2. codes.Unavailable with EOF -> io.EOF +// 3. All other errors are returned as is. +func standardizeRPCError(err error) error { + if err == nil { + return nil + } + + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.Canceled: + // Happens when the builder is shutting down, and we are sending/receiving a message + return context.Canceled + case codes.Unavailable: + // We want to handle this case as a retryable error that resets the backoff + if i := strings.LastIndex(st.Message(), "EOF"); i != -1 { + return io.EOF + } + } + } + + return err +} + func (b *Builder) plannerAddress() string { if b.ringWatcher == nil { return b.cfg.PlannerAddress @@ -173,8 +223,7 @@ func (b *Builder) connectAndBuild(ctx context.Context) error { return fmt.Errorf("failed to create grpc dial options: %w", err) } - // nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2. - conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...) + conn, err := grpc.NewClient(b.plannerAddress(), opts...) if err != nil { return fmt.Errorf("failed to dial bloom planner: %w", err) } @@ -202,21 +251,17 @@ func (b *Builder) connectAndBuild(ctx context.Context) error { } func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error { + ctx := c.Context() + // Send ready message to planner if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil { return fmt.Errorf("failed to send ready message to planner: %w", err) } - for b.State() == services.Running { - // When the planner connection closes, an EOF or "planner shutting down" error is returned. - // When the builder is shutting down, a gRPC context canceled error is returned. + // Will break when planner<->builder connection is closed or when the builder is shutting down. + for ctx.Err() == nil { protoTask, err := c.Recv() if err != nil { - if status.Code(err) == codes.Canceled { - level.Debug(b.logger).Log("msg", "builder loop context canceled") - return nil - } - return fmt.Errorf("failed to receive task from planner: %w", err) } @@ -235,7 +280,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro continue } - newMetas, err := b.processTask(c.Context(), task) + newMetas, err := b.processTask(ctx, task) if err != nil { err = fmt.Errorf("failed to process task: %w", err) } @@ -249,8 +294,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro b.metrics.processingTask.Set(0) } - level.Debug(b.logger).Log("msg", "builder loop stopped") - return nil + level.Debug(b.logger).Log("msg", "builder loop stopped", "ctx_err", ctx.Err()) + return ctx.Err() } func (b *Builder) logTaskCompleted(