Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

feat: orchestrator not failing and requeing failed nonces #631

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/blobstream/orchestrator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func Start() *cobra.Command {

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)
retrier := helpers.NewRetrier(logger, 6, time.Minute)
retrier := helpers.NewRetrier(logger, 5, 30*time.Second)

// creating the broadcaster
broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT)
Expand Down
6 changes: 4 additions & 2 deletions e2e/scripts/start_orchestrator_after_validator_created.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ then
--grpc.insecure \
--p2p.nickname=key \
--p2p.listen-addr="${P2P_LISTEN}" \
--evm.passphrase=123
--evm.passphrase=123 \
--log.level debug
else
# to give time for the bootstrappers to be up
sleep 5s
Expand All @@ -66,5 +67,6 @@ else
--grpc.insecure \
--p2p.listen-addr="${P2P_LISTEN}" \
--p2p.bootstrappers="${P2P_BOOTSTRAPPERS}" \
--evm.passphrase=123
--evm.passphrase=123 \
--log.level debug
fi
4 changes: 2 additions & 2 deletions helpers/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func (r Retrier) Retry(ctx context.Context, retryMethod func() error) error {
case <-ctx.Done():
return ctx.Err()
case <-nextTick.C:
r.logger.Info("retrying", "retry_number", i, "retries_left", r.retriesNumber-i)
r.logger.Debug("retrying", "retry_number", i, "retries_left", r.retriesNumber-i)
err = retryMethod()
if err == nil {
r.logger.Info("succeeded", "retries_number", i)
return nil
}
r.logger.Error("failed attempt", "retry", i, "err", err)
r.logger.Debug("failed attempt", "retry", i, "err", err)
}
}
return err
Expand Down
39 changes: 28 additions & 11 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
coretypes "github.com/tendermint/tendermint/types"
)

// RequeueWindow the number of nonces that we want to re-enqueue if we can't process them even after retry.
// After this window is elapsed, the nonce is discarded.
const RequeueWindow = 50

type Orchestrator struct {
Logger tmlog.Logger // maybe use a more general interface

Expand Down Expand Up @@ -70,18 +74,16 @@ func (orch Orchestrator) Start(ctx context.Context) {
// used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop.
signalChan := make(chan struct{})

withCancel, cancel := context.WithCancel(ctx)

wg := &sync.WaitGroup{}

// go routine to listen for new attestation nonces
wg.Add(1)
go func() {
defer wg.Done()
err := orch.StartNewEventsListener(withCancel, noncesQueue, signalChan)
err := orch.StartNewEventsListener(ctx, noncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error listening to new attestations", "err", err)
cancel()
return
}
orch.Logger.Info("stopping listening to new attestations")
}()
Expand All @@ -90,10 +92,10 @@ func (orch Orchestrator) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
err := orch.ProcessNonces(withCancel, noncesQueue, signalChan)
err := orch.ProcessNonces(ctx, noncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error processing attestations", "err", err)
cancel()
return
}
orch.Logger.Info("stopping processing attestations")
}()
Expand All @@ -102,10 +104,10 @@ func (orch Orchestrator) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
err := orch.EnqueueMissingEvents(withCancel, noncesQueue, signalChan)
err := orch.EnqueueMissingEvents(ctx, noncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error enqueuing missing attestations", "err", err)
cancel()
return
}
}()

Expand Down Expand Up @@ -241,7 +243,7 @@ func (orch Orchestrator) EnqueueMissingEvents(

func (orch Orchestrator) ProcessNonces(
ctx context.Context,
noncesQueue <-chan uint64,
noncesQueue chan uint64,
signalChan chan<- struct{},
) error {
for {
Expand All @@ -256,14 +258,29 @@ func (orch Orchestrator) ProcessNonces(
if err := orch.Retrier.Retry(ctx, func() error {
return orch.Process(ctx, nonce)
}); err != nil {
close(signalChan)
return err
orch.Logger.Error("error processing nonce even after retrying", "err", err.Error())
go orch.MaybeRequeue(ctx, noncesQueue, nonce)
}
}
}
}
}

// MaybeRequeue requeue the nonce to be re-processed subsequently if it's recent.
func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) {
latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx)
if err != nil {
orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error())
return
}
if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow {
orch.Logger.Debug("requeuing nonce", "nonce", nonce)
noncesQueue <- nonce
} else {
orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce)
}
}

func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
att, err := orch.AppQuerier.QueryAttestationByNonce(ctx, nonce)
if err != nil {
Expand Down
Loading