From aeeab3850850286b0ceb30bb4c52a5e150b4beae Mon Sep 17 00:00:00 2001 From: Nico Vergauwen Date: Mon, 12 Apr 2021 17:05:40 +0200 Subject: [PATCH] eth: move timewatcher state initialisation into constructor to be synchronous --- CHANGELOG_PENDING.md | 1 + eth/stubclient.go | 34 ++++--- eth/watchers/orchestratorwatcher_test.go | 5 +- eth/watchers/timewatcher.go | 67 +++++++------- eth/watchers/timewatcher_test.go | 110 +++++++++++++++++------ 5 files changed, 143 insertions(+), 74 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index f1ac137146..68b9982dc7 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -8,6 +8,7 @@ - \#1810 Display "n/a" in CLI when max gas price isn't specified (@kyriediculous) - \#1827 Limit the maximum size of a segment read over HTTP (@jailuthra) +- \#1833 Prevent nil pointer errors when fetching transcoder pool size (@kyriediculous) #### Orchestrator diff --git a/eth/stubclient.go b/eth/stubclient.go index d777632d29..789047c825 100644 --- a/eth/stubclient.go +++ b/eth/stubclient.go @@ -201,7 +201,6 @@ type StubClient struct { ProcessHistoricalUnbondError error Orchestrators []*lpTypes.Transcoder Round *big.Int - RoundsErr error SenderInfo *pm.SenderInfo PoolSize *big.Int ClaimedAmount *big.Int @@ -213,6 +212,7 @@ type StubClient struct { TranscoderPoolError error RoundLocked bool RoundLockedErr error + Errors map[string]error } type stubTranscoder struct { @@ -220,21 +220,27 @@ type stubTranscoder struct { } func (e *StubClient) Setup(password string, gasLimit uint64, gasPrice *big.Int) error { return nil } -func (e *StubClient) Account() accounts.Account { return accounts.Account{Address: e.TranscoderAddress} } -func (e *StubClient) Backend() (Backend, error) { return nil, nil } +func (e *StubClient) Account() accounts.Account { + return accounts.Account{Address: e.TranscoderAddress} +} +func (e *StubClient) Backend() (Backend, error) { return nil, nil } // Rounds func (e *StubClient) InitializeRound() (*types.Transaction, error) { return nil, nil } -func (e *StubClient) CurrentRound() (*big.Int, error) { return big.NewInt(0), e.RoundsErr } -func (e *StubClient) LastInitializedRound() (*big.Int, error) { return e.Round, e.RoundsErr } +func (e *StubClient) CurrentRound() (*big.Int, error) { return big.NewInt(0), e.Errors["CurrentRound"] } +func (e *StubClient) LastInitializedRound() (*big.Int, error) { + return e.Round, e.Errors["LastInitializedRound"] +} func (e *StubClient) BlockHashForRound(round *big.Int) ([32]byte, error) { - return e.BlockHashToReturn, nil + return e.BlockHashToReturn, e.Errors["BlockHashForRound"] } -func (e *StubClient) CurrentRoundInitialized() (bool, error) { return false, nil } -func (e *StubClient) CurrentRoundLocked() (bool, error) { return e.RoundLocked, e.RoundLockedErr } -func (e *StubClient) CurrentRoundStartBlock() (*big.Int, error) { return nil, nil } -func (e *StubClient) Paused() (bool, error) { return false, nil } +func (e *StubClient) CurrentRoundInitialized() (bool, error) { return false, nil } +func (e *StubClient) CurrentRoundLocked() (bool, error) { return e.RoundLocked, e.RoundLockedErr } +func (e *StubClient) CurrentRoundStartBlock() (*big.Int, error) { + return e.BlockNum, e.Errors["CurrentRoundStartBlock"] +} +func (e *StubClient) Paused() (bool, error) { return false, nil } // Token @@ -300,9 +306,11 @@ func (e *StubClient) GetTranscoderEarningsPoolForRound(addr common.Address, roun func (e *StubClient) TranscoderPool() ([]*lpTypes.Transcoder, error) { return e.Orchestrators, e.TranscoderPoolError } -func (e *StubClient) IsActiveTranscoder() (bool, error) { return false, nil } -func (e *StubClient) GetTotalBonded() (*big.Int, error) { return big.NewInt(0), nil } -func (e *StubClient) GetTranscoderPoolSize() (*big.Int, error) { return e.PoolSize, nil } +func (e *StubClient) IsActiveTranscoder() (bool, error) { return false, nil } +func (e *StubClient) GetTotalBonded() (*big.Int, error) { return big.NewInt(0), nil } +func (e *StubClient) GetTranscoderPoolSize() (*big.Int, error) { + return e.PoolSize, e.Errors["GetTranscoderPoolSize"] +} func (e *StubClient) ClaimedReserve(sender ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) { return e.ClaimedAmount, e.ClaimedReserveError } diff --git a/eth/watchers/orchestratorwatcher_test.go b/eth/watchers/orchestratorwatcher_test.go index e99dc6e6f8..d9d4373d18 100644 --- a/eth/watchers/orchestratorwatcher_test.go +++ b/eth/watchers/orchestratorwatcher_test.go @@ -150,6 +150,7 @@ func TestOrchWatcher_HandleRoundEvent_CacheOrchestratorStake(t *testing.T) { stubStore := &stubOrchestratorStore{} lpEth := ð.StubClient{ TotalStake: expStake, + Errors: make(map[string]error), } tw := &stubTimeWatcher{} @@ -170,13 +171,13 @@ func TestOrchWatcher_HandleRoundEvent_CacheOrchestratorStake(t *testing.T) { assert.Equal(stubStore.stake, int64(500000000)) // LivepeerEthClient.CurrentRound() error - lpEth.RoundsErr = errors.New("CurrentRound error") + lpEth.Errors["CurrentRound"] = errors.New("CurrentRound error") errorLogsBefore := glog.Stats.Error.Lines() tw.sink <- newRoundEvent time.Sleep(20 * time.Millisecond) errorLogsAfter := glog.Stats.Error.Lines() assert.Equal(int64(1), errorLogsAfter-errorLogsBefore) - lpEth.RoundsErr = nil + lpEth.Errors["CurrentRound"] = nil // OrchestratorStore.SelectOrchs error stubStore.selectErr = errors.New("SelectOrchs error") diff --git a/eth/watchers/timewatcher.go b/eth/watchers/timewatcher.go index 138961e1c1..c9fc6ca066 100644 --- a/eth/watchers/timewatcher.go +++ b/eth/watchers/timewatcher.go @@ -54,12 +54,42 @@ func NewTimeWatcher(roundsManagerAddr ethcommon.Address, watcher BlockWatcher, l return nil, fmt.Errorf("error creating decoder: %v", err) } - return &TimeWatcher{ + tw := &TimeWatcher{ quit: make(chan struct{}), watcher: watcher, lpEth: lpEth, dec: dec, - }, nil + } + + lr, err := tw.lpEth.LastInitializedRound() + if err != nil { + return nil, fmt.Errorf("error fetching initial lastInitializedRound value err=%v", err) + } + bh, err := tw.lpEth.BlockHashForRound(lr) + if err != nil { + return nil, fmt.Errorf("error fetching initial lastInitializedBlockHash value err=%v", err) + } + num, err := tw.lpEth.CurrentRoundStartBlock() + if err != nil { + return nil, fmt.Errorf("error fetching current round start block %v=", err) + } + tw.setLastInitializedRound(lr, bh, num) + + lastSeenBlock, err := tw.watcher.GetLatestBlock() + if err != nil { + return nil, fmt.Errorf("error fetching last seen block err=%v", err) + } + blockNum := big.NewInt(0) + if lastSeenBlock != nil { + blockNum = lastSeenBlock.Number + } + tw.setLastSeenBlock(blockNum) + + if err := tw.fetchAndSetTranscoderPoolSize(); err != nil { + return nil, fmt.Errorf("error fetching initial transcoderPoolSize err=%v", err) + } + + return tw, nil } // LastInitializedRound gets the last initialized round from cache @@ -93,6 +123,9 @@ func (tw *TimeWatcher) setLastInitializedRound(round *big.Int, hash [32]byte, st func (tw *TimeWatcher) GetTranscoderPoolSize() *big.Int { tw.mu.RLock() defer tw.mu.RUnlock() + if tw.transcoderPoolSize == nil { + return big.NewInt(0) + } return tw.transcoderPoolSize } @@ -116,34 +149,6 @@ func (tw *TimeWatcher) setLastSeenBlock(blockNum *big.Int) { // Watch the blockwatch subscription for NewRound events func (tw *TimeWatcher) Watch() error { - lr, err := tw.lpEth.LastInitializedRound() - if err != nil { - return fmt.Errorf("error fetching initial lastInitializedRound value err=%v", err) - } - bh, err := tw.lpEth.BlockHashForRound(lr) - if err != nil { - return fmt.Errorf("error fetching initial lastInitializedBlockHash value err=%v", err) - } - num, err := tw.lpEth.CurrentRoundStartBlock() - if err != nil { - return fmt.Errorf("error fetching current round start block %v=", err) - } - tw.setLastInitializedRound(lr, bh, num) - - if err := tw.fetchAndSetTranscoderPoolSize(); err != nil { - return fmt.Errorf("error fetching initial transcoderPoolSize err=%v", err) - } - - lastSeenBlock, err := tw.watcher.GetLatestBlock() - if err != nil { - return fmt.Errorf("error fetching last seen block err=%v", err) - } - blockNum := big.NewInt(0) - if lastSeenBlock != nil { - blockNum = lastSeenBlock.Number - } - tw.setLastSeenBlock(blockNum) - events := make(chan []*blockwatch.Event, 10) sub := tw.watcher.Subscribe(events) defer sub.Unsubscribe() @@ -252,7 +257,7 @@ func (tw *TimeWatcher) handleLog(log types.Log) error { func (tw *TimeWatcher) fetchAndSetTranscoderPoolSize() error { size, err := tw.lpEth.GetTranscoderPoolSize() if err != nil { - return fmt.Errorf("error fetching initial transcoderPoolSize: %v", err) + return err } tw.setTranscoderPoolSize(size) return nil diff --git a/eth/watchers/timewatcher_test.go b/eth/watchers/timewatcher_test.go index f5c5f8d10b..72da00d69d 100644 --- a/eth/watchers/timewatcher_test.go +++ b/eth/watchers/timewatcher_test.go @@ -1,7 +1,7 @@ package watchers import ( - "errors" + "fmt" "math/big" "testing" "time" @@ -40,9 +40,14 @@ func TestSetAndGet_TranscoderPoolSize(t *testing.T) { tw.setTranscoderPoolSize(size) assert.Equal(size, tw.transcoderPoolSize) assert.Equal(size, tw.GetTranscoderPoolSize()) + + // return big.Int(0) when nil + tw.setTranscoderPoolSize(nil) + assert.Nil(tw.transcoderPoolSize) + assert.Equal(big.NewInt(0), tw.GetTranscoderPoolSize()) } -func TestTimeWatcher_WatchAndStop(t *testing.T) { +func TestTimeWatcher_NewTimeWatcher(t *testing.T) { assert := assert.New(t) size := big.NewInt(50) block := big.NewInt(10) @@ -53,44 +58,103 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) { BlockNum: block, BlockHashToReturn: hash, Round: round, + Errors: make(map[string]error), } watcher := &stubBlockWatcher{ latestHeader: &blockwatch.MiniHeader{Number: block}, } + + testErr := fmt.Errorf("error") + + // Last InitializedRound error + lpEth.Errors["LastInitializedRound"] = testErr + expErr := fmt.Sprintf("error fetching initial lastInitializedRound value err=%v", testErr) tw, err := NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) - assert.Nil(err) + assert.Nil(tw) + assert.EqualError(err, expErr) + lpEth.Errors["LastInitializedRound"] = nil - header := defaultMiniHeader() - newRoundEvent := newStubNewRoundLog() + // BlockHashForRound error + lpEth.Errors["BlockHashForRound"] = testErr + expErr = fmt.Sprintf("error fetching initial lastInitializedBlockHash value err=%v", testErr) + tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) + assert.Nil(tw) + assert.EqualError(err, expErr) + lpEth.Errors["BlockHashForRound"] = nil - header.Logs = append(header.Logs, newRoundEvent) - blockEvent := &blockwatch.Event{ - Type: blockwatch.Added, - BlockHeader: header, - } + // CurrentRoundStartBlock error + lpEth.Errors["CurrentRoundStartBlock"] = testErr + expErr = fmt.Sprintf("error fetching current round start block %v=", testErr) + tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) + assert.Nil(tw) + assert.EqualError(err, expErr) + lpEth.Errors["CurrentRoundStartBlock"] = nil - go tw.Watch() - time.Sleep(2 * time.Millisecond) - // Check state initialization + // GetLastestBlock error + watcher.err = fmt.Errorf("GetLatestBlock error") + expErr = fmt.Sprintf("error fetching last seen block err=%v", watcher.err) + tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) + assert.Nil(tw) + assert.EqualError(err, expErr) + watcher.err = nil + + // TranscoderPoolSize error + lpEth.Errors["GetTranscoderPoolSize"] = testErr + expErr = fmt.Sprintf("error fetching initial transcoderPoolSize err=%v", testErr) + tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) + assert.Nil(tw) + assert.EqualError(err, expErr) + lpEth.Errors["GetTranscoderPoolSize"] = nil + + tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) + assert.Nil(err) bh := tw.LastInitializedBlockHash() assert.Equal(hash, common.BytesToHash(bh[:])) assert.Equal(round, tw.LastInitializedRound()) assert.Equal(size, tw.GetTranscoderPoolSize()) assert.Equal(block, tw.LastSeenBlock()) - tw.Stop() // if watcher.GetLatestBlock() == nil, initialise lastSeenBlock to big.NewInt(0) watcher.latestHeader = nil tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) assert.Nil(err) - go tw.Watch() - time.Sleep(2 * time.Millisecond) - // Check state initialization bh = tw.LastInitializedBlockHash() assert.Equal(hash, common.BytesToHash(bh[:])) assert.Equal(round, tw.LastInitializedRound()) assert.Equal(size, tw.GetTranscoderPoolSize()) assert.Equal(big.NewInt(0), tw.LastSeenBlock()) +} + +func TestTimeWatcher_WatchAndStop(t *testing.T) { + assert := assert.New(t) + size := big.NewInt(50) + block := big.NewInt(10) + round := big.NewInt(1) + hash := ethcommon.HexToHash("foo") + lpEth := ð.StubClient{ + PoolSize: size, + BlockNum: block, + BlockHashToReturn: hash, + Round: round, + Errors: make(map[string]error), + } + watcher := &stubBlockWatcher{ + latestHeader: &blockwatch.MiniHeader{Number: block}, + } + tw, err := NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth) + require.Nil(t, err) + + header := defaultMiniHeader() + newRoundEvent := newStubNewRoundLog() + + header.Logs = append(header.Logs, newRoundEvent) + blockEvent := &blockwatch.Event{ + Type: blockwatch.Added, + BlockHeader: header, + } + + go tw.Watch() + time.Sleep(2 * time.Millisecond) // New Round event watcher.sink <- []*blockwatch.Event{blockEvent} @@ -123,7 +187,7 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) { watcher.sink <- []*blockwatch.Event{blockEvent} time.Sleep(2 * time.Millisecond) bhForRound = tw.LastInitializedBlockHash() - assert.Equal(hash, common.BytesToHash(bh[:])) + assert.Equal(hash, common.BytesToHash(bhForRound[:])) assert.Equal(round, tw.LastInitializedRound()) assert.Equal(size, tw.GetTranscoderPoolSize()) assert.Equal(header.Number, tw.LastSeenBlock()) @@ -132,16 +196,6 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) { tw.Stop() time.Sleep(2 * time.Millisecond) assert.True(watcher.sub.unsubscribed) - - // Test watch error when RPC calls fail - tw = &TimeWatcher{ - lpEth: ð.StubClient{ - RoundsErr: errors.New("timewatcher error"), - }, - } - err = tw.Watch() - assert.NotNil(err) - assert.Contains(err.Error(), "timewatcher error") } func TestTimeWatcher_HandleLog(t *testing.T) {