Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique ID for every transfer (multiple token transfers per tx) and concurrent erc20 downloads #1482

Merged
merged 12 commits into from
Jun 13, 2019
Merged
75 changes: 75 additions & 0 deletions services/wallet/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package wallet

import (
"context"
"sync"
"time"
)

type Command interface {
Run(context.Context)
}

type FiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}

func (c FiniteCommand) Run(ctx context.Context) {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := c.Runable(ctx)
if err == nil {
return
}
}
}
}

type InfiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}

func (c InfiniteCommand) Run(ctx context.Context) {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = c.Runable(ctx)
}
}
}

func NewGroup() *Group {
ctx, cancel := context.WithCancel(context.Background())
return &Group{
ctx: ctx,
cancel: cancel,
}
}

type Group struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
}

func (g *Group) Add(cmd Command) {
g.wg.Add(1)
go func() {
cmd.Run(g.ctx)
g.wg.Done()
}()
}

func (g *Group) Stop() {
g.cancel()
g.wg.Wait()
}
79 changes: 8 additions & 71 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package wallet

import (
"context"
"errors"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -12,74 +12,6 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type Command interface {
Run(context.Context)
}

type FiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}

func (c FiniteCommand) Run(ctx context.Context) {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := c.Runable(ctx)
if err == nil {
return
}
}
}
}

type InfiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}

func (c InfiniteCommand) Run(ctx context.Context) {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = c.Runable(ctx)
}
}
}

func NewGroup() *Group {
ctx, cancel := context.WithCancel(context.Background())
return &Group{
ctx: ctx,
cancel: cancel,
}
}

type Group struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
}

func (g *Group) Add(cmd Command) {
g.wg.Add(1)
go func() {
cmd.Run(g.ctx)
g.wg.Done()
}()
}

func (g *Group) Stop() {
g.cancel()
g.wg.Wait()
}

type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
Expand Down Expand Up @@ -115,12 +47,17 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, zero, c.previous.Number)
concurrent.Wait()
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
log.Error("eth downloader is stuck")
return errors.New("eth downloader is stuck")
}
if concurrent.Error() != nil {
log.Error("failed to dowloader transfers using concurrent downloader", "error", err)
return concurrent.Error()
}
transfers := concurrent.Transfers()
transfers := concurrent.Get()
log.Info("eth historical downloader finished succesfully", "total transfers", len(transfers), "time", time.Since(start))
// TODO(dshulyak) insert 0 block number with transfers
err = c.db.ProcessTranfers(transfers, headersFromTransfers(transfers), nil, ethSync)
Expand Down
88 changes: 62 additions & 26 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,60 @@ import (

// NewConcurrentDownloader creates ConcurrentDownloader instance.
func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader {
runner := NewConcurrentRunner(ctx)
result := &Result{}
return &ConcurrentDownloader{runner, result}
}

type ConcurrentDownloader struct {
*ConcurrentRunner
*Result
}

type Result struct {
mu sync.Mutex
transfers []Transfer
}

func (r *Result) Add(transfers ...Transfer) {
r.mu.Lock()
defer r.mu.Unlock()
r.transfers = append(r.transfers, transfers...)
}

func (r *Result) Get() []Transfer {
r.mu.Lock()
defer r.mu.Unlock()
rst := make([]Transfer, len(r.transfers))
copy(rst, r.transfers)
return rst
}

func NewConcurrentRunner(ctx context.Context) *ConcurrentRunner {
// TODO(dshulyak) rename to atomic group and keep interface consistent with regular Group.
ctx, cancel := context.WithCancel(ctx)
return &ConcurrentDownloader{
return &ConcurrentRunner{
ctx: ctx,
cancel: cancel,
}
}

// ConcurrentDownloader manages downloaders life cycle.
type ConcurrentDownloader struct {
// ConcurrentRunner runs group atomically.
type ConcurrentRunner struct {
ctx context.Context
cancel func()
wg sync.WaitGroup

mu sync.Mutex
results []Transfer
error error
mu sync.Mutex
error error
}

// Go spawns function in a goroutine and stores results or errors.
func (d *ConcurrentDownloader) Go(f func(context.Context) ([]Transfer, error)) {
func (d *ConcurrentRunner) Go(f func(context.Context) error) {
d.wg.Add(1)
go func() {
defer d.wg.Done()
transfers, err := f(d.ctx)
err := f(d.ctx)
d.mu.Lock()
defer d.mu.Unlock()
if err != nil {
Expand All @@ -46,29 +76,30 @@ func (d *ConcurrentDownloader) Go(f func(context.Context) ([]Transfer, error)) {
d.cancel()
return
}
d.results = append(d.results, transfers...)
}()
}

// Transfers returns collected transfers. To get all results should be called after Wait.
func (d *ConcurrentDownloader) Transfers() []Transfer {
d.mu.Lock()
defer d.mu.Unlock()
rst := make([]Transfer, len(d.results))
copy(rst, d.results)
return rst
}

// Wait for all downloaders to finish.
func (d *ConcurrentDownloader) Wait() {
func (d *ConcurrentRunner) Wait() {
d.wg.Wait()
if d.Error() == nil {
d.mu.Lock()
defer d.mu.Unlock()
d.cancel()
}
}

func (d *ConcurrentRunner) WaitAsync() <-chan struct{} {
ch := make(chan struct{})
go func() {
d.Wait()
close(ch)
}()
return ch
}

// Error stores an error that was reported by any of the downloader. Should be called after Wait.
func (d *ConcurrentDownloader) Error() error {
func (d *ConcurrentRunner) Error() error {
d.mu.Lock()
defer d.mu.Unlock()
return d.error
Expand All @@ -80,29 +111,34 @@ type TransferDownloader interface {
}

func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Go(func(ctx context.Context) ([]Transfer, error) {
c.Go(func(ctx context.Context) error {
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return nil, err
return err
}
hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return nil, err
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "low", low, "high", high)
return nil, nil
return nil
}
if new(big.Int).Sub(high, low).Cmp(one) == 0 {
log.Debug("higher block is a parent", "low", low, "high", high)
return downloader.GetTransfersByNumber(ctx, high)
transfers, err := downloader.GetTransfersByNumber(ctx, high)
if err != nil {
return err
}
c.Add(transfers...)
return nil
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
log.Debug("balances are not equal spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
return nil, nil
return nil
})
}
22 changes: 12 additions & 10 deletions services/wallet/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ import (
func TestConcurrentErrorInterrupts(t *testing.T) {
concurrent := NewConcurrentDownloader(context.Background())
var interrupted bool
concurrent.Go(func(ctx context.Context) ([]Transfer, error) {
concurrent.Go(func(ctx context.Context) error {
select {
case <-ctx.Done():
interrupted = true
case <-time.After(10 * time.Second):
}
return nil, nil
return nil
})
err := errors.New("interrupt")
concurrent.Go(func(ctx context.Context) ([]Transfer, error) {
return nil, err
concurrent.Go(func(ctx context.Context) error {
return err
})
concurrent.Wait()
require.True(t, interrupted)
Expand All @@ -34,14 +34,16 @@ func TestConcurrentErrorInterrupts(t *testing.T) {

func TestConcurrentCollectsTransfers(t *testing.T) {
concurrent := NewConcurrentDownloader(context.Background())
concurrent.Go(func(context.Context) ([]Transfer, error) {
return []Transfer{{}}, nil
concurrent.Go(func(context.Context) error {
concurrent.Add(Transfer{})
return nil
})
concurrent.Go(func(context.Context) ([]Transfer, error) {
return []Transfer{{}}, nil
concurrent.Go(func(context.Context) error {
concurrent.Add(Transfer{})
return nil
})
concurrent.Wait()
require.Len(t, concurrent.Transfers(), 2)
require.Len(t, concurrent.Get(), 2)
}

type balancesFixture []*big.Int
Expand Down Expand Up @@ -111,7 +113,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
rst := concurrent.Transfers()
rst := concurrent.Get()
require.Len(t, rst, len(tc.options.result))
sort.Slice(rst, func(i, j int) bool {
return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0
Expand Down
Loading