Skip to content

Commit

Permalink
Apply several suggestions after review
Browse files Browse the repository at this point in the history
- More debug info when node is stopped
- low keys in events
- send history event only if new transfers were found
  • Loading branch information
dshulyak committed Jun 3, 2019
1 parent 8f5e869 commit 7dfb23d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 31 deletions.
12 changes: 6 additions & 6 deletions services/wallet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ Client expected to request transfers starting from received block.
{
"type": "wallet",
"event": {
"Type": "newblock",
"BlockNumber": 10
"type": "newblock",
"blockNumber": 10
}
}
```
Expand All @@ -100,8 +100,8 @@ Client expected to request new transfers from received block and replace transfe
{
"type": "wallet",
"event": {
"Type": "reorg",
"BlockNumber": 10
"type": "reorg",
"blockNumber": 10
}
}
```
Expand All @@ -115,8 +115,8 @@ Client expected to request transfers starting from this new block till the earli
{
"type": "wallet",
"event": {
"Type": "history",
"BlockNumber": 10
"type": "history",
"blockNumber": 10
}
}
```
4 changes: 2 additions & 2 deletions services/wallet/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ const (

// Event is a type for wallet events.
type Event struct {
Type EventType
BlockNumber *big.Int
Type EventType `json:"type"`
BlockNumber *big.Int `json:"blockNumber"`
}
50 changes: 28 additions & 22 deletions services/wallet/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,17 @@ type Reactor struct {
eth *ETHTransferDownloader
erc20 *ERC20TransfersDownloader

wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
ctx context.Context
cancel func()
}

// Start runs reactor loop in background.
func (r *Reactor) Start() error {
if r.quit != nil {
if r.ctx != nil {
return errors.New("already running")
}
r.quit = make(chan struct{})
r.ctx, r.cancel = context.WithCancel(context.Background())

r.wg.Add(1)
go func() {
Expand All @@ -104,12 +105,13 @@ func (r *Reactor) Start() error {

// Stop stops reactor loop and waits till it exits.
func (r *Reactor) Stop() {
if r.quit == nil {
if r.ctx == nil {
return
}
close(r.quit)
r.cancel()
r.wg.Wait()
r.quit = nil
r.cancel = nil
r.ctx = nil
}

func (r *Reactor) erc20HistoricalLoop() {
Expand All @@ -123,7 +125,7 @@ func (r *Reactor) erc20HistoricalLoop() {
defer ticker.Stop()
for {
select {
case <-r.quit:
case <-r.ctx.Done():
return
case <-ticker.C:
if iterator == nil {
Expand All @@ -147,10 +149,12 @@ func (r *Reactor) erc20HistoricalLoop() {
log.Error("failed to save downloaded erc20 transfers", "error", err)
break
}
r.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: iterator.Header().Number,
})
if len(transfers) > 0 {
r.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: iterator.Header().Number,
})
}
}
if iterator.Finished() {
log.Info("wallet historical downloader for erc20 transfers finished")
Expand All @@ -173,7 +177,7 @@ func (r *Reactor) ethHistoricalLoop() {
defer ticker.Stop()
for {
select {
case <-r.quit:
case <-r.ctx.Done():
return
case <-ticker.C:
if iterator == nil {
Expand All @@ -197,10 +201,12 @@ func (r *Reactor) ethHistoricalLoop() {
log.Error("failed to save downloaded eth transfers", "error", err)
break
}
r.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: iterator.Header().Number,
})
if len(transfers) > 0 {
r.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: iterator.Header().Number,
})
}
}
if iterator.Finished() {
log.Info("wallet historical downloader for eth transfers finished")
Expand All @@ -221,7 +227,7 @@ func (r *Reactor) newTransfersLoop() {
defer ticker.Stop()
for {
select {
case <-r.quit:
case <-r.ctx.Done():
return
case <-ticker.C:
if previous == nil {
Expand All @@ -232,15 +238,15 @@ func (r *Reactor) newTransfersLoop() {
}
}
num = num.Add(previous.Number, one)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(r.ctx, 5*time.Second)
latest, err := r.client.HeaderByNumber(ctx, num)
cancel()
if err != nil {
log.Warn("failed to get latest block", "number", num, "error", err)
continue
}
log.Debug("reactor received new block", "header", latest.Hash())
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel = context.WithTimeout(r.ctx, 10*time.Second)
added, removed, err := r.onNewBlock(ctx, previous, latest)
cancel()
if err != nil {
Expand Down Expand Up @@ -284,13 +290,13 @@ func (r *Reactor) newTransfersLoop() {

// getTransfers fetches erc20 and eth transfers and returns single slice with them.
func (r *Reactor) getTransfers(header *DBHeader) ([]Transfer, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(r.ctx, 5*time.Second)
ethT, err := r.eth.GetTransfers(ctx, header)
cancel()
if err != nil {
return nil, err
}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel = context.WithTimeout(r.ctx, 5*time.Second)
erc20T, err := r.erc20.GetTransfers(ctx, header)
cancel()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion services/wallet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
)
Expand Down Expand Up @@ -62,8 +63,12 @@ func (s *Service) StopReactor() error {

// Stop reactor, signals transmitter and close db.
func (s *Service) Stop() error {
log.Info("wallet stopping reactor")
err := s.StopReactor()
log.Info("wallet stopping signals")
s.signals.Stop()
return s.StopReactor()
log.Info("wallet stopped")
return err
}

// APIs returns list of available RPC APIs.
Expand Down
5 changes: 5 additions & 0 deletions t/devtests/tranfers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (s *TransfersSuite) SetupTest() {
s.Address = common.HexToAddress(info.WalletAddress)
}

func (s *TransfersSuite) TearDownTest() {
s.Require().NoError(s.backend.Logout())
s.DevNodeSuite.TearDownTest()
}

func (s *TransfersSuite) getAllTranfers() (rst []wallet.Transfer, err error) {
return rst, s.Local.Call(&rst, "wallet_getTransfers", (*hexutil.Big)(big.NewInt(0)))
}
Expand Down

0 comments on commit 7dfb23d

Please sign in to comment.