Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth,pm: avoid watchPoolSizeChange nil pointer error #1833

Merged
merged 2 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- \#1827 Limit the maximum size of a segment read over HTTP (@jailuthra)
- \#1809 Don't log statement that blocks have been backfilled when no blocks have elapsed (@kyriediculous)
- \#1809 Avoid nil pointer error in SyncToLatestBlock when no blocks are present in the database (@kyriediculous)
- \#1833 Prevent nil pointer errors when fetching transcoder pool size (@kyriediculous)

#### Orchestrator

Expand Down
34 changes: 21 additions & 13 deletions eth/stubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ type StubClient struct {
ProcessHistoricalUnbondError error
Orchestrators []*lpTypes.Transcoder
Round *big.Int
RoundsErr error
SenderInfo *pm.SenderInfo
PoolSize *big.Int
ClaimedAmount *big.Int
Expand All @@ -213,28 +212,35 @@ type StubClient struct {
TranscoderPoolError error
RoundLocked bool
RoundLockedErr error
Errors map[string]error
}

type stubTranscoder struct {
ServiceURI string
}

func (e *StubClient) Setup(password string, gasLimit uint64, gasPrice *big.Int) error { return nil }
func (e *StubClient) Account() accounts.Account { return accounts.Account{Address: e.TranscoderAddress} }
func (e *StubClient) Backend() (Backend, error) { return nil, nil }
func (e *StubClient) Account() accounts.Account {
return accounts.Account{Address: e.TranscoderAddress}
}
func (e *StubClient) Backend() (Backend, error) { return nil, nil }

// Rounds

func (e *StubClient) InitializeRound() (*types.Transaction, error) { return nil, nil }
func (e *StubClient) CurrentRound() (*big.Int, error) { return big.NewInt(0), e.RoundsErr }
func (e *StubClient) LastInitializedRound() (*big.Int, error) { return e.Round, e.RoundsErr }
func (e *StubClient) CurrentRound() (*big.Int, error) { return big.NewInt(0), e.Errors["CurrentRound"] }
func (e *StubClient) LastInitializedRound() (*big.Int, error) {
return e.Round, e.Errors["LastInitializedRound"]
}
func (e *StubClient) BlockHashForRound(round *big.Int) ([32]byte, error) {
return e.BlockHashToReturn, nil
return e.BlockHashToReturn, e.Errors["BlockHashForRound"]
}
func (e *StubClient) CurrentRoundInitialized() (bool, error) { return false, nil }
func (e *StubClient) CurrentRoundLocked() (bool, error) { return e.RoundLocked, e.RoundLockedErr }
func (e *StubClient) CurrentRoundStartBlock() (*big.Int, error) { return nil, nil }
func (e *StubClient) Paused() (bool, error) { return false, nil }
func (e *StubClient) CurrentRoundInitialized() (bool, error) { return false, nil }
func (e *StubClient) CurrentRoundLocked() (bool, error) { return e.RoundLocked, e.RoundLockedErr }
yondonfu marked this conversation as resolved.
Show resolved Hide resolved
func (e *StubClient) CurrentRoundStartBlock() (*big.Int, error) {
return e.BlockNum, e.Errors["CurrentRoundStartBlock"]
}
func (e *StubClient) Paused() (bool, error) { return false, nil }

// Token

Expand Down Expand Up @@ -300,9 +306,11 @@ func (e *StubClient) GetTranscoderEarningsPoolForRound(addr common.Address, roun
func (e *StubClient) TranscoderPool() ([]*lpTypes.Transcoder, error) {
return e.Orchestrators, e.TranscoderPoolError
}
func (e *StubClient) IsActiveTranscoder() (bool, error) { return false, nil }
func (e *StubClient) GetTotalBonded() (*big.Int, error) { return big.NewInt(0), nil }
func (e *StubClient) GetTranscoderPoolSize() (*big.Int, error) { return e.PoolSize, nil }
func (e *StubClient) IsActiveTranscoder() (bool, error) { return false, nil }
func (e *StubClient) GetTotalBonded() (*big.Int, error) { return big.NewInt(0), nil }
func (e *StubClient) GetTranscoderPoolSize() (*big.Int, error) {
return e.PoolSize, e.Errors["GetTranscoderPoolSize"]
}
func (e *StubClient) ClaimedReserve(sender ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) {
return e.ClaimedAmount, e.ClaimedReserveError
}
Expand Down
5 changes: 3 additions & 2 deletions eth/watchers/orchestratorwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func TestOrchWatcher_HandleRoundEvent_CacheOrchestratorStake(t *testing.T) {
stubStore := &stubOrchestratorStore{}
lpEth := &eth.StubClient{
TotalStake: expStake,
Errors: make(map[string]error),
}
tw := &stubTimeWatcher{}

Expand All @@ -170,13 +171,13 @@ func TestOrchWatcher_HandleRoundEvent_CacheOrchestratorStake(t *testing.T) {
assert.Equal(stubStore.stake, int64(500000000))

// LivepeerEthClient.CurrentRound() error
lpEth.RoundsErr = errors.New("CurrentRound error")
lpEth.Errors["CurrentRound"] = errors.New("CurrentRound error")
errorLogsBefore := glog.Stats.Error.Lines()
tw.sink <- newRoundEvent
time.Sleep(20 * time.Millisecond)
errorLogsAfter := glog.Stats.Error.Lines()
assert.Equal(int64(1), errorLogsAfter-errorLogsBefore)
lpEth.RoundsErr = nil
lpEth.Errors["CurrentRound"] = nil

// OrchestratorStore.SelectOrchs error
stubStore.selectErr = errors.New("SelectOrchs error")
Expand Down
80 changes: 40 additions & 40 deletions eth/watchers/timewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,44 @@ func NewTimeWatcher(roundsManagerAddr ethcommon.Address, watcher BlockWatcher, l
return nil, fmt.Errorf("error creating decoder: %v", err)
}

return &TimeWatcher{
tw := &TimeWatcher{
quit: make(chan struct{}),
watcher: watcher,
lpEth: lpEth,
dec: dec,
}, nil
}

lr, err := tw.lpEth.LastInitializedRound()
if err != nil {
return nil, fmt.Errorf("error fetching initial lastInitializedRound value err=%v", err)
}
bh, err := tw.lpEth.BlockHashForRound(lr)
if err != nil {
return nil, fmt.Errorf("error fetching initial lastInitializedBlockHash value err=%v", err)
}
num, err := tw.lpEth.CurrentRoundStartBlock()
if err != nil {
return nil, fmt.Errorf("error fetching current round start block err=%v", err)
}
tw.setLastInitializedRound(lr, bh, num)

lastSeenBlock, err := tw.watcher.GetLatestBlock()
if err != nil {
return nil, fmt.Errorf("error fetching last seen block err=%v", err)
}
blockNum := big.NewInt(0)
if lastSeenBlock != nil {
blockNum = lastSeenBlock.Number
}
tw.setLastSeenBlock(blockNum)

size, err := tw.lpEth.GetTranscoderPoolSize()
if err != nil {
return nil, fmt.Errorf("error fetching initial transcoderPoolSize err=%v", err)
}
tw.setTranscoderPoolSize(size)

return tw, nil
}

// LastInitializedRound gets the last initialized round from cache
Expand Down Expand Up @@ -93,6 +125,9 @@ func (tw *TimeWatcher) setLastInitializedRound(round *big.Int, hash [32]byte, st
func (tw *TimeWatcher) GetTranscoderPoolSize() *big.Int {
tw.mu.RLock()
defer tw.mu.RUnlock()
if tw.transcoderPoolSize == nil {
return big.NewInt(0)
}
return tw.transcoderPoolSize
}

Expand All @@ -116,34 +151,6 @@ func (tw *TimeWatcher) setLastSeenBlock(blockNum *big.Int) {

// Watch the blockwatch subscription for NewRound events
func (tw *TimeWatcher) Watch() error {
lr, err := tw.lpEth.LastInitializedRound()
if err != nil {
return fmt.Errorf("error fetching initial lastInitializedRound value err=%v", err)
}
bh, err := tw.lpEth.BlockHashForRound(lr)
if err != nil {
return fmt.Errorf("error fetching initial lastInitializedBlockHash value err=%v", err)
}
num, err := tw.lpEth.CurrentRoundStartBlock()
if err != nil {
return fmt.Errorf("error fetching current round start block %v=", err)
}
tw.setLastInitializedRound(lr, bh, num)

if err := tw.fetchAndSetTranscoderPoolSize(); err != nil {
return fmt.Errorf("error fetching initial transcoderPoolSize err=%v", err)
}

lastSeenBlock, err := tw.watcher.GetLatestBlock()
if err != nil {
return fmt.Errorf("error fetching last seen block err=%v", err)
}
blockNum := big.NewInt(0)
if lastSeenBlock != nil {
blockNum = lastSeenBlock.Number
}
tw.setLastSeenBlock(blockNum)

events := make(chan []*blockwatch.Event, 10)
sub := tw.watcher.Subscribe(events)
defer sub.Unsubscribe()
Expand Down Expand Up @@ -240,20 +247,13 @@ func (tw *TimeWatcher) handleLog(log types.Log) error {
}

// Get the active transcoder pool size when we receive a NewRound event
if err := tw.fetchAndSetTranscoderPoolSize(); err != nil {
size, err := tw.lpEth.GetTranscoderPoolSize()
if err != nil {
return err
}
tw.setTranscoderPoolSize(size)

tw.roundSubFeed.Send(log)

return nil
}

func (tw *TimeWatcher) fetchAndSetTranscoderPoolSize() error {
size, err := tw.lpEth.GetTranscoderPoolSize()
if err != nil {
return fmt.Errorf("error fetching initial transcoderPoolSize: %v", err)
}
tw.setTranscoderPoolSize(size)
return nil
}
110 changes: 82 additions & 28 deletions eth/watchers/timewatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package watchers

import (
"errors"
"fmt"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -40,9 +40,14 @@ func TestSetAndGet_TranscoderPoolSize(t *testing.T) {
tw.setTranscoderPoolSize(size)
assert.Equal(size, tw.transcoderPoolSize)
assert.Equal(size, tw.GetTranscoderPoolSize())

// return big.Int(0) when nil
tw.setTranscoderPoolSize(nil)
assert.Nil(tw.transcoderPoolSize)
assert.Equal(big.NewInt(0), tw.GetTranscoderPoolSize())
}

func TestTimeWatcher_WatchAndStop(t *testing.T) {
func TestTimeWatcher_NewTimeWatcher(t *testing.T) {
assert := assert.New(t)
size := big.NewInt(50)
block := big.NewInt(10)
Expand All @@ -53,44 +58,103 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) {
BlockNum: block,
BlockHashToReturn: hash,
Round: round,
Errors: make(map[string]error),
}
watcher := &stubBlockWatcher{
latestHeader: &blockwatch.MiniHeader{Number: block},
}

testErr := fmt.Errorf("error")

// Last InitializedRound error
lpEth.Errors["LastInitializedRound"] = testErr
expErr := fmt.Sprintf("error fetching initial lastInitializedRound value err=%v", testErr)
tw, err := NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(err)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["LastInitializedRound"] = nil

header := defaultMiniHeader()
newRoundEvent := newStubNewRoundLog()
// BlockHashForRound error
lpEth.Errors["BlockHashForRound"] = testErr
expErr = fmt.Sprintf("error fetching initial lastInitializedBlockHash value err=%v", testErr)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["BlockHashForRound"] = nil

header.Logs = append(header.Logs, newRoundEvent)
blockEvent := &blockwatch.Event{
Type: blockwatch.Added,
BlockHeader: header,
}
// CurrentRoundStartBlock error
lpEth.Errors["CurrentRoundStartBlock"] = testErr
expErr = fmt.Sprintf("error fetching current round start block err=%v", testErr)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["CurrentRoundStartBlock"] = nil

go tw.Watch()
time.Sleep(2 * time.Millisecond)
// Check state initialization
// GetLastestBlock error
watcher.err = fmt.Errorf("GetLatestBlock error")
expErr = fmt.Sprintf("error fetching last seen block err=%v", watcher.err)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
watcher.err = nil

// TranscoderPoolSize error
lpEth.Errors["GetTranscoderPoolSize"] = testErr
expErr = fmt.Sprintf("error fetching initial transcoderPoolSize err=%v", testErr)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["GetTranscoderPoolSize"] = nil

tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(err)
bh := tw.LastInitializedBlockHash()
assert.Equal(hash, common.BytesToHash(bh[:]))
assert.Equal(round, tw.LastInitializedRound())
assert.Equal(size, tw.GetTranscoderPoolSize())
assert.Equal(block, tw.LastSeenBlock())
tw.Stop()

// if watcher.GetLatestBlock() == nil, initialise lastSeenBlock to big.NewInt(0)
watcher.latestHeader = nil
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(err)
go tw.Watch()
time.Sleep(2 * time.Millisecond)
// Check state initialization
bh = tw.LastInitializedBlockHash()
assert.Equal(hash, common.BytesToHash(bh[:]))
assert.Equal(round, tw.LastInitializedRound())
assert.Equal(size, tw.GetTranscoderPoolSize())
assert.Equal(big.NewInt(0), tw.LastSeenBlock())
}

func TestTimeWatcher_WatchAndStop(t *testing.T) {
assert := assert.New(t)
size := big.NewInt(50)
block := big.NewInt(10)
round := big.NewInt(1)
hash := ethcommon.HexToHash("foo")
lpEth := &eth.StubClient{
PoolSize: size,
BlockNum: block,
BlockHashToReturn: hash,
Round: round,
Errors: make(map[string]error),
}
watcher := &stubBlockWatcher{
latestHeader: &blockwatch.MiniHeader{Number: block},
}
tw, err := NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
require.Nil(t, err)

header := defaultMiniHeader()
newRoundEvent := newStubNewRoundLog()

header.Logs = append(header.Logs, newRoundEvent)
blockEvent := &blockwatch.Event{
Type: blockwatch.Added,
BlockHeader: header,
}

go tw.Watch()
time.Sleep(2 * time.Millisecond)

// New Round event
watcher.sink <- []*blockwatch.Event{blockEvent}
Expand Down Expand Up @@ -123,7 +187,7 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) {
watcher.sink <- []*blockwatch.Event{blockEvent}
time.Sleep(2 * time.Millisecond)
bhForRound = tw.LastInitializedBlockHash()
assert.Equal(hash, common.BytesToHash(bh[:]))
assert.Equal(hash, common.BytesToHash(bhForRound[:]))
assert.Equal(round, tw.LastInitializedRound())
assert.Equal(size, tw.GetTranscoderPoolSize())
assert.Equal(header.Number, tw.LastSeenBlock())
Expand All @@ -132,16 +196,6 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) {
tw.Stop()
time.Sleep(2 * time.Millisecond)
assert.True(watcher.sub.unsubscribed)

// Test watch error when RPC calls fail
tw = &TimeWatcher{
lpEth: &eth.StubClient{
RoundsErr: errors.New("timewatcher error"),
},
}
err = tw.Watch()
assert.NotNil(err)
assert.Contains(err.Error(), "timewatcher error")
}

func TestTimeWatcher_HandleLog(t *testing.T) {
Expand Down