Skip to content

Commit

Permalink
reprocess epoch aggregations if relevant slots are still cached (rel. #2
Browse files Browse the repository at this point in the history
)
  • Loading branch information
pk910 committed Aug 7, 2023
1 parent 1575bb6 commit 3494311
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,13 @@ func (indexer *Indexer) processHeadBlock(slot uint64, header *rpctypes.StandardV
}
if !reachedEnd {
logger.Errorf("Large chain reorg detected, resync needed")
// TODO: Drop all unfinalized & resync
// TODO: Start synchronization
} else {
reorgMinEpoch := utils.EpochOfSlot(uint64(canonicalBlock.Header.Data.Header.Message.Slot))
if reorgMinEpoch <= indexer.state.lastProcessedEpoch {
logger.Infof("Chain reorg touched processed epochs, reset epoch processing to %v", reorgMinEpoch-1)
indexer.state.lastProcessedEpoch = reorgMinEpoch - 1
}
}
}
indexer.state.lastHeadBlock = slot
Expand Down Expand Up @@ -610,8 +616,9 @@ func (indexer *Indexer) loadEpochValidators(epoch uint64, epochStats *EpochStats
func (indexer *Indexer) processIndexing() {
// process old epochs
currentEpoch := utils.EpochOfSlot(indexer.state.lastHeadBlock)
processEpoch := currentEpoch - uint64(indexer.epochProcessingDelay)
if indexer.state.lastProcessedEpoch < processEpoch {
maxProcessEpoch := currentEpoch - uint64(indexer.epochProcessingDelay)
for indexer.state.lastProcessedEpoch < maxProcessEpoch {
processEpoch := indexer.state.lastProcessedEpoch + 1
indexer.processEpoch(processEpoch)
indexer.state.lastProcessedEpoch = processEpoch
}
Expand Down

0 comments on commit 3494311

Please sign in to comment.