diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index df2ca5760b5..8f3d376ec13 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -40,7 +40,7 @@ import ( "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" ) -const safetyMargin = 2_000 // We retire snapshots 2k blocks after the finalized head +const safetyMargin = 10_000 // We retire snapshots 10k blocks after the finalized head // Antiquary is where the snapshots go, aka old history, it is what keep track of the oldest records. type Antiquary struct { @@ -360,6 +360,10 @@ func (a *Antiquary) antiquateBlobs() error { defer roTx.Rollback() // perform blob antiquation if it is time to. currentBlobsProgress := a.sn.FrozenBlobs() + // We should NEVER get ahead of the block snapshots. + if currentBlobsProgress >= a.sn.BlocksAvailable() { + return nil + } minimunBlobsProgress := ((a.cfg.DenebForkEpoch * a.cfg.SlotsPerEpoch) / snaptype.Erigon2MergeLimit) * snaptype.Erigon2MergeLimit currentBlobsProgress = max(currentBlobsProgress, minimunBlobsProgress) // read the finalized head diff --git a/cl/phase1/stages/stage_history_download.go b/cl/phase1/stages/stage_history_download.go index 835a1f7cb6c..245759c2c03 100644 --- a/cl/phase1/stages/stage_history_download.go +++ b/cl/phase1/stages/stage_history_download.go @@ -228,18 +228,18 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co close(finishCh) if cfg.blobsBackfilling { go func() { - if err := downloadBlobHistoryWorker(cfg, ctx, logger); err != nil { + if err := downloadBlobHistoryWorker(cfg, ctx, true, logger); err != nil { logger.Error("Error downloading blobs", "err", err) } - // set a timer every 1 hour as a failsafe - ticker := time.NewTicker(time.Hour) + // set a timer every 15 minutes as a failsafe + ticker := time.NewTicker(15 * time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: - if err := downloadBlobHistoryWorker(cfg, ctx, logger); err != nil { + if err := downloadBlobHistoryWorker(cfg, ctx, false, logger); err != nil { logger.Error("Error downloading blobs", "err", err) } } @@ -273,7 +273,7 @@ func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Co } // downloadBlobHistoryWorker is a worker that downloads the blob history by using the already downloaded beacon blocks -func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Context, logger log.Logger) error { +func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Context, shouldLog bool, logger log.Logger) error { currentSlot := cfg.startingSlot + 1 blocksBatchSize := uint64(8) // requests 8 blocks worth of blobs at a time tx, err := cfg.indiciesDB.BeginRo(ctx) @@ -287,7 +287,7 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co prevLogSlot := currentSlot prevTime := time.Now() targetSlot := cfg.beaconCfg.DenebForkEpoch * cfg.beaconCfg.SlotsPerEpoch - cfg.logger.Info("Downloading blobs backwards", "from", currentSlot, "to", targetSlot) + for currentSlot >= targetSlot { if currentSlot <= cfg.sn.FrozenBlobs() { break @@ -336,7 +336,9 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co case <-ctx.Done(): return ctx.Err() case <-logInterval.C: - + if !shouldLog { + continue + } blkSec := float64(prevLogSlot-currentSlot) / time.Since(prevTime).Seconds() blkSecStr := fmt.Sprintf("%.1f", blkSec) // round to 1 decimal place and convert to string @@ -377,7 +379,9 @@ func downloadBlobHistoryWorker(cfg StageHistoryReconstructionCfg, ctx context.Co continue } } - log.Info("Blob history download finished successfully") + if shouldLog { + logger.Info("Blob history download finished successfully") + } cfg.antiquary.NotifyBlobBackfilled() return nil }