Skip to content

Commit

Permalink
Merge pull request #1729 from livepeer/nv/fix-blockwatch
Browse files Browse the repository at this point in the history
eth: blockwatcher can process multiple blocks in a polling interval
  • Loading branch information
kyriediculous authored Mar 5, 2021
2 parents f4a2efb + 003029a commit f56c02f
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions eth/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Watcher struct {
ticker *time.Ticker
withLogs bool
topics []common.Hash
mu sync.RWMutex
sync.RWMutex
}

// New creates a new Watcher instance.
Expand Down Expand Up @@ -105,13 +105,13 @@ func (w *Watcher) BackfillEventsIfNeeded(ctx context.Context) error {
// Watch starts the Watcher. It will continuously look for new blocks and blocks
// until the given context is canceled. Typically, you want to call Watch inside a goroutine.
func (w *Watcher) Watch(ctx context.Context) error {
w.mu.Lock()
w.Lock()
if w.wasStartedOnce {
w.mu.Unlock()
w.Unlock()
return errors.New("Can only start Watcher once per instance")
}
w.wasStartedOnce = true
w.mu.Unlock()
w.Unlock()

ticker := time.NewTicker(w.pollingInterval)
for {
Expand All @@ -120,7 +120,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
ticker.Stop()
return nil
case <-ticker.C:
if err := w.pollNextBlock(); err != nil {
if err := w.syncToLatestBlock(); err != nil {
glog.Errorf("blockwatch.Watcher error encountered - trying again on next polling interval err=%v", err)
}
}
Expand All @@ -146,6 +146,27 @@ func (w *Watcher) InspectRetainedBlocks() ([]*MiniHeader, error) {
return w.stack.Inspect()
}

func (w *Watcher) syncToLatestBlock() error {
w.Lock()
defer w.Unlock()
newestHeader, err := w.client.HeaderByNumber(nil)
if err != nil {
return err
}

lastSeenHeader, err := w.stack.Peek()
if err != nil {
return err
}

for i := lastSeenHeader.Number; i.Cmp(newestHeader.Number) < 0; i = i.Add(i, big.NewInt(1)) {
if err := w.pollNextBlock(); err != nil {
return err
}
}
return nil
}

// pollNextBlock polls for the next block header to be added to the block stack.
// If there are no blocks on the stack, it fetches the first block at the specified
// `startBlockDepth` supplied at instantiation.
Expand Down

0 comments on commit f56c02f

Please sign in to comment.