Skip to content

Commit

Permalink
fix(relayer): requeue back to original queue on select errors (#17584)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Jun 14, 2024
1 parent 720fad5 commit bc2379f
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions packages/relayer/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
Expand Down Expand Up @@ -489,23 +490,28 @@ func (p *Processor) eventLoop(ctx context.Context) {
if err := p.queue.Ack(ctx, m); err != nil {
slog.Error("Err acking message", "err", err.Error())
}
case errors.Is(err, context.Canceled):
slog.Error("process message failed due to context cancel", "err", err.Error())
case errors.Is(err, context.Canceled) ||
strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "i/o") ||
strings.Contains(err.Error(), "connect") ||
strings.Contains(err.Error(), "failed to get tx into the mempool"):
slog.Error("process message failed", "err", err.Error())

// we want to negatively acknowledge the message and make sure
// we requeue it
if err := p.queue.Nack(ctx, m, true); err != nil {
if err := p.queue.Nack(ctx, m, false); err != nil {
slog.Error("Err nacking message", "err", err.Error())
break
}
case strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "i/o") ||
strings.Contains(err.Error(), "connect"):
slog.Error("process message failed due to networking issue", "err", err.Error())

// we want to negatively acknowledge the message and make sure
// we requeue it
if err := p.queue.Nack(ctx, m, true); err != nil {
slog.Error("Err nacking message", "err", err.Error())
marshalledMsg, err := json.Marshal(msg)
if err != nil {
slog.Error("err marshaling queue message", "err", err.Error())
break
}

if err := p.queue.Publish(ctx, p.queueName(), marshalledMsg, nil, nil); err != nil {
slog.Error("err publishing to queue", "err", err.Error())
}
default:
slog.Error("process message failed", "err", err.Error())
Expand Down

0 comments on commit bc2379f

Please sign in to comment.