Skip to content

Commit

Permalink
eth: move timewatcher state initialisation into constructor to be syn…
Browse files Browse the repository at this point in the history
…chronous
  • Loading branch information
kyriediculous committed Apr 12, 2021
1 parent 3610c2e commit aeeab38
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- \#1810 Display "n/a" in CLI when max gas price isn't specified (@kyriediculous)
- \#1827 Limit the maximum size of a segment read over HTTP (@jailuthra)
- \#1833 Prevent nil pointer errors when fetching transcoder pool size (@kyriediculous)

#### Orchestrator

Expand Down
34 changes: 21 additions & 13 deletions eth/stubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ type StubClient struct {
ProcessHistoricalUnbondError error
Orchestrators []*lpTypes.Transcoder
Round *big.Int
RoundsErr error
SenderInfo *pm.SenderInfo
PoolSize *big.Int
ClaimedAmount *big.Int
Expand All @@ -213,28 +212,35 @@ type StubClient struct {
TranscoderPoolError error
RoundLocked bool
RoundLockedErr error
Errors map[string]error
}

type stubTranscoder struct {
ServiceURI string
}

func (e *StubClient) Setup(password string, gasLimit uint64, gasPrice *big.Int) error { return nil }
func (e *StubClient) Account() accounts.Account { return accounts.Account{Address: e.TranscoderAddress} }
func (e *StubClient) Backend() (Backend, error) { return nil, nil }
func (e *StubClient) Account() accounts.Account {
return accounts.Account{Address: e.TranscoderAddress}
}
func (e *StubClient) Backend() (Backend, error) { return nil, nil }

// Rounds

func (e *StubClient) InitializeRound() (*types.Transaction, error) { return nil, nil }
func (e *StubClient) CurrentRound() (*big.Int, error) { return big.NewInt(0), e.RoundsErr }
func (e *StubClient) LastInitializedRound() (*big.Int, error) { return e.Round, e.RoundsErr }
func (e *StubClient) CurrentRound() (*big.Int, error) { return big.NewInt(0), e.Errors["CurrentRound"] }
func (e *StubClient) LastInitializedRound() (*big.Int, error) {
return e.Round, e.Errors["LastInitializedRound"]
}
func (e *StubClient) BlockHashForRound(round *big.Int) ([32]byte, error) {
return e.BlockHashToReturn, nil
return e.BlockHashToReturn, e.Errors["BlockHashForRound"]
}
func (e *StubClient) CurrentRoundInitialized() (bool, error) { return false, nil }
func (e *StubClient) CurrentRoundLocked() (bool, error) { return e.RoundLocked, e.RoundLockedErr }
func (e *StubClient) CurrentRoundStartBlock() (*big.Int, error) { return nil, nil }
func (e *StubClient) Paused() (bool, error) { return false, nil }
func (e *StubClient) CurrentRoundInitialized() (bool, error) { return false, nil }
func (e *StubClient) CurrentRoundLocked() (bool, error) { return e.RoundLocked, e.RoundLockedErr }
func (e *StubClient) CurrentRoundStartBlock() (*big.Int, error) {
return e.BlockNum, e.Errors["CurrentRoundStartBlock"]
}
func (e *StubClient) Paused() (bool, error) { return false, nil }

// Token

Expand Down Expand Up @@ -300,9 +306,11 @@ func (e *StubClient) GetTranscoderEarningsPoolForRound(addr common.Address, roun
func (e *StubClient) TranscoderPool() ([]*lpTypes.Transcoder, error) {
return e.Orchestrators, e.TranscoderPoolError
}
func (e *StubClient) IsActiveTranscoder() (bool, error) { return false, nil }
func (e *StubClient) GetTotalBonded() (*big.Int, error) { return big.NewInt(0), nil }
func (e *StubClient) GetTranscoderPoolSize() (*big.Int, error) { return e.PoolSize, nil }
func (e *StubClient) IsActiveTranscoder() (bool, error) { return false, nil }
func (e *StubClient) GetTotalBonded() (*big.Int, error) { return big.NewInt(0), nil }
func (e *StubClient) GetTranscoderPoolSize() (*big.Int, error) {
return e.PoolSize, e.Errors["GetTranscoderPoolSize"]
}
func (e *StubClient) ClaimedReserve(sender ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) {
return e.ClaimedAmount, e.ClaimedReserveError
}
Expand Down
5 changes: 3 additions & 2 deletions eth/watchers/orchestratorwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func TestOrchWatcher_HandleRoundEvent_CacheOrchestratorStake(t *testing.T) {
stubStore := &stubOrchestratorStore{}
lpEth := &eth.StubClient{
TotalStake: expStake,
Errors: make(map[string]error),
}
tw := &stubTimeWatcher{}

Expand All @@ -170,13 +171,13 @@ func TestOrchWatcher_HandleRoundEvent_CacheOrchestratorStake(t *testing.T) {
assert.Equal(stubStore.stake, int64(500000000))

// LivepeerEthClient.CurrentRound() error
lpEth.RoundsErr = errors.New("CurrentRound error")
lpEth.Errors["CurrentRound"] = errors.New("CurrentRound error")
errorLogsBefore := glog.Stats.Error.Lines()
tw.sink <- newRoundEvent
time.Sleep(20 * time.Millisecond)
errorLogsAfter := glog.Stats.Error.Lines()
assert.Equal(int64(1), errorLogsAfter-errorLogsBefore)
lpEth.RoundsErr = nil
lpEth.Errors["CurrentRound"] = nil

// OrchestratorStore.SelectOrchs error
stubStore.selectErr = errors.New("SelectOrchs error")
Expand Down
67 changes: 36 additions & 31 deletions eth/watchers/timewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,42 @@ func NewTimeWatcher(roundsManagerAddr ethcommon.Address, watcher BlockWatcher, l
return nil, fmt.Errorf("error creating decoder: %v", err)
}

return &TimeWatcher{
tw := &TimeWatcher{
quit: make(chan struct{}),
watcher: watcher,
lpEth: lpEth,
dec: dec,
}, nil
}

lr, err := tw.lpEth.LastInitializedRound()
if err != nil {
return nil, fmt.Errorf("error fetching initial lastInitializedRound value err=%v", err)
}
bh, err := tw.lpEth.BlockHashForRound(lr)
if err != nil {
return nil, fmt.Errorf("error fetching initial lastInitializedBlockHash value err=%v", err)
}
num, err := tw.lpEth.CurrentRoundStartBlock()
if err != nil {
return nil, fmt.Errorf("error fetching current round start block %v=", err)
}
tw.setLastInitializedRound(lr, bh, num)

lastSeenBlock, err := tw.watcher.GetLatestBlock()
if err != nil {
return nil, fmt.Errorf("error fetching last seen block err=%v", err)
}
blockNum := big.NewInt(0)
if lastSeenBlock != nil {
blockNum = lastSeenBlock.Number
}
tw.setLastSeenBlock(blockNum)

if err := tw.fetchAndSetTranscoderPoolSize(); err != nil {
return nil, fmt.Errorf("error fetching initial transcoderPoolSize err=%v", err)
}

return tw, nil
}

// LastInitializedRound gets the last initialized round from cache
Expand Down Expand Up @@ -93,6 +123,9 @@ func (tw *TimeWatcher) setLastInitializedRound(round *big.Int, hash [32]byte, st
func (tw *TimeWatcher) GetTranscoderPoolSize() *big.Int {
tw.mu.RLock()
defer tw.mu.RUnlock()
if tw.transcoderPoolSize == nil {
return big.NewInt(0)
}
return tw.transcoderPoolSize
}

Expand All @@ -116,34 +149,6 @@ func (tw *TimeWatcher) setLastSeenBlock(blockNum *big.Int) {

// Watch the blockwatch subscription for NewRound events
func (tw *TimeWatcher) Watch() error {
lr, err := tw.lpEth.LastInitializedRound()
if err != nil {
return fmt.Errorf("error fetching initial lastInitializedRound value err=%v", err)
}
bh, err := tw.lpEth.BlockHashForRound(lr)
if err != nil {
return fmt.Errorf("error fetching initial lastInitializedBlockHash value err=%v", err)
}
num, err := tw.lpEth.CurrentRoundStartBlock()
if err != nil {
return fmt.Errorf("error fetching current round start block %v=", err)
}
tw.setLastInitializedRound(lr, bh, num)

if err := tw.fetchAndSetTranscoderPoolSize(); err != nil {
return fmt.Errorf("error fetching initial transcoderPoolSize err=%v", err)
}

lastSeenBlock, err := tw.watcher.GetLatestBlock()
if err != nil {
return fmt.Errorf("error fetching last seen block err=%v", err)
}
blockNum := big.NewInt(0)
if lastSeenBlock != nil {
blockNum = lastSeenBlock.Number
}
tw.setLastSeenBlock(blockNum)

events := make(chan []*blockwatch.Event, 10)
sub := tw.watcher.Subscribe(events)
defer sub.Unsubscribe()
Expand Down Expand Up @@ -252,7 +257,7 @@ func (tw *TimeWatcher) handleLog(log types.Log) error {
func (tw *TimeWatcher) fetchAndSetTranscoderPoolSize() error {
size, err := tw.lpEth.GetTranscoderPoolSize()
if err != nil {
return fmt.Errorf("error fetching initial transcoderPoolSize: %v", err)
return err
}
tw.setTranscoderPoolSize(size)
return nil
Expand Down
110 changes: 82 additions & 28 deletions eth/watchers/timewatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package watchers

import (
"errors"
"fmt"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -40,9 +40,14 @@ func TestSetAndGet_TranscoderPoolSize(t *testing.T) {
tw.setTranscoderPoolSize(size)
assert.Equal(size, tw.transcoderPoolSize)
assert.Equal(size, tw.GetTranscoderPoolSize())

// return big.Int(0) when nil
tw.setTranscoderPoolSize(nil)
assert.Nil(tw.transcoderPoolSize)
assert.Equal(big.NewInt(0), tw.GetTranscoderPoolSize())
}

func TestTimeWatcher_WatchAndStop(t *testing.T) {
func TestTimeWatcher_NewTimeWatcher(t *testing.T) {
assert := assert.New(t)
size := big.NewInt(50)
block := big.NewInt(10)
Expand All @@ -53,44 +58,103 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) {
BlockNum: block,
BlockHashToReturn: hash,
Round: round,
Errors: make(map[string]error),
}
watcher := &stubBlockWatcher{
latestHeader: &blockwatch.MiniHeader{Number: block},
}

testErr := fmt.Errorf("error")

// Last InitializedRound error
lpEth.Errors["LastInitializedRound"] = testErr
expErr := fmt.Sprintf("error fetching initial lastInitializedRound value err=%v", testErr)
tw, err := NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(err)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["LastInitializedRound"] = nil

header := defaultMiniHeader()
newRoundEvent := newStubNewRoundLog()
// BlockHashForRound error
lpEth.Errors["BlockHashForRound"] = testErr
expErr = fmt.Sprintf("error fetching initial lastInitializedBlockHash value err=%v", testErr)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["BlockHashForRound"] = nil

header.Logs = append(header.Logs, newRoundEvent)
blockEvent := &blockwatch.Event{
Type: blockwatch.Added,
BlockHeader: header,
}
// CurrentRoundStartBlock error
lpEth.Errors["CurrentRoundStartBlock"] = testErr
expErr = fmt.Sprintf("error fetching current round start block %v=", testErr)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["CurrentRoundStartBlock"] = nil

go tw.Watch()
time.Sleep(2 * time.Millisecond)
// Check state initialization
// GetLastestBlock error
watcher.err = fmt.Errorf("GetLatestBlock error")
expErr = fmt.Sprintf("error fetching last seen block err=%v", watcher.err)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
watcher.err = nil

// TranscoderPoolSize error
lpEth.Errors["GetTranscoderPoolSize"] = testErr
expErr = fmt.Sprintf("error fetching initial transcoderPoolSize err=%v", testErr)
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(tw)
assert.EqualError(err, expErr)
lpEth.Errors["GetTranscoderPoolSize"] = nil

tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(err)
bh := tw.LastInitializedBlockHash()
assert.Equal(hash, common.BytesToHash(bh[:]))
assert.Equal(round, tw.LastInitializedRound())
assert.Equal(size, tw.GetTranscoderPoolSize())
assert.Equal(block, tw.LastSeenBlock())
tw.Stop()

// if watcher.GetLatestBlock() == nil, initialise lastSeenBlock to big.NewInt(0)
watcher.latestHeader = nil
tw, err = NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
assert.Nil(err)
go tw.Watch()
time.Sleep(2 * time.Millisecond)
// Check state initialization
bh = tw.LastInitializedBlockHash()
assert.Equal(hash, common.BytesToHash(bh[:]))
assert.Equal(round, tw.LastInitializedRound())
assert.Equal(size, tw.GetTranscoderPoolSize())
assert.Equal(big.NewInt(0), tw.LastSeenBlock())
}

func TestTimeWatcher_WatchAndStop(t *testing.T) {
assert := assert.New(t)
size := big.NewInt(50)
block := big.NewInt(10)
round := big.NewInt(1)
hash := ethcommon.HexToHash("foo")
lpEth := &eth.StubClient{
PoolSize: size,
BlockNum: block,
BlockHashToReturn: hash,
Round: round,
Errors: make(map[string]error),
}
watcher := &stubBlockWatcher{
latestHeader: &blockwatch.MiniHeader{Number: block},
}
tw, err := NewTimeWatcher(stubRoundsManagerAddr, watcher, lpEth)
require.Nil(t, err)

header := defaultMiniHeader()
newRoundEvent := newStubNewRoundLog()

header.Logs = append(header.Logs, newRoundEvent)
blockEvent := &blockwatch.Event{
Type: blockwatch.Added,
BlockHeader: header,
}

go tw.Watch()
time.Sleep(2 * time.Millisecond)

// New Round event
watcher.sink <- []*blockwatch.Event{blockEvent}
Expand Down Expand Up @@ -123,7 +187,7 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) {
watcher.sink <- []*blockwatch.Event{blockEvent}
time.Sleep(2 * time.Millisecond)
bhForRound = tw.LastInitializedBlockHash()
assert.Equal(hash, common.BytesToHash(bh[:]))
assert.Equal(hash, common.BytesToHash(bhForRound[:]))
assert.Equal(round, tw.LastInitializedRound())
assert.Equal(size, tw.GetTranscoderPoolSize())
assert.Equal(header.Number, tw.LastSeenBlock())
Expand All @@ -132,16 +196,6 @@ func TestTimeWatcher_WatchAndStop(t *testing.T) {
tw.Stop()
time.Sleep(2 * time.Millisecond)
assert.True(watcher.sub.unsubscribed)

// Test watch error when RPC calls fail
tw = &TimeWatcher{
lpEth: &eth.StubClient{
RoundsErr: errors.New("timewatcher error"),
},
}
err = tw.Watch()
assert.NotNil(err)
assert.Contains(err.Error(), "timewatcher error")
}

func TestTimeWatcher_HandleLog(t *testing.T) {
Expand Down

0 comments on commit aeeab38

Please sign in to comment.