From 55744d3effeb23f339aa20834deca845f9360cfd Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 6 Jun 2019 10:06:20 +0300 Subject: [PATCH 01/12] Rewrite concurrent downloader and make erc20 downloader concurrent --- services/wallet/async.go | 75 ++++++++++++++++++++++++++ services/wallet/commands.go | 79 +++------------------------ services/wallet/concurrent.go | 87 +++++++++++++++++++++--------- services/wallet/concurrent_test.go | 22 ++++---- services/wallet/downloader.go | 65 +++++++++++++--------- 5 files changed, 197 insertions(+), 131 deletions(-) create mode 100644 services/wallet/async.go diff --git a/services/wallet/async.go b/services/wallet/async.go new file mode 100644 index 0000000000..fbc551de9b --- /dev/null +++ b/services/wallet/async.go @@ -0,0 +1,75 @@ +package wallet + +import ( + "context" + "sync" + "time" +) + +type Command interface { + Run(context.Context) +} + +type FiniteCommand struct { + Interval time.Duration + Runable func(context.Context) error +} + +func (c FiniteCommand) Run(ctx context.Context) { + ticker := time.NewTicker(c.Interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := c.Runable(ctx) + if err == nil { + return + } + } + } +} + +type InfiniteCommand struct { + Interval time.Duration + Runable func(context.Context) error +} + +func (c InfiniteCommand) Run(ctx context.Context) { + ticker := time.NewTicker(c.Interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _ = c.Runable(ctx) + } + } +} + +func NewGroup() *Group { + ctx, cancel := context.WithCancel(context.Background()) + return &Group{ + ctx: ctx, + cancel: cancel, + } +} + +type Group struct { + ctx context.Context + cancel func() + wg sync.WaitGroup +} + +func (g *Group) Add(cmd Command) { + g.wg.Add(1) + go func() { + cmd.Run(g.ctx) + g.wg.Done() + }() +} + +func (g *Group) Stop() { + g.cancel() + g.wg.Wait() +} diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 07c4d2f875..e9071cce4d 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -2,8 +2,8 @@ package wallet import ( "context" + "errors" "math/big" - "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -12,74 +12,6 @@ import ( "github.com/ethereum/go-ethereum/log" ) -type Command interface { - Run(context.Context) -} - -type FiniteCommand struct { - Interval time.Duration - Runable func(context.Context) error -} - -func (c FiniteCommand) Run(ctx context.Context) { - ticker := time.NewTicker(c.Interval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - err := c.Runable(ctx) - if err == nil { - return - } - } - } -} - -type InfiniteCommand struct { - Interval time.Duration - Runable func(context.Context) error -} - -func (c InfiniteCommand) Run(ctx context.Context) { - ticker := time.NewTicker(c.Interval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - _ = c.Runable(ctx) - } - } -} - -func NewGroup() *Group { - ctx, cancel := context.WithCancel(context.Background()) - return &Group{ - ctx: ctx, - cancel: cancel, - } -} - -type Group struct { - ctx context.Context - cancel func() - wg sync.WaitGroup -} - -func (g *Group) Add(cmd Command) { - g.wg.Add(1) - go func() { - cmd.Run(g.ctx) - g.wg.Done() - }() -} - -func (g *Group) Stop() { - g.cancel() - g.wg.Wait() -} - type ethHistoricalCommand struct { db *Database eth TransferDownloader @@ -115,12 +47,17 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { concurrent := NewConcurrentDownloader(ctx) start := time.Now() 
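// Illustrative sketch (not part of the patch): how the async.go primitives introduced
// above are expected to compose, assuming the surrounding wallet package and its imports.
// exampleGroupUsage and its sync argument are placeholder names. A FiniteCommand retries
// its Runable every Interval until it returns nil or the group's context is cancelled;
// Group.Stop cancels the shared context and waits for all commands to exit.
func exampleGroupUsage(sync func(context.Context) error) {
	group := NewGroup()
	group.Add(FiniteCommand{
		Interval: 10 * time.Second,
		Runable:  sync, // one attempt per tick; a nil error ends the retries
	})
	// ... later, on shutdown:
	group.Stop()
}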
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, zero, c.previous.Number) - concurrent.Wait() + select { + case <-concurrent.WaitAsync(): + case <-ctx.Done(): + log.Error("eth downloader is stuck") + return errors.New("eth downloader is stuck") + } if concurrent.Error() != nil { log.Error("failed to dowloader transfers using concurrent downloader", "error", err) return concurrent.Error() } - transfers := concurrent.Transfers() + transfers := concurrent.Get() log.Info("eth historical downloader finished succesfully", "total transfers", len(transfers), "time", time.Since(start)) // TODO(dshulyak) insert 0 block number with transfers err = c.db.ProcessTranfers(transfers, headersFromTransfers(transfers), nil, ethSync) diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go index 2c0d1f6881..6d071326e8 100644 --- a/services/wallet/concurrent.go +++ b/services/wallet/concurrent.go @@ -11,30 +11,59 @@ import ( // NewConcurrentDownloader creates ConcurrentDownloader instance. func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader { + runner := NewConcurrentRunner(ctx) + result := &Result{} + return &ConcurrentDownloader{runner, result} +} + +type ConcurrentDownloader struct { + *ConcurrentRunner + *Result +} + +type Result struct { + mu sync.Mutex + transfers []Transfer +} + +func (r *Result) Add(transfers ...Transfer) { + r.mu.Lock() + defer r.mu.Unlock() + r.transfers = append(r.transfers, transfers...) +} + +func (r *Result) Get() []Transfer { + r.mu.Lock() + defer r.mu.Unlock() + rst := make([]Transfer, len(r.transfers)) + copy(rst, r.transfers) + return rst +} + +func NewConcurrentRunner(ctx context.Context) *ConcurrentRunner { ctx, cancel := context.WithCancel(ctx) - return &ConcurrentDownloader{ + return &ConcurrentRunner{ ctx: ctx, cancel: cancel, } } -// ConcurrentDownloader manages downloaders life cycle. -type ConcurrentDownloader struct { +// ConcurrentRunner runs group atomically. +type ConcurrentRunner struct { ctx context.Context cancel func() wg sync.WaitGroup - mu sync.Mutex - results []Transfer - error error + mu sync.Mutex + error error } // Go spawns function in a goroutine and stores results or errors. -func (d *ConcurrentDownloader) Go(f func(context.Context) ([]Transfer, error)) { +func (d *ConcurrentRunner) Go(f func(context.Context) error) { d.wg.Add(1) go func() { defer d.wg.Done() - transfers, err := f(d.ctx) + err := f(d.ctx) d.mu.Lock() defer d.mu.Unlock() if err != nil { @@ -46,29 +75,30 @@ func (d *ConcurrentDownloader) Go(f func(context.Context) ([]Transfer, error)) { d.cancel() return } - d.results = append(d.results, transfers...) }() } -// Transfers returns collected transfers. To get all results should be called after Wait. -func (d *ConcurrentDownloader) Transfers() []Transfer { - d.mu.Lock() - defer d.mu.Unlock() - rst := make([]Transfer, len(d.results)) - copy(rst, d.results) - return rst -} - // Wait for all downloaders to finish. -func (d *ConcurrentDownloader) Wait() { +func (d *ConcurrentRunner) Wait() { d.wg.Wait() if d.Error() == nil { + d.mu.Lock() + defer d.mu.Unlock() d.cancel() } } +func (d *ConcurrentRunner) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + d.Wait() + close(ch) + }() + return ch +} + // Error stores an error that was reported by any of the downloader. Should be called after Wait. 
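// Illustrative sketch (not part of the patch): typical use of the reworked ConcurrentDownloader
// above, assuming the surrounding wallet package. exampleConcurrentDownload and fetch are
// placeholder names. Workers spawned with Go publish transfers through Result.Add; the caller
// waits via WaitAsync so it can also honour its own context, then reads results with Get.
func exampleConcurrentDownload(ctx context.Context, fetch func(context.Context) ([]Transfer, error)) ([]Transfer, error) {
	d := NewConcurrentDownloader(ctx)
	d.Go(func(ctx context.Context) error {
		transfers, err := fetch(ctx)
		if err != nil {
			return err // the first error cancels the shared context for all workers
		}
		d.Add(transfers...)
		return nil
	})
	select {
	case <-d.WaitAsync():
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	return d.Get(), d.Error()
}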
-func (d *ConcurrentDownloader) Error() error { +func (d *ConcurrentRunner) Error() error { d.mu.Lock() defer d.mu.Unlock() return d.error @@ -80,29 +110,34 @@ type TransferDownloader interface { } func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) { - c.Go(func(ctx context.Context) ([]Transfer, error) { + c.Go(func(ctx context.Context) error { log.Debug("eth transfers comparing blocks", "low", low, "high", high) lb, err := client.BalanceAt(ctx, account, low) if err != nil { - return nil, err + return err } hb, err := client.BalanceAt(ctx, account, high) if err != nil { - return nil, err + return err } if lb.Cmp(hb) == 0 { log.Debug("balances are equal", "low", low, "high", high) - return nil, nil + return nil } if new(big.Int).Sub(high, low).Cmp(one) == 0 { log.Debug("higher block is a parent", "low", low, "high", high) - return downloader.GetTransfersByNumber(ctx, high) + transfers, err := downloader.GetTransfersByNumber(ctx, high) + if err != nil { + return err + } + c.Add(transfers...) + return nil } mid := new(big.Int).Add(low, high) mid = mid.Div(mid, two) log.Debug("balances are not equal spawn two concurrent downloaders", "low", low, "mid", mid, "high", high) downloadEthConcurrently(c, client, downloader, account, low, mid) downloadEthConcurrently(c, client, downloader, account, mid, high) - return nil, nil + return nil }) } diff --git a/services/wallet/concurrent_test.go b/services/wallet/concurrent_test.go index 859a41497b..e2ba6a400a 100644 --- a/services/wallet/concurrent_test.go +++ b/services/wallet/concurrent_test.go @@ -15,17 +15,17 @@ import ( func TestConcurrentErrorInterrupts(t *testing.T) { concurrent := NewConcurrentDownloader(context.Background()) var interrupted bool - concurrent.Go(func(ctx context.Context) ([]Transfer, error) { + concurrent.Go(func(ctx context.Context) error { select { case <-ctx.Done(): interrupted = true case <-time.After(10 * time.Second): } - return nil, nil + return nil }) err := errors.New("interrupt") - concurrent.Go(func(ctx context.Context) ([]Transfer, error) { - return nil, err + concurrent.Go(func(ctx context.Context) error { + return err }) concurrent.Wait() require.True(t, interrupted) @@ -34,14 +34,16 @@ func TestConcurrentErrorInterrupts(t *testing.T) { func TestConcurrentCollectsTransfers(t *testing.T) { concurrent := NewConcurrentDownloader(context.Background()) - concurrent.Go(func(context.Context) ([]Transfer, error) { - return []Transfer{{}}, nil + concurrent.Go(func(context.Context) error { + concurrent.Add(Transfer{}) + return nil }) - concurrent.Go(func(context.Context) ([]Transfer, error) { - return []Transfer{{}}, nil + concurrent.Go(func(context.Context) error { + concurrent.Add(Transfer{}) + return nil }) concurrent.Wait() - require.Len(t, concurrent.Transfers(), 2) + require.Len(t, concurrent.Get(), 2) } type balancesFixture []*big.Int @@ -111,7 +113,7 @@ func TestConcurrentEthDownloader(t *testing.T) { common.Address{}, zero, tc.options.last) concurrent.Wait() require.NoError(t, concurrent.Error()) - rst := concurrent.Transfers() + rst := concurrent.Get() require.Len(t, rst, len(tc.options.result)) sort.Slice(rst, func(i, j int) bool { return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0 diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go index cc9b95b9f7..24a4f4f594 100644 --- a/services/wallet/downloader.go +++ b/services/wallet/downloader.go @@ -2,6 +2,7 @@ package wallet import ( "context" + 
"errors" "math/big" "time" @@ -163,32 +164,48 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}} } +func (d *ERC20TransfersDownloader) tranasferFromLogs(parent context.Context, log types.Log, address common.Address) (Transfer, error) { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + tx, _, err := d.client.TransactionByHash(ctx, log.TxHash) + cancel() + if err != nil { + return Transfer{}, err + } + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + receipt, err := d.client.TransactionReceipt(ctx, log.TxHash) + cancel() + if err != nil { + return Transfer{}, err + } + return Transfer{ + Address: address, + Type: erc20Transfer, + BlockNumber: new(big.Int).SetUint64(log.BlockNumber), + BlockHash: log.BlockHash, + Transaction: tx, + Receipt: receipt, + }, nil +} + func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]Transfer, error) { - rst := make([]Transfer, len(logs)) - for i, l := range logs { - // TODO(dshulyak) use TransactionInBlock after it is fixed - ctx, cancel := context.WithTimeout(parent, 3*time.Second) - tx, _, err := d.client.TransactionByHash(ctx, l.TxHash) - cancel() - if err != nil { - return nil, err - } - ctx, cancel = context.WithTimeout(parent, 3*time.Second) - receipt, err := d.client.TransactionReceipt(ctx, l.TxHash) - cancel() - if err != nil { - return nil, err - } - rst[i] = Transfer{ - Address: address, - Type: erc20Transfer, - BlockNumber: new(big.Int).SetUint64(l.BlockNumber), - BlockHash: l.BlockHash, - Transaction: tx, - Receipt: receipt, - } + concurrent := NewConcurrentDownloader(parent) + for i := range logs { + l := logs[i] + concurrent.Go(func(ctx context.Context) error { + transfer, err := d.tranasferFromLogs(ctx, l, address) + if err != nil { + return err + } + concurrent.Add(transfer) + return nil + }) } - return rst, nil + select { + case <-concurrent.WaitAsync(): + case <-parent.Done(): + return nil, errors.New("logs downloader stuck") + } + return concurrent.Get(), nil } func any(address common.Address, compare []common.Address) bool { From dbb7499dcb0cf266267e6dff1c51bdaa4253d22b Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 6 Jun 2019 10:46:39 +0300 Subject: [PATCH 02/12] Generate unique ID for every transfer --- services/wallet/concurrent.go | 1 + services/wallet/database.go | 27 ++---------- services/wallet/database_test.go | 7 ++- services/wallet/downloader.go | 73 +++++++++++++++++++------------- services/wallet/query.go | 22 +++++++++- 5 files changed, 73 insertions(+), 57 deletions(-) diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go index 6d071326e8..4eea484979 100644 --- a/services/wallet/concurrent.go +++ b/services/wallet/concurrent.go @@ -41,6 +41,7 @@ func (r *Result) Get() []Transfer { } func NewConcurrentRunner(ctx context.Context) *ConcurrentRunner { + // TODO(dshulyak) rename to atomic group and keep interface consistent with regular Group. 
ctx, cancel := context.WithCancel(ctx) return &ConcurrentRunner{ ctx: ctx, diff --git a/services/wallet/database.go b/services/wallet/database.go index 7c05b423d5..96e3bd5596 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -144,7 +144,7 @@ func (db *Database) GetTransfersByAddress(address common.Address, start, end *bi return } defer rows.Close() - return scanTransfers(rows) + return query.Scan(rows) } // GetTransfers load transfers transfer betweeen two blocks. @@ -154,11 +154,8 @@ func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error if err != nil { return } - if err != nil { - return - } defer rows.Close() - return scanTransfers(rows) + return query.Scan(rows) } // SaveHeader stores a single header. @@ -289,24 +286,6 @@ SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number return nil, nil } -func scanTransfers(rows *sql.Rows) (rst []Transfer, err error) { - for rows.Next() { - transfer := Transfer{ - BlockNumber: &big.Int{}, - Transaction: &types.Transaction{}, - Receipt: &types.Receipt{}, - } - err = rows.Scan( - &transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address, - &JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt}) - if err != nil { - return nil, err - } - rst = append(rst, transfer) - } - return rst, nil -} - // statementCreator allows to pass transaction or database to use in consumer. type statementCreator interface { Prepare(query string) (*sql.Stmt, error) @@ -346,7 +325,7 @@ func insertTransfers(creator statementCreator, transfers []Transfer) error { return err } for _, t := range transfers { - _, err = insert.Exec(t.Transaction.Hash(), t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type) + _, err = insert.Exec(t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type) if err != nil { return err } diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go index 7fc3959fa5..aeb438010c 100644 --- a/services/wallet/database_test.go +++ b/services/wallet/database_test.go @@ -105,6 +105,7 @@ func TestDBProcessTransfer(t *testing.T) { tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) transfers := []Transfer{ { + ID: common.Hash{1}, Type: ethTransfer, BlockHash: header.Hash, BlockNumber: header.Number, @@ -131,10 +132,10 @@ func TestDBReorgTransfers(t *testing.T) { originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil) require.NoError(t, db.ProcessTranfers([]Transfer{ - {ethTransfer, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt}, + {ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt}, }, []*DBHeader{original}, nil, 0)) require.NoError(t, db.ProcessTranfers([]Transfer{ - {ethTransfer, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt}, + {ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt}, }, []*DBHeader{replaced}, []*DBHeader{original}, 0)) all, err := db.GetTransfers(big.NewInt(0), nil) @@ -158,6 +159,7 @@ func TestDBGetTransfersFromBlock(t *testing.T) { receipt := types.NewReceipt(nil, false, 100) receipt.Logs = []*types.Log{} transfer := Transfer{ + ID: tx.Hash(), Type: ethTransfer, BlockNumber: header.Number, BlockHash: header.Hash, @@ -223,6 +225,7 @@ func TestDBProcessTransfersUpdate(t *testing.T) { 
Hash: common.Hash{1}, } transfer := Transfer{ + ID: common.Hash{1}, BlockNumber: header.Number, BlockHash: header.Hash, Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil), diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go index 24a4f4f594..4e52b6cf45 100644 --- a/services/wallet/downloader.go +++ b/services/wallet/downloader.go @@ -2,6 +2,7 @@ package wallet import ( "context" + "encoding/binary" "errors" "math/big" "time" @@ -25,14 +26,15 @@ const ( ) var ( - one = big.NewInt(1) zero = big.NewInt(0) + one = big.NewInt(1) two = big.NewInt(2) ) // Transfer stores information about transfer. type Transfer struct { Type TransferType `json:"type"` + ID common.Hash `json:"-"` Address common.Address `json:"address"` BlockNumber *big.Int `json:"blockNumber"` BlockHash common.Hash `json:"blockhash"` @@ -94,37 +96,33 @@ func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, number func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) { for _, tx := range blk.Transactions() { + var address *common.Address from, err := types.Sender(d.signer, tx) if err != nil { return nil, err } - // payload is empty for eth transfers if any(from, accounts) { - receipt, err := d.client.TransactionReceipt(ctx, tx.Hash()) - if err != nil { - return nil, err - } - rst = append(rst, Transfer{Type: ethTransfer, - Address: from, - BlockNumber: blk.Number(), - BlockHash: blk.Hash(), - Transaction: tx, Receipt: receipt}) - continue + address = &from + } else if tx.To() != nil && any(*tx.To(), accounts) { + address = tx.To() } - if tx.To() == nil { - continue - } - if any(*tx.To(), accounts) { + if address != nil { receipt, err := d.client.TransactionReceipt(ctx, tx.Hash()) if err != nil { return nil, err } - rst = append(rst, Transfer{Type: ethTransfer, - Address: *tx.To(), + if isTokenTransfer(receipt.Logs) { + log.Debug("eth downloader found token transfer", "hash", tx.Hash()) + continue + } + rst = append(rst, Transfer{ + Type: ethTransfer, + ID: tx.Hash(), + Address: *address, BlockNumber: blk.Number(), BlockHash: blk.Hash(), Transaction: tx, Receipt: receipt}) - continue + } } // TODO(dshulyak) test that balance difference was covered by transactions @@ -164,7 +162,7 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}} } -func (d *ERC20TransfersDownloader) tranasferFromLogs(parent context.Context, log types.Log, address common.Address) (Transfer, error) { +func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, log types.Log, address common.Address) (Transfer, error) { ctx, cancel := context.WithTimeout(parent, 3*time.Second) tx, _, err := d.client.TransactionByHash(ctx, log.TxHash) cancel() @@ -177,8 +175,13 @@ func (d *ERC20TransfersDownloader) tranasferFromLogs(parent context.Context, log if err != nil { return Transfer{}, err } + // TODO(dshulyak) what is the max number of logs? 
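// Explanatory comment (not part of the patch): the lines added below derive a unique transfer
// ID as keccak256(txHash || logIndex), with the log index encoded as a big-endian uint32.
// A single transaction can emit several ERC-20 Transfer logs, so the transaction hash alone
// would collide; folding in the log index keeps the ID unique per log entry, while plain ETH
// transfers elsewhere in this patch can keep using tx.Hash() directly as their ID.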
+ index := [4]byte{} + binary.BigEndian.PutUint32(index[:], uint32(log.Index)) + id := crypto.Keccak256Hash(log.TxHash.Bytes(), index[:]) return Transfer{ Address: address, + ID: id, Type: erc20Transfer, BlockNumber: new(big.Int).SetUint64(log.BlockNumber), BlockHash: log.BlockHash, @@ -192,7 +195,7 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log for i := range logs { l := logs[i] concurrent.Go(func(ctx context.Context) error { - transfer, err := d.tranasferFromLogs(ctx, l, address) + transfer, err := d.transferFromLog(ctx, l, address) if err != nil { return err } @@ -208,15 +211,6 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log return concurrent.Get(), nil } -func any(address common.Address, compare []common.Address) bool { - for _, c := range compare { - if c == address { - return true - } - } - return false -} - // GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount. func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) { hash := header.Hash @@ -289,3 +283,22 @@ func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, f log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start)) return transfers, nil } + +func any(address common.Address, compare []common.Address) bool { + for _, c := range compare { + if c == address { + return true + } + } + return false +} + +func isTokenTransfer(logs []*types.Log) bool { + signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) + for _, l := range logs { + if len(l.Topics) > 0 && l.Topics[0] == signature { + return true + } + } + return false +} diff --git a/services/wallet/query.go b/services/wallet/query.go index 00a3128000..73d94659eb 100644 --- a/services/wallet/query.go +++ b/services/wallet/query.go @@ -2,12 +2,14 @@ package wallet import ( "bytes" + "database/sql" "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" ) -const baseTransfersQuery = "SELECT type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash" +const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash" func newTransfersQuery() *transfersQuery { buf := bytes.NewBuffer(nil) @@ -64,3 +66,21 @@ func (q *transfersQuery) String() string { func (q *transfersQuery) Args() []interface{} { return q.args } + +func (q *transfersQuery) Scan(rows *sql.Rows) (rst []Transfer, err error) { + for rows.Next() { + transfer := Transfer{ + BlockNumber: &big.Int{}, + Transaction: &types.Transaction{}, + Receipt: &types.Receipt{}, + } + err = rows.Scan( + &transfer.ID, &transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address, + &JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt}) + if err != nil { + return nil, err + } + rst = append(rst, transfer) + } + return rst, nil +} From d3ce2b78d828654ebe9d39bc9e12b2265fedbda7 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 7 Jun 2019 11:48:57 +0300 Subject: [PATCH 03/12] Write better tests for reorg handling and improve handling description --- services/wallet/commands.go | 99 ++++++++-- services/wallet/commands_test.go | 293 ++++++++++++++++++++---------- services/wallet/database.go | 18 ++ services/wallet/database_test.go | 25 +++ 
services/wallet/events.go | 6 +- services/wallet/iterative.go | 4 +- services/wallet/iterative_test.go | 42 +++++ services/wallet/reactor.go | 52 +----- t/devtests/testchain/node.go | 80 ++++++++ 9 files changed, 459 insertions(+), 160 deletions(-) create mode 100644 t/devtests/testchain/node.go diff --git a/services/wallet/commands.go b/services/wallet/commands.go index e9071cce4d..8f49a9d281 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -13,11 +13,12 @@ import ( ) type ethHistoricalCommand struct { - db *Database - eth TransferDownloader - address common.Address - client reactorClient - feed *event.Feed + db *Database + eth TransferDownloader + address common.Address + client reactorClient + feed *event.Feed + safetyDepth *big.Int previous *DBHeader } @@ -36,7 +37,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { return err } if c.previous == nil { - c.previous, err = lastKnownHeader(c.db, c.client) + c.previous, err = lastKnownHeader(ctx, c.db, c.client, c.safetyDepth) if err != nil { return err } @@ -78,11 +79,12 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { } type erc20HistoricalCommand struct { - db *Database - erc20 BatchDownloader - address common.Address - client reactorClient - feed *event.Feed + db *Database + erc20 BatchDownloader + address common.Address + client reactorClient + feed *event.Feed + safetyDepth *big.Int iterator *IterativeDownloader } @@ -96,7 +98,9 @@ func (c *erc20HistoricalCommand) Command() FiniteCommand { func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { if c.iterator == nil { - c.iterator, err = SetupIterativeDownloader(c.db, c.client, c.address, erc20Sync, c.erc20, erc20BatchSize) + c.iterator, err = SetupIterativeDownloader( + c.db, c.client, c.address, erc20Sync, + c.erc20, erc20BatchSize, c.safetyDepth) if err != nil { log.Error("failed to setup historical downloader for erc20") return err @@ -129,12 +133,13 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { } type newBlocksTransfersCommand struct { - db *Database - chain *big.Int - erc20 *ERC20TransfersDownloader - eth *ETHTransferDownloader - client reactorClient - feed *event.Feed + db *Database + chain *big.Int + erc20 *ERC20TransfersDownloader + eth *ETHTransferDownloader + client reactorClient + feed *event.Feed + safetyDepth *big.Int previous *DBHeader } @@ -148,7 +153,7 @@ func (c *newBlocksTransfersCommand) Command() InfiniteCommand { func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { if c.previous == nil { - c.previous, err = lastKnownHeader(c.db, c.client) + c.previous, err = lastKnownHeader(parent, c.db, c.client, c.safetyDepth) if err != nil { log.Error("failed to get last known header", "error", err) return err @@ -268,3 +273,59 @@ func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { } return accounts } + +// lastKnownHeader selects last stored header in database. Such header should have atleast safety depth predecessor in our database. +// We don't store every single header in the database. +// Historical downloaders storing only block where transfer was found. +// New block downloaders store every block it downloaded. +// It could happen that historical downloader found transfers in block 15. With a current head set at 20. +// If we will notice reorg at 20 but chain was rewritten starting from 10th block we won't be able to backtrack that transfer +// found in 15 block was removed from chain. 
+// See tests TestSafetyBufferFailed and TestSafetyBufferSuccess. +func lastKnownHeader(parent context.Context, db *Database, client HeaderReader, safetyLimit *big.Int) (*DBHeader, error) { + headers, err := db.LastHeaders(safetyLimit) + if err != nil { + return nil, err + } + if int64(len(headers)) > limit.Int64() && isSequence(headers) { + return headers[0], nil + } + var latest *DBHeader + if len(headers) == 0 { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + header, err := client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + return nil, err + } + latest = toDBHeader(header) + } else { + latest = headers[0] + } + diff := new(big.Int).Sub(latest.Number, safetyLimit) + if diff.Cmp(zero) <= 0 { + diff = zero + } + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + header, err := client.HeaderByNumber(ctx, safetyLimit) + cancel() + if err != nil { + return nil, err + } + return toDBHeader(header), nil +} + +func isSequence(headers []*DBHeader) bool { + if len(headers) == 0 { + return false + } + child := headers[0] + diff := big.NewInt(0) + for _, parent := range headers[1:] { + if diff.Sub(child.Number, parent.Number).Cmp(one) != 0 { + return false + } + child = parent + } + return true +} diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go index 344664e4b3..27463bca96 100644 --- a/services/wallet/commands_test.go +++ b/services/wallet/commands_test.go @@ -2,124 +2,229 @@ package wallet import ( "context" - "errors" "math/big" - "sort" "testing" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/require" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-go/t/devtests/testchain" + "github.com/stretchr/testify/suite" ) -type headers []*types.Header +func TestNewBlocksSuite(t *testing.T) { + suite.Run(t, new(NewBlocksSuite)) +} -func (h headers) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - for _, item := range h { - if item.Hash() == hash { - return item, nil - } - } - return nil, errors.New("not found") +type NewBlocksSuite struct { + suite.Suite + backend *testchain.Backend + cmd *newBlocksTransfersCommand + address common.Address + db *Database + dbStop func() + feed *event.Feed } -func (h headers) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - for _, item := range h { - if item.Number.Cmp(number) == 0 { - return item, nil - } +func (s *NewBlocksSuite) SetupTest() { + var err error + db, stop := setupTestDB(s.Suite.T()) + s.db = db + s.dbStop = stop + s.backend, err = testchain.NewBackend() + s.Require().NoError(err) + account, err := crypto.GenerateKey() + s.Require().NoError(err) + s.address = crypto.PubkeyToAddress(account.PublicKey) + s.feed = &event.Feed{} + s.cmd = &newBlocksTransfersCommand{ + db: s.db, + erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}), + eth: ÐTransferDownloader{ + client: s.backend.Client, + signer: s.backend.Signer, + accounts: []common.Address{s.address}, + }, + feed: s.feed, + client: s.backend.Client, + safetyDepth: big.NewInt(15), } - return nil, errors.New("not found") } -func (h headers) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { - return nil, errors.New("not implemented") +func (s *NewBlocksSuite) TearDownTest() { + s.dbStop() + 
s.Require().NoError(s.backend.Stop()) } -func TestReorgOnNewBlock(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - original := genHeadersChain(5, 1) - require.NoError(t, db.SaveHeaders(original)) - // rewrite parents ater 2nd block - reorg := make(headers, 5) - copy(reorg, original[:2]) - for i := 2; i < len(reorg); i++ { - reorg[i] = &types.Header{ - Number: big.NewInt(int64(i)), - Difficulty: big.NewInt(2), // actual difficulty is not important. using it to change a hash - Time: big.NewInt(1), - ParentHash: reorg[i-1].Hash(), - } - } - cmd := &newBlocksTransfersCommand{ - client: reorg, - db: db, - } - latest := &types.Header{ - Number: big.NewInt(5), - Difficulty: big.NewInt(2), - Time: big.NewInt(1), - ParentHash: reorg[len(reorg)-1].Hash(), - } - previous := original[len(original)-1] - added, removed, err := cmd.onNewBlock(context.TODO(), toDBHeader(previous), latest) - require.NoError(t, err) - require.Len(t, added, 4) - require.Len(t, removed, 3) - - sort.Slice(removed, func(i, j int) bool { - return removed[i].Number.Cmp(removed[j].Number) < 1 +func (s *NewBlocksSuite) TestOneBlock() { + ctx := context.Background() + s.Require().EqualError(s.cmd.Run(ctx), "not found") + tx := types.NewTransaction(0, s.address, big.NewInt(1e17), 21000, big.NewInt(1), nil) + tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) + s.Require().NoError(err) + blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) { + gen.AddTx(tx) }) - for i, h := range original[2:] { - require.Equal(t, h.Hash(), removed[i].Hash) - } + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(1, n) + s.Require().NoError(err) - expected := make([]*types.Header, 4) - copy(expected, reorg[2:]) - expected[3] = latest - sort.Slice(added, func(i, j int) bool { - return added[i].Number.Cmp(added[j].Number) < 1 - }) - for i, h := range expected { - require.Equal(t, h.Hash(), added[i].Hash) + events := make(chan Event, 1) + sub := s.feed.Subscribe(events) + defer sub.Unsubscribe() + + s.Require().NoError(s.cmd.Run(ctx)) + + select { + case ev := <-events: + s.Require().Equal(ev.Type, EventNewBlock) + s.Require().Equal(ev.BlockNumber, big.NewInt(1)) + default: + s.Require().FailNow("event wasn't emitted") } + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 1) + s.Require().Equal(tx.Hash(), transfers[0].ID) +} + +func (s *NewBlocksSuite) genTx(nonce int) *types.Transaction { + tx := types.NewTransaction(uint64(nonce), s.address, big.NewInt(1e10), 21000, big.NewInt(1), nil) + tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) + s.Require().NoError(err) + return tx } -func TestReactorReorgAllKnownHeaders(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - original := genHeadersChain(2, 1) - reorg := make(headers, 2) - copy(reorg, genHeadersChain(2, 2)) - latest := &types.Header{ - Number: big.NewInt(2), - Difficulty: big.NewInt(2), - Time: big.NewInt(1), - ParentHash: reorg[len(reorg)-1].Hash(), +func (s *NewBlocksSuite) runCmdUntilError(ctx context.Context) (err error) { + for err == nil { + err = s.cmd.Run(ctx) } - require.NoError(t, db.SaveHeaders(original)) - cmd := &newBlocksTransfersCommand{ - client: reorg, - db: db, + return err +} + +func (s *NewBlocksSuite) TestReorg() { + blocks := s.backend.GenerateBlocks(20, 0, nil) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(20, n) + s.Require().NoError(err) + + ctx, cancel := 
context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + blocks = s.backend.GenerateBlocks(3, 20, func(n int, gen *core.BlockGen) { + gen.AddTx(s.genTx(n)) + }) + n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(3, n) + s.Require().NoError(err) + + // `not found` returned when we query head+1 block + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 3) + + blocks = s.backend.GenerateBlocks(10, 15, func(n int, gen *core.BlockGen) { + gen.AddTx(s.genTx(n)) + }) + n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(10, n) + s.Require().NoError(err) + + // it will be less but even if something wrong we can't get more + events := make(chan Event, 10) + sub := s.feed.Subscribe(events) + defer sub.Unsubscribe() + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + close(events) + expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(16)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}} + i := 0 + for ev := range events { + s.Require().Equal(expected[i].Type, ev.Type) + s.Require().Equal(expected[i].BlockNumber, ev.BlockNumber) + i++ } - added, removed, err := cmd.onNewBlock(context.TODO(), toDBHeader(original[len(original)-1]), latest) - require.NoError(t, err) - require.Len(t, added, 3) - require.Len(t, removed, 2) + + transfers, err = s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 10) } -func genHeadersChain(size, difficulty int) []*types.Header { - rst := make([]*types.Header, size) - for i := 0; i < size; i++ { - rst[i] = &types.Header{ - Number: big.NewInt(int64(i)), - Difficulty: big.NewInt(int64(difficulty)), - Time: big.NewInt(1), - } - if i != 0 { - rst[i].ParentHash = rst[i-1].Hash() +func (s *NewBlocksSuite) downloadHistorical() { + blocks := s.backend.GenerateBlocks(40, 0, func(n int, gen *core.BlockGen) { + if n == 36 { + gen.AddTx(s.genTx(0)) + } else if n == 39 { + gen.AddTx(s.genTx(1)) } + }) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(40, n) + s.Require().NoError(err) + + eth := ðHistoricalCommand{ + db: s.db, + eth: ÐTransferDownloader{ + client: s.backend.Client, + signer: s.backend.Signer, + accounts: []common.Address{s.address}, + }, + feed: s.feed, + address: s.address, + client: s.backend.Client, + safetyDepth: big.NewInt(0), } - return rst + s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers") + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 2) +} + +func (s *NewBlocksSuite) reorgHistorical() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + blocks := s.backend.GenerateBlocks(10, 35, nil) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(10, n) + s.Require().NoError(err) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + +} + +func (s *NewBlocksSuite) 
TestSafetyBufferFailure() { + s.downloadHistorical() + + s.cmd.safetyDepth = big.NewInt(0) + s.reorgHistorical() + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func (s *NewBlocksSuite) TestSafetyBufferSuccess() { + s.downloadHistorical() + + s.cmd.safetyDepth = big.NewInt(10) + s.reorgHistorical() + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 0) } diff --git a/services/wallet/database.go b/services/wallet/database.go index 96e3bd5596..cf2c3f7fbf 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -242,6 +242,24 @@ func (db *Database) LastHeader() (header *DBHeader, err error) { return nil, nil } +func (db *Database) LastHeaders(limit *big.Int) ([]*DBHeader, error) { + rows, err := db.db.Query("SELECT hash,number FROM blocks WHERE number ORDER BY number DESC LIMIT ?", (*SQLBigInt)(limit)) + if err != nil { + return nil, err + } + defer rows.Close() + headers := []*DBHeader{} + for rows.Next() { + header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = rows.Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err != nil { + return nil, err + } + headers = append(headers, header) + } + return headers, nil +} + // HeaderExists checks if header with hash exists in db. func (db *Database) HeaderExists(hash common.Hash) (bool, error) { var val sql.NullBool diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go index aeb438010c..de1aa79df5 100644 --- a/services/wallet/database_test.go +++ b/services/wallet/database_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "math/big" "os" + "sort" "testing" "github.com/ethereum/go-ethereum/common" @@ -238,3 +239,27 @@ func TestDBProcessTransfersUpdate(t *testing.T) { require.NoError(t, err) require.Equal(t, header.Hash, earliest.Hash) } + +func TestDBLastHeadersReverseSorted(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + headers := make([]*DBHeader, 10) + for i := range headers { + headers[i] = &DBHeader{Hash: common.Hash{byte(i)}, Number: big.NewInt(int64(i))} + } + require.NoError(t, db.ProcessTranfers(nil, headers, nil, ethSync)) + + headers, err := db.LastHeaders(big.NewInt(5)) + require.NoError(t, err) + require.Len(t, headers, 5) + + sorted := make([]*DBHeader, len(headers)) + copy(sorted, headers) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Number.Cmp(sorted[j].Number) > 0 + }) + for i := range headers { + require.Equal(t, sorted[i], headers[i]) + } +} diff --git a/services/wallet/events.go b/services/wallet/events.go index d9d0246045..f126daab76 100644 --- a/services/wallet/events.go +++ b/services/wallet/events.go @@ -11,11 +11,11 @@ type EventType string const ( // EventNewBlock emitted when new block was added to the same canonical chan. - EventNewBlock = "newblock" + EventNewBlock EventType = "newblock" // EventReorg emitted when canonical chain was changed. In this case, BlockNumber will be an earliest added block. - EventReorg = "reorg" + EventReorg EventType = "reorg" // EventNewHistory emitted if transfer from older block was added. - EventNewHistory = "history" + EventNewHistory EventType = "history" ) // Event is a type for wallet events. 
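// Illustrative sketch (not part of the patch): consuming the typed wallet events defined above
// through an event.Feed, the way the tests in this patch do. exampleEventLoop and done are
// placeholder names; Event, the EventType constants and the go-ethereum event.Feed come from
// the surrounding wallet package and its imports.
func exampleEventLoop(feed *event.Feed, done <-chan struct{}) {
	events := make(chan Event, 10)
	sub := feed.Subscribe(events)
	defer sub.Unsubscribe()
	for {
		select {
		case ev := <-events:
			switch ev.Type {
			case EventNewBlock, EventNewHistory:
				// a new canonical block or backfilled history was stored
			case EventReorg:
				// per events.go, BlockNumber is the earliest block added on the rewritten chain
			}
		case <-done:
			return
		}
	}
}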
diff --git a/services/wallet/iterative.go b/services/wallet/iterative.go index df7f781b4c..1ccde86754 100644 --- a/services/wallet/iterative.go +++ b/services/wallet/iterative.go @@ -12,7 +12,7 @@ import ( // SetupIterativeDownloader configures IterativeDownloader with last known synced block. func SetupIterativeDownloader( db *Database, client HeaderReader, address common.Address, option SyncOption, - downloader BatchDownloader, size *big.Int) (*IterativeDownloader, error) { + downloader BatchDownloader, size *big.Int, limit *big.Int) (*IterativeDownloader, error) { d := &IterativeDownloader{ client: client, batchSize: size, @@ -24,7 +24,7 @@ func SetupIterativeDownloader( return nil, err } if earliest == nil { - previous, err := lastKnownHeader(db, client) + previous, err := lastKnownHeader(context.Background(), db, client, limit) if err != nil { log.Error("failed to get last known header", "error", err) return nil, err diff --git a/services/wallet/iterative_test.go b/services/wallet/iterative_test.go index 4cfd32b89a..99d435ec86 100644 --- a/services/wallet/iterative_test.go +++ b/services/wallet/iterative_test.go @@ -2,9 +2,12 @@ package wallet import ( "context" + "errors" "math/big" "testing" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" ) @@ -65,3 +68,42 @@ func TestIterProgress(t *testing.T) { require.Len(t, batch, 5) require.True(t, iter.Finished()) } + +type headers []*types.Header + +func (h headers) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + for _, item := range h { + if item.Hash() == hash { + return item, nil + } + } + return nil, errors.New("not found") +} + +func (h headers) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + for _, item := range h { + if item.Number.Cmp(number) == 0 { + return item, nil + } + } + return nil, errors.New("not found") +} + +func (h headers) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + return nil, errors.New("not implemented") +} + +func genHeadersChain(size, difficulty int) []*types.Header { + rst := make([]*types.Header, size) + for i := 0; i < size; i++ { + rst[i] = &types.Header{ + Number: big.NewInt(int64(i)), + Difficulty: big.NewInt(int64(difficulty)), + Time: big.NewInt(1), + } + if i != 0 { + rst[i].ParentHash = rst[i-1].Hash() + } + } + return rst +} diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index 242995229e..8f0186dba6 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -85,10 +85,11 @@ func (r *Reactor) Start() error { for _, address := range r.accounts { r.group = NewGroup() erc20 := &erc20HistoricalCommand{ - db: r.db, - erc20: NewERC20TransfersDownloader(r.client, []common.Address{address}), - client: r.client, - feed: r.feed, + db: r.db, + erc20: NewERC20TransfersDownloader(r.client, []common.Address{address}), + client: r.client, + feed: r.feed, + safetyDepth: reorgSafetyDepth, } r.group.Add(erc20.Command()) eth := ðHistoricalCommand{ @@ -100,7 +101,8 @@ func (r *Reactor) Start() error { accounts: []common.Address{address}, signer: types.NewEIP155Signer(r.chain), }, - feed: r.feed, + feed: r.feed, + safetyDepth: reorgSafetyDepth, } r.group.Add(eth.Command()) } @@ -113,8 +115,9 @@ func (r *Reactor) Start() error { accounts: r.accounts, signer: types.NewEIP155Signer(r.chain), }, - erc20: NewERC20TransfersDownloader(r.client, r.accounts), - feed: r.feed, + erc20: 
NewERC20TransfersDownloader(r.client, r.accounts), + feed: r.feed, + safetyDepth: reorgSafetyDepth, } r.group.Add(newBlocks.Command()) return nil @@ -146,38 +149,3 @@ func headersFromTransfers(transfers []Transfer) []*DBHeader { } return rst } - -// lastKnownHeader selects last stored header in database. -// If not found it will get head of the chain and in this case last known header will be atleast -// `reorgSafetyDepth` blocks away from chain head. -// `reorgSafetyDepth` is used for two purposes: -// 1. to minimize chances that historical downloader and new blocks downloader will find different transfers -// due to hitting different replicas (infura load balancer). new blocks downloader will eventually resolve conflicts -// by going back parent by parent but it will require more time. -// 2. as we don't store whole chain of blocks, but only blocks with transfers we won't be always able to find parent -// when reorg occurs if we won't start syncing headers atleast 15 blocks away from canonical chain head -func lastKnownHeader(db *Database, client HeaderReader) (*DBHeader, error) { - known, err := db.LastHeader() - if err != nil { - return nil, err - } - if known == nil { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - latest, err := client.HeaderByNumber(ctx, nil) - cancel() - if err != nil { - return nil, err - } - if latest.Number.Cmp(reorgSafetyDepth) >= 0 { - num := new(big.Int).Sub(latest.Number, reorgSafetyDepth) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - latest, err = client.HeaderByNumber(ctx, num) - cancel() - if err != nil { - return nil, err - } - } - known = toDBHeader(latest) - } - return known, nil -} diff --git a/t/devtests/testchain/node.go b/t/devtests/testchain/node.go new file mode 100644 index 0000000000..a26578db21 --- /dev/null +++ b/t/devtests/testchain/node.go @@ -0,0 +1,80 @@ +package testchain + +import ( + "crypto/ecdsa" + "math/big" + + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" +) + +type Backend struct { + Node *node.Node + Client *ethclient.Client + genesis *core.Genesis + Ethereum *eth.Ethereum + Faucet *ecdsa.PrivateKey + Signer types.Signer +} + +func NewBackend() (*Backend, error) { + faucet, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + config := params.AllEthashProtocolChanges + genesis := &core.Genesis{ + Config: config, + Alloc: core.GenesisAlloc{crypto.PubkeyToAddress(faucet.PublicKey): {Balance: big.NewInt(1e18)}}, + ExtraData: []byte("test genesis"), + Timestamp: 9000, + } + var ethservice *eth.Ethereum + n, err := node.New(&node.Config{}) + if err != nil { + return nil, err + } + err = n.Register(func(ctx *node.ServiceContext) (node.Service, error) { + config := ð.Config{Genesis: genesis} + config.Ethash.PowMode = ethash.ModeFake + ethservice, err = eth.New(ctx, config) + return ethservice, err + }) + if err != nil { + return nil, err + } + + if err := n.Start(); err != nil { + return nil, err + } + client, err := n.Attach() + if err != nil { + return nil, err + } + return &Backend{ + Node: n, + Client: ethclient.NewClient(client), + Ethereum: ethservice, + Faucet: faucet, + Signer: types.NewEIP155Signer(config.ChainID), + genesis: genesis, + }, nil +} + +// 
GenerateBlocks generate n blocks starting from genesis. +func (b *Backend) GenerateBlocks(n int, start uint64, gen func(int, *core.BlockGen)) []*types.Block { + block := b.Ethereum.BlockChain().GetBlockByNumber(start) + engine := ethash.NewFaker() + blocks, _ := core.GenerateChain(b.genesis.Config, block, engine, b.Ethereum.ChainDb(), n, gen) + return blocks +} + +func (b *Backend) Stop() error { + return b.Node.Stop() +} From f0b640942da8e5a06d2066c321d081cea092821f Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 7 Jun 2019 11:52:52 +0300 Subject: [PATCH 04/12] Renaming leftover --- services/wallet/commands.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 8f49a9d281..121d432429 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -287,7 +287,7 @@ func lastKnownHeader(parent context.Context, db *Database, client HeaderReader, if err != nil { return nil, err } - if int64(len(headers)) > limit.Int64() && isSequence(headers) { + if int64(len(headers)) > safetyLimit.Int64() && isSequence(headers) { return headers[0], nil } var latest *DBHeader From a059cf8faf2faa9eeefcf817174432d9ebf9787e Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 7 Jun 2019 12:17:47 +0300 Subject: [PATCH 05/12] Add logs and simplify lastKnownHeader --- services/wallet/commands.go | 25 ++++++++++++------------- services/wallet/reactor.go | 1 + 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 121d432429..b661c2c185 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -42,6 +42,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { return err } } + log.Info("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.previous.Number) } ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() @@ -105,6 +106,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { log.Error("failed to setup historical downloader for erc20") return err } + log.Info("initialized downloader for erc20 historical transfers", "address", c.address, "starting at", c.iterator.Header().Number) } for !c.iterator.Finished() { transfers, err := c.iterator.Next(ctx) @@ -158,6 +160,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { log.Error("failed to get last known header", "error", err) return err } + log.Info("initialized downloader for new blocks transfers", "starting at", c.previous.Number) } num := new(big.Int).Add(c.previous.Number, one) ctx, cancel := context.WithTimeout(parent, 5*time.Second) @@ -290,24 +293,20 @@ func lastKnownHeader(parent context.Context, db *Database, client HeaderReader, if int64(len(headers)) > safetyLimit.Int64() && isSequence(headers) { return headers[0], nil } - var latest *DBHeader - if len(headers) == 0 { - ctx, cancel := context.WithTimeout(parent, 3*time.Second) - header, err := client.HeaderByNumber(ctx, nil) - cancel() - if err != nil { - return nil, err - } - latest = toDBHeader(header) - } else { - latest = headers[0] + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + header, err := client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + return nil, err } + log.Info("head of the chain", "number", header.Number) + latest := toDBHeader(header) diff := new(big.Int).Sub(latest.Number, safetyLimit) if diff.Cmp(zero) <= 0 { diff = zero } - ctx, cancel := 
context.WithTimeout(parent, 3*time.Second) - header, err := client.HeaderByNumber(ctx, safetyLimit) + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + header, err = client.HeaderByNumber(ctx, diff) cancel() if err != nil { return nil, err diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index 8f0186dba6..0bb84e5cab 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -90,6 +90,7 @@ func (r *Reactor) Start() error { client: r.client, feed: r.feed, safetyDepth: reorgSafetyDepth, + address: address, } r.group.Add(erc20.Command()) eth := ðHistoricalCommand{ From 72ca0112a711689f456d3093cebc12264f68f92a Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 7 Jun 2019 12:57:55 +0300 Subject: [PATCH 06/12] Update accounts with all blocks in the set --- services/wallet/commands.go | 17 ++++++++------ services/wallet/commands_test.go | 5 +++-- services/wallet/database.go | 38 +++++++++++++++++--------------- services/wallet/database_test.go | 14 ++++++------ services/wallet/iterative.go | 1 + services/wallet/reactor.go | 7 +++--- 6 files changed, 45 insertions(+), 37 deletions(-) diff --git a/services/wallet/commands.go b/services/wallet/commands.go index b661c2c185..a86875ede2 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -42,7 +42,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { return err } } - log.Info("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.previous.Number) + log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.previous.Number) } ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() @@ -62,7 +62,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { transfers := concurrent.Get() log.Info("eth historical downloader finished succesfully", "total transfers", len(transfers), "time", time.Since(start)) // TODO(dshulyak) insert 0 block number with transfers - err = c.db.ProcessTranfers(transfers, headersFromTransfers(transfers), nil, ethSync) + err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync) if err != nil { log.Error("failed to save downloaded erc20 transfers", "error", err) return err @@ -106,23 +106,26 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { log.Error("failed to setup historical downloader for erc20") return err } - log.Info("initialized downloader for erc20 historical transfers", "address", c.address, "starting at", c.iterator.Header().Number) + log.Debug("initialized downloader for erc20 historical transfers", "address", c.address, "starting at", c.iterator.Header().Number) } for !c.iterator.Finished() { + start := time.Now() transfers, err := c.iterator.Next(ctx) if err != nil { log.Error("failed to get next batch", "error", err) break } headers := headersFromTransfers(transfers) + log.Info("storing header of the iterator", "header", c.iterator.Header().Number) headers = append(headers, c.iterator.Header()) - err = c.db.ProcessTranfers(transfers, headers, nil, erc20Sync) + err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync) if err != nil { c.iterator.Revert() log.Error("failed to save downloaded erc20 transfers", "error", err) return err } if len(transfers) > 0 { + log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start)) c.feed.Send(Event{ Type: EventNewHistory, 
BlockNumber: c.iterator.Header().Number, @@ -136,6 +139,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { type newBlocksTransfersCommand struct { db *Database + accounts []common.Address chain *big.Int erc20 *ERC20TransfersDownloader eth *ETHTransferDownloader @@ -160,7 +164,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { log.Error("failed to get last known header", "error", err) return err } - log.Info("initialized downloader for new blocks transfers", "starting at", c.previous.Number) + log.Debug("initialized downloader for new blocks transfers", "starting at", c.previous.Number) } num := new(big.Int).Add(c.previous.Number, one) ctx, cancel := context.WithTimeout(parent, 5*time.Second) @@ -190,7 +194,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers)) all = append(all, transfers...) } - err = c.db.ProcessTranfers(all, added, removed, erc20Sync|ethSync) + err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync) if err != nil { log.Error("failed to persist transfers", "error", err) return err @@ -299,7 +303,6 @@ func lastKnownHeader(parent context.Context, db *Database, client HeaderReader, if err != nil { return nil, err } - log.Info("head of the chain", "number", header.Number) latest := toDBHeader(header) diff := new(big.Int).Sub(latest.Number, safetyLimit) if diff.Cmp(zero) <= 0 { diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go index 27463bca96..5a56f8258f 100644 --- a/services/wallet/commands_test.go +++ b/services/wallet/commands_test.go @@ -41,8 +41,9 @@ func (s *NewBlocksSuite) SetupTest() { s.address = crypto.PubkeyToAddress(account.PublicKey) s.feed = &event.Feed{} s.cmd = &newBlocksTransfersCommand{ - db: s.db, - erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}), + db: s.db, + accounts: []common.Address{s.address}, + erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}), eth: ÐTransferDownloader{ client: s.backend.Client, signer: s.backend.Signer, diff --git a/services/wallet/database.go b/services/wallet/database.go index cf2c3f7fbf..12bf1157a4 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -105,7 +105,7 @@ func (db Database) Close() error { } // ProcessTranfers atomically adds/removes blocks and adds new tranfers. -func (db Database) ProcessTranfers(transfers []Transfer, added, removed []*DBHeader, option SyncOption) (err error) { +func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) { var ( tx *sql.Tx ) @@ -132,7 +132,7 @@ func (db Database) ProcessTranfers(transfers []Transfer, added, removed []*DBHea if err != nil { return } - err = updateAccounts(tx, transfers, option) + err = updateAccounts(tx, accounts, added, option) return } @@ -351,7 +351,7 @@ func insertTransfers(creator statementCreator, transfers []Transfer) error { return nil } -func updateAccounts(creator statementCreator, transfers []Transfer, option SyncOption) error { +func updateAccounts(creator statementCreator, accounts []common.Address, headers []*DBHeader, option SyncOption) error { update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? 
AND blk_number=?") if err != nil { return err @@ -360,21 +360,23 @@ func updateAccounts(creator statementCreator, transfers []Transfer, option SyncO if err != nil { return err } - for _, t := range transfers { - rst, err := update.Exec(option, t.Address, (*SQLBigInt)(t.BlockNumber)) - if err != nil { - return err - } - affected, err := rst.RowsAffected() - if err != nil { - return err - } - if affected > 0 { - continue - } - _, err = insert.Exec(t.Address, (*SQLBigInt)(t.BlockNumber), option) - if err != nil { - return err + for _, acc := range accounts { + for _, h := range headers { + rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number)) + if err != nil { + return err + } + affected, err := rst.RowsAffected() + if err != nil { + return err + } + if affected > 0 { + continue + } + _, err = insert.Exec(acc, (*SQLBigInt)(h.Number), option) + if err != nil { + return err + } } } return nil diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go index de1aa79df5..b9bd41d001 100644 --- a/services/wallet/database_test.go +++ b/services/wallet/database_test.go @@ -114,7 +114,7 @@ func TestDBProcessTransfer(t *testing.T) { Receipt: types.NewReceipt(nil, false, 100), }, } - require.NoError(t, db.ProcessTranfers(transfers, []*DBHeader{header}, nil, 0)) + require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0)) } func TestDBReorgTransfers(t *testing.T) { @@ -134,10 +134,10 @@ func TestDBReorgTransfers(t *testing.T) { replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil) require.NoError(t, db.ProcessTranfers([]Transfer{ {ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt}, - }, []*DBHeader{original}, nil, 0)) + }, nil, []*DBHeader{original}, nil, 0)) require.NoError(t, db.ProcessTranfers([]Transfer{ {ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt}, - }, []*DBHeader{replaced}, []*DBHeader{original}, 0)) + }, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0)) all, err := db.GetTransfers(big.NewInt(0), nil) require.NoError(t, err) @@ -169,7 +169,7 @@ func TestDBGetTransfersFromBlock(t *testing.T) { } transfers = append(transfers, transfer) } - require.NoError(t, db.ProcessTranfers(transfers, headers, nil, 0)) + require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0)) rst, err := db.GetTransfers(big.NewInt(7), nil) require.NoError(t, err) require.Len(t, rst, 3) @@ -232,8 +232,8 @@ func TestDBProcessTransfersUpdate(t *testing.T) { Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil), Address: address, } - require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []*DBHeader{header}, nil, ethSync)) - require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []*DBHeader{header}, nil, erc20Sync)) + require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync)) + require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync)) earliest, err := db.GetEarliestSynced(address, ethSync|erc20Sync) require.NoError(t, err) @@ -248,7 +248,7 @@ func TestDBLastHeadersReverseSorted(t *testing.T) { for i := range headers { headers[i] = &DBHeader{Hash: common.Hash{byte(i)}, Number: big.NewInt(int64(i))} } - require.NoError(t, db.ProcessTranfers(nil, headers, nil, ethSync)) + require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, ethSync)) headers, err := 
db.LastHeaders(big.NewInt(5)) require.NoError(t, err) diff --git a/services/wallet/iterative.go b/services/wallet/iterative.go index 1ccde86754..fc542e8817 100644 --- a/services/wallet/iterative.go +++ b/services/wallet/iterative.go @@ -19,6 +19,7 @@ func SetupIterativeDownloader( downloader: downloader, } earliest, err := db.GetEarliestSynced(address, option) + log.Info("earleist synced erc20 block", "address", address, "block", earliest) if err != nil { log.Error("failed to get earliest synced block", "error", err) return nil, err diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index 0bb84e5cab..079ff1a2e3 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -108,9 +108,10 @@ func (r *Reactor) Start() error { r.group.Add(eth.Command()) } newBlocks := &newBlocksTransfersCommand{ - db: r.db, - chain: r.chain, - client: r.client, + db: r.db, + chain: r.chain, + client: r.client, + accounts: r.accounts, eth: ÐTransferDownloader{ client: r.client, accounts: r.accounts, From 402837a90ca2729e3e8b2853729c44838e3c00b3 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 7 Jun 2019 13:06:39 +0300 Subject: [PATCH 07/12] Encrypt database with same password taht is used for pfs db --- api/backend.go | 8 ++++---- services/wallet/commands.go | 1 - services/wallet/database.go | 4 ++-- services/wallet/database_test.go | 2 +- services/wallet/service.go | 4 ++-- sqlite/sqlite.go | 12 +++++++++--- 6 files changed, 18 insertions(+), 13 deletions(-) diff --git a/api/backend.go b/api/backend.go index f857f95afe..4a65a90112 100644 --- a/api/backend.go +++ b/api/backend.go @@ -510,7 +510,7 @@ func (b *StatusBackend) reSelectAccount() error { default: return err } - return b.startWallet() + return nil } // SelectAccount selects current wallet and chat accounts, by verifying that each address has corresponding account which can be decrypted @@ -551,10 +551,10 @@ func (b *StatusBackend) SelectAccount(walletAddress, chatAddress, password strin return err } } - return b.startWallet() + return b.startWallet(password) } -func (b *StatusBackend) startWallet() error { +func (b *StatusBackend) startWallet(password string) error { if !b.statusNode.Config().WalletConfig.Enabled { return nil } @@ -567,7 +567,7 @@ func (b *StatusBackend) startWallet() error { return err } path := path.Join(b.statusNode.Config().DataDir, fmt.Sprintf("wallet-%x.sql", account.Address)) - return wallet.StartReactor(path, + return wallet.StartReactor(path, password, b.statusNode.RPCClient().Ethclient(), []common.Address{account.Address}, new(big.Int).SetUint64(b.statusNode.Config().NetworkID)) diff --git a/services/wallet/commands.go b/services/wallet/commands.go index a86875ede2..4ca9c50f41 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -116,7 +116,6 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { break } headers := headersFromTransfers(transfers) - log.Info("storing header of the iterator", "header", c.iterator.Header().Number) headers = append(headers, c.iterator.Header()) err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync) if err != nil { diff --git a/services/wallet/database.go b/services/wallet/database.go index 12bf1157a4..25d1483579 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -36,8 +36,8 @@ const ( ) // InitializeDB creates db file at a given path and applies migrations. 
-func InitializeDB(path string) (*Database, error) { - db, err := sqlite.OpenDB(path) +func InitializeDB(path, password string) (*Database, error) { + db, err := sqlite.OpenDB(path, password) if err != nil { return nil, err } diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go index b9bd41d001..9770846e4d 100644 --- a/services/wallet/database_test.go +++ b/services/wallet/database_test.go @@ -15,7 +15,7 @@ import ( func setupTestDB(t *testing.T) (*Database, func()) { tmpfile, err := ioutil.TempFile("", "wallet-tests-") require.NoError(t, err) - db, err := InitializeDB(tmpfile.Name()) + db, err := InitializeDB(tmpfile.Name(), "wallet-tests") require.NoError(t, err) return db, func() { require.NoError(t, db.Close()) diff --git a/services/wallet/service.go b/services/wallet/service.go index ce46c9d553..bf16f7a244 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -34,8 +34,8 @@ func (s *Service) Start(*p2p.Server) error { } // StartReactor separately because it requires known ethereum address, which will become available only after login. -func (s *Service) StartReactor(dbpath string, client *ethclient.Client, accounts []common.Address, chain *big.Int) error { - db, err := InitializeDB(dbpath) +func (s *Service) StartReactor(dbpath, password string, client *ethclient.Client, accounts []common.Address, chain *big.Int) error { + db, err := InitializeDB(dbpath, password) if err != nil { return err } diff --git a/sqlite/sqlite.go b/sqlite/sqlite.go index fc9ba6ea12..afe173c5d2 100644 --- a/sqlite/sqlite.go +++ b/sqlite/sqlite.go @@ -2,12 +2,13 @@ package sqlite import ( "database/sql" + "errors" "fmt" _ "github.com/mutecomm/go-sqlcipher" // We require go sqlcipher that overrides default implementation ) -func openDB(path string) (*sql.DB, error) { +func openDB(path, key string) (*sql.DB, error) { db, err := sql.Open("sqlite3", path) if err != nil { return nil, err @@ -19,6 +20,11 @@ func openDB(path string) (*sql.DB, error) { if _, err = db.Exec("PRAGMA foreign_keys=ON"); err != nil { return nil, err } + keyString := fmt.Sprintf("PRAGMA key = '%s'", key) + if _, err = db.Exec(keyString); err != nil { + return nil, errors.New("failed to set key pragma") + } + // readers do not block writers and faster i/o operations // https://www.sqlite.org/draft/wal.html // must be set after db is encrypted @@ -35,6 +41,6 @@ func openDB(path string) (*sql.DB, error) { } // OpenDB opens not-encrypted database. 
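The key pragma has to be issued before any statement that touches the database file itself, which is why the patch sets it right after opening the connection and only then switches the journal mode to WAL. A minimal standalone sketch of that ordering, using the same go-sqlcipher driver the patch imports (the path and passphrase below are placeholders):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mutecomm/go-sqlcipher" // registers the "sqlite3" driver used below
)

// openEncrypted mirrors the pragma ordering from openDB: the key is set
// first, and journal_mode=WAL only afterwards, since switching the journal
// mode already touches database pages and therefore needs the key in place.
func openEncrypted(path, key string) (*sql.DB, error) {
	db, err := sql.Open("sqlite3", path)
	if err != nil {
		return nil, err
	}
	if _, err := db.Exec(fmt.Sprintf("PRAGMA key = '%s'", key)); err != nil {
		return nil, err
	}
	if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
		return nil, err
	}
	return db, nil
}

func main() {
	db, err := openEncrypted("wallet-example.sql", "example-passphrase")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```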
-func OpenDB(path string) (*sql.DB, error) { - return openDB(path) +func OpenDB(path, key string) (*sql.DB, error) { + return openDB(path, key) } From 4f4d0fa047e585aab3985acf50577cd7b2a62633 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 10 Jun 2019 08:48:06 +0300 Subject: [PATCH 08/12] Rename ConcurrentRunner to AtomicGroup --- services/wallet/async.go | 86 ++++++++++++++++++++++++++---- services/wallet/commands.go | 28 +++++++--- services/wallet/concurrent.go | 75 ++------------------------ services/wallet/concurrent_test.go | 12 ++--- services/wallet/downloader.go | 4 +- services/wallet/iterative.go | 2 +- services/wallet/reactor.go | 19 +------ services/wallet/service.go | 3 +- 8 files changed, 116 insertions(+), 113 deletions(-) diff --git a/services/wallet/async.go b/services/wallet/async.go index fbc551de9b..1476f7255e 100644 --- a/services/wallet/async.go +++ b/services/wallet/async.go @@ -6,41 +6,41 @@ import ( "time" ) -type Command interface { - Run(context.Context) -} +type Command func(context.Context) error +// FiniteCommand terminates when error is nil. type FiniteCommand struct { Interval time.Duration Runable func(context.Context) error } -func (c FiniteCommand) Run(ctx context.Context) { +func (c FiniteCommand) Run(ctx context.Context) error { ticker := time.NewTicker(c.Interval) for { select { case <-ctx.Done(): - return + return ctx.Err() case <-ticker.C: err := c.Runable(ctx) if err == nil { - return + return nil } } } } +// InfiniteCommand runs until context is closed. type InfiniteCommand struct { Interval time.Duration Runable func(context.Context) error } -func (c InfiniteCommand) Run(ctx context.Context) { +func (c InfiniteCommand) Run(ctx context.Context) error { ticker := time.NewTicker(c.Interval) for { select { case <-ctx.Done(): - return + return ctx.Err() case <-ticker.C: _ = c.Runable(ctx) } @@ -64,12 +64,80 @@ type Group struct { func (g *Group) Add(cmd Command) { g.wg.Add(1) go func() { - cmd.Run(g.ctx) + _ = cmd(g.ctx) g.wg.Done() }() } func (g *Group) Stop() { g.cancel() +} + +func (g *Group) Wait() { g.wg.Wait() } + +func NewAtomicGroup(parent context.Context) *AtomicGroup { + ctx, cancel := context.WithCancel(parent) + return &AtomicGroup{ctx: ctx, cancel: cancel} +} + +// AtomicGroup terminates as soon as first goroutine terminates.. +type AtomicGroup struct { + ctx context.Context + cancel func() + wg sync.WaitGroup + + mu sync.Mutex + error error +} + +// Go spawns function in a goroutine and stores results or errors. +func (d *AtomicGroup) Add(cmd Command) { + d.wg.Add(1) + go func() { + defer d.wg.Done() + err := cmd(d.ctx) + d.mu.Lock() + defer d.mu.Unlock() + if err != nil { + // do not overwrite original error by context errors + if d.error != nil { + return + } + d.error = err + d.cancel() + return + } + }() +} + +// Wait for all downloaders to finish. +func (d *AtomicGroup) Wait() { + d.wg.Wait() + if d.Error() == nil { + d.mu.Lock() + defer d.mu.Unlock() + d.cancel() + } +} + +func (d *AtomicGroup) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + d.Wait() + close(ch) + }() + return ch +} + +// Error stores an error that was reported by any of the downloader. Should be called after Wait. 
+func (d *AtomicGroup) Error() error { + d.mu.Lock() + defer d.mu.Unlock() + return d.error +} + +func (d *AtomicGroup) Stop() { + d.cancel() +} diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 4ca9c50f41..b58ca2b1c3 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -23,11 +23,11 @@ type ethHistoricalCommand struct { previous *DBHeader } -func (c *ethHistoricalCommand) Command() FiniteCommand { +func (c *ethHistoricalCommand) Command() Command { return FiniteCommand{ Interval: 5 * time.Second, Runable: c.Run, - } + }.Run } func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { @@ -90,11 +90,11 @@ type erc20HistoricalCommand struct { iterator *IterativeDownloader } -func (c *erc20HistoricalCommand) Command() FiniteCommand { +func (c *erc20HistoricalCommand) Command() Command { return FiniteCommand{ Interval: 5 * time.Second, Runable: c.Run, - } + }.Run } func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { @@ -149,11 +149,11 @@ type newBlocksTransfersCommand struct { previous *DBHeader } -func (c *newBlocksTransfersCommand) Command() InfiniteCommand { +func (c *newBlocksTransfersCommand) Command() Command { return InfiniteCommand{ Interval: pollingPeriodByChain(c.chain), Runable: c.Run, - } + }.Run } func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { @@ -330,3 +330,19 @@ func isSequence(headers []*DBHeader) bool { } return true } + +func headersFromTransfers(transfers []Transfer) []*DBHeader { + byHash := map[common.Hash]struct{}{} + rst := []*DBHeader{} + for i := range transfers { + _, exists := byHash[transfers[i].BlockHash] + if exists { + continue + } + rst = append(rst, &DBHeader{ + Hash: transfers[i].BlockHash, + Number: transfers[i].BlockNumber, + }) + } + return rst +} diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go index 4eea484979..e8747c1d1b 100644 --- a/services/wallet/concurrent.go +++ b/services/wallet/concurrent.go @@ -11,13 +11,13 @@ import ( // NewConcurrentDownloader creates ConcurrentDownloader instance. func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader { - runner := NewConcurrentRunner(ctx) + runner := NewAtomicGroup(ctx) result := &Result{} return &ConcurrentDownloader{runner, result} } type ConcurrentDownloader struct { - *ConcurrentRunner + *AtomicGroup *Result } @@ -26,7 +26,7 @@ type Result struct { transfers []Transfer } -func (r *Result) Add(transfers ...Transfer) { +func (r *Result) Push(transfers ...Transfer) { r.mu.Lock() defer r.mu.Unlock() r.transfers = append(r.transfers, transfers...) @@ -40,78 +40,13 @@ func (r *Result) Get() []Transfer { return rst } -func NewConcurrentRunner(ctx context.Context) *ConcurrentRunner { - // TODO(dshulyak) rename to atomic group and keep interface consistent with regular Group. - ctx, cancel := context.WithCancel(ctx) - return &ConcurrentRunner{ - ctx: ctx, - cancel: cancel, - } -} - -// ConcurrentRunner runs group atomically. -type ConcurrentRunner struct { - ctx context.Context - cancel func() - wg sync.WaitGroup - - mu sync.Mutex - error error -} - -// Go spawns function in a goroutine and stores results or errors. 
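With Command reduced to `func(context.Context) error`, FiniteCommand and InfiniteCommand become thin retry wrappers whose Run method satisfies Command, and AtomicGroup propagates the first failure by cancelling its context. A minimal usage sketch, assuming the types exactly as introduced in this patch (fetchOnce and the literal commands are illustrative):

```go
package wallet

import (
	"context"
	"errors"
	"time"
)

// exampleAtomicGroupUsage sketches how the pieces above compose: a plain
// func(context.Context) error is a Command, FiniteCommand{...}.Run also
// satisfies Command, and AtomicGroup cancels every sibling as soon as the
// first command returns an error. fetchOnce is a stand-in for real work.
func exampleAtomicGroupUsage(parent context.Context) error {
	fetchOnce := func(ctx context.Context) error { return nil } // succeeds on the first try

	group := NewAtomicGroup(parent)
	// Retried every 100ms until fetchOnce returns nil.
	group.Add(FiniteCommand{Interval: 100 * time.Millisecond, Runable: fetchOnce}.Run)
	// Blocks until the group is cancelled by a failing sibling.
	group.Add(func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
	// Fails immediately, which cancels the group context.
	group.Add(func(ctx context.Context) error {
		return errors.New("downloader failed")
	})

	group.Wait()         // returns once every command has exited
	return group.Error() // first recorded error, here "downloader failed"
}
```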
-func (d *ConcurrentRunner) Go(f func(context.Context) error) { - d.wg.Add(1) - go func() { - defer d.wg.Done() - err := f(d.ctx) - d.mu.Lock() - defer d.mu.Unlock() - if err != nil { - // do not overwrite original error by context errors - if d.error != nil { - return - } - d.error = err - d.cancel() - return - } - }() -} - -// Wait for all downloaders to finish. -func (d *ConcurrentRunner) Wait() { - d.wg.Wait() - if d.Error() == nil { - d.mu.Lock() - defer d.mu.Unlock() - d.cancel() - } -} - -func (d *ConcurrentRunner) WaitAsync() <-chan struct{} { - ch := make(chan struct{}) - go func() { - d.Wait() - close(ch) - }() - return ch -} - -// Error stores an error that was reported by any of the downloader. Should be called after Wait. -func (d *ConcurrentRunner) Error() error { - d.mu.Lock() - defer d.mu.Unlock() - return d.error -} - // TransferDownloader downloads transfers from single block using number. type TransferDownloader interface { GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error) } func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) { - c.Go(func(ctx context.Context) error { + c.Add(func(ctx context.Context) error { log.Debug("eth transfers comparing blocks", "low", low, "high", high) lb, err := client.BalanceAt(ctx, account, low) if err != nil { @@ -131,7 +66,7 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down if err != nil { return err } - c.Add(transfers...) + c.Push(transfers...) return nil } mid := new(big.Int).Add(low, high) diff --git a/services/wallet/concurrent_test.go b/services/wallet/concurrent_test.go index e2ba6a400a..d60be339cd 100644 --- a/services/wallet/concurrent_test.go +++ b/services/wallet/concurrent_test.go @@ -15,7 +15,7 @@ import ( func TestConcurrentErrorInterrupts(t *testing.T) { concurrent := NewConcurrentDownloader(context.Background()) var interrupted bool - concurrent.Go(func(ctx context.Context) error { + concurrent.Add(func(ctx context.Context) error { select { case <-ctx.Done(): interrupted = true @@ -24,7 +24,7 @@ func TestConcurrentErrorInterrupts(t *testing.T) { return nil }) err := errors.New("interrupt") - concurrent.Go(func(ctx context.Context) error { + concurrent.Add(func(ctx context.Context) error { return err }) concurrent.Wait() @@ -34,12 +34,12 @@ func TestConcurrentErrorInterrupts(t *testing.T) { func TestConcurrentCollectsTransfers(t *testing.T) { concurrent := NewConcurrentDownloader(context.Background()) - concurrent.Go(func(context.Context) error { - concurrent.Add(Transfer{}) + concurrent.Add(func(context.Context) error { + concurrent.Push(Transfer{}) return nil }) - concurrent.Go(func(context.Context) error { - concurrent.Add(Transfer{}) + concurrent.Add(func(context.Context) error { + concurrent.Push(Transfer{}) return nil }) concurrent.Wait() diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go index 4e52b6cf45..fc6a20a2fa 100644 --- a/services/wallet/downloader.go +++ b/services/wallet/downloader.go @@ -194,12 +194,12 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log concurrent := NewConcurrentDownloader(parent) for i := range logs { l := logs[i] - concurrent.Go(func(ctx context.Context) error { + concurrent.Add(func(ctx context.Context) error { transfer, err := d.transferFromLog(ctx, l, address) if err != nil { return err } - concurrent.Add(transfer) + concurrent.Push(transfer) return nil }) } diff --git 
a/services/wallet/iterative.go b/services/wallet/iterative.go index fc542e8817..8adcfa8c17 100644 --- a/services/wallet/iterative.go +++ b/services/wallet/iterative.go @@ -67,7 +67,7 @@ func (d *IterativeDownloader) Header() *DBHeader { func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) { start := new(big.Int).Sub(d.known.Number, d.batchSize) // if start < 0; start = 0 - if start.Cmp(big.NewInt(0)) <= 0 { + if start.Cmp(zero) <= 0 { start = big.NewInt(0) } ctx, cancel := context.WithTimeout(parent, 5*time.Second) diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index 079ff1a2e3..ba53b0e0db 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -79,11 +79,11 @@ func (r *Reactor) Start() error { if r.group != nil { return errors.New("already running") } + r.group = NewGroup() // TODO(dshulyak) to support adding accounts in runtime implement keyed group // and export private api to start downloaders from accounts // private api should have access only to reactor for _, address := range r.accounts { - r.group = NewGroup() erc20 := &erc20HistoricalCommand{ db: r.db, erc20: NewERC20TransfersDownloader(r.client, []common.Address{address}), @@ -133,21 +133,6 @@ func (r *Reactor) Stop() { return } r.group.Stop() + r.group.Wait() r.group = nil } - -func headersFromTransfers(transfers []Transfer) []*DBHeader { - byHash := map[common.Hash]struct{}{} - rst := []*DBHeader{} - for i := range transfers { - _, exists := byHash[transfers[i].BlockHash] - if exists { - continue - } - rst = append(rst, &DBHeader{ - Hash: transfers[i].BlockHash, - Number: transfers[i].BlockNumber, - }) - } - return rst -} diff --git a/services/wallet/service.go b/services/wallet/service.go index bf16f7a244..7082d75e75 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -63,9 +63,8 @@ func (s *Service) StopReactor() error { // Stop reactor, signals transmitter and close db. func (s *Service) Stop() error { - log.Info("wallet stopping reactor") + log.Info("wallet will be stopped") err := s.StopReactor() - log.Info("wallet stopping signals") s.signals.Stop() log.Info("wallet stopped") return err From 527218abec4d5017a647ef6d6904b0b2dc66a51f Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 10 Jun 2019 14:12:18 +0300 Subject: [PATCH 09/12] Add fast catchup after app was offline for longer than 15 blocks When downloader is started it will go through following procedure: - verify that the last synced block is still in canonical chain - run fast indexer for every account from last synced block to head of the chain - start watching block by block --- services/wallet/api.go | 4 + services/wallet/async.go | 13 +- services/wallet/commands.go | 294 ++++++++++++------ services/wallet/commands_test.go | 20 +- services/wallet/concurrent.go | 3 +- services/wallet/database.go | 40 ++- services/wallet/iterative.go | 58 ++-- services/wallet/iterative_test.go | 18 +- services/wallet/migrations/bindata.go | 2 +- .../migrations/sql/0001_transfers.up.db.sql | 3 +- services/wallet/reactor.go | 32 +- services/wallet/service.go | 2 +- 12 files changed, 315 insertions(+), 174 deletions(-) diff --git a/services/wallet/api.go b/services/wallet/api.go index 11f24ebd17..535c52cdda 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -10,6 +10,10 @@ import ( "github.com/ethereum/go-ethereum/log" ) +func NewAPI(s *Service) *API { + return &API{s} +} + // API is class with methods available over RPC. 
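The bootstrap procedure from this commit message is implemented by the controlCommand added to commands.go below; stripped of logging and per-call timeouts, its control flow is roughly the following (runOutline is an illustrative name, all other identifiers are taken from the patch):

```go
package wallet

import (
	"context"
	"math/big"
)

// runOutline is the rough shape of controlCommand.Run as added later in this
// patch: (1) check that the last stored head is still canonical, (2) fast-index
// every account up to head minus the reorg safety depth, (3) watch new blocks.
func (c *controlCommand) runOutline(ctx context.Context) error {
	head, err := c.client.HeaderByNumber(ctx, nil) // current chain head
	if err != nil {
		return err
	}
	last, err := c.db.GetLastHead() // last block persisted as head, if any
	if err != nil {
		return err
	}
	if last != nil {
		if err := c.verifyLastSynced(ctx, last, head); err != nil { // step 1
			return err
		}
	}
	target := new(big.Int).Sub(head.Number, c.safetyDepth) // step 2 boundary
	if target.Cmp(zero) <= 0 {
		target = zero
	}
	if head, err = c.client.HeaderByNumber(ctx, target); err != nil {
		return err
	}
	if err := c.fastIndex(ctx, toDBHeader(head)); err != nil { // step 2
		return err
	}
	watch := &newBlocksTransfersCommand{ // step 3
		db: c.db, chain: c.chain, client: c.client, accounts: c.accounts,
		eth: c.eth, erc20: c.erc20, feed: c.feed, from: toDBHeader(head),
	}
	return watch.Command()(ctx)
}
```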
type API struct { s *Service diff --git a/services/wallet/async.go b/services/wallet/async.go index 1476f7255e..253e1def0a 100644 --- a/services/wallet/async.go +++ b/services/wallet/async.go @@ -47,8 +47,8 @@ func (c InfiniteCommand) Run(ctx context.Context) error { } } -func NewGroup() *Group { - ctx, cancel := context.WithCancel(context.Background()) +func NewGroup(parent context.Context) *Group { + ctx, cancel := context.WithCancel(parent) return &Group{ ctx: ctx, cancel: cancel, @@ -77,6 +77,15 @@ func (g *Group) Wait() { g.wg.Wait() } +func (g *Group) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + g.Wait() + close(ch) + }() + return ch +} + func NewAtomicGroup(parent context.Context) *AtomicGroup { ctx, cancel := context.WithCancel(parent) return &AtomicGroup{ctx: ctx, cancel: cancel} diff --git a/services/wallet/commands.go b/services/wallet/commands.go index b58ca2b1c3..0c770bed28 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -8,19 +8,19 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) type ethHistoricalCommand struct { - db *Database - eth TransferDownloader - address common.Address - client reactorClient - feed *event.Feed - safetyDepth *big.Int + db *Database + eth TransferDownloader + address common.Address + client reactorClient + feed *event.Feed - previous *DBHeader + from, to *big.Int } func (c *ethHistoricalCommand) Command() Command { @@ -31,24 +31,23 @@ func (c *ethHistoricalCommand) Command() Command { } func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { - if c.previous == nil { - c.previous, err = c.db.GetEarliestSynced(c.address, ethSync) + if c.from == nil { + from, err := c.db.GetLatestSynced(c.address, ethSync) if err != nil { return err } - if c.previous == nil { - c.previous, err = lastKnownHeader(ctx, c.db, c.client, c.safetyDepth) - if err != nil { - return err - } + if from == nil { + c.from = zero + } else { + c.from = from.Number } - log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.previous.Number) + log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.from, "up to", c.to) } ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() concurrent := NewConcurrentDownloader(ctx) start := time.Now() - downloadEthConcurrently(concurrent, c.client, c.eth, c.address, zero, c.previous.Number) + downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to) select { case <-concurrent.WaitAsync(): case <-ctx.Done(): @@ -56,7 +55,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { return errors.New("eth downloader is stuck") } if concurrent.Error() != nil { - log.Error("failed to dowloader transfers using concurrent downloader", "error", err) + log.Error("failed to dowload transfers using concurrent downloader", "error", err) return concurrent.Error() } transfers := concurrent.Get() @@ -70,8 +69,8 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { if len(transfers) > 0 { // we download all or nothing c.feed.Send(Event{ - Type: EventNewHistory, - BlockNumber: zero, + Type: EventNewBlock, + BlockNumber: c.from, Accounts: []common.Address{c.address}, }) } @@ -80,14 +79,14 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { } type 
erc20HistoricalCommand struct { - db *Database - erc20 BatchDownloader - address common.Address - client reactorClient - feed *event.Feed - safetyDepth *big.Int + db *Database + erc20 BatchDownloader + address common.Address + client reactorClient + feed *event.Feed iterator *IterativeDownloader + to *DBHeader } func (c *erc20HistoricalCommand) Command() Command { @@ -101,12 +100,11 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { if c.iterator == nil { c.iterator, err = SetupIterativeDownloader( c.db, c.client, c.address, erc20Sync, - c.erc20, erc20BatchSize, c.safetyDepth) + c.erc20, erc20BatchSize, c.to) if err != nil { log.Error("failed to setup historical downloader for erc20") return err } - log.Debug("initialized downloader for erc20 historical transfers", "address", c.address, "starting at", c.iterator.Header().Number) } for !c.iterator.Finished() { start := time.Now() @@ -126,7 +124,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { if len(transfers) > 0 { log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start)) c.feed.Send(Event{ - Type: EventNewHistory, + Type: EventNewBlock, BlockNumber: c.iterator.Header().Number, Accounts: []common.Address{c.address}, }) @@ -137,35 +135,58 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { } type newBlocksTransfersCommand struct { - db *Database - accounts []common.Address - chain *big.Int - erc20 *ERC20TransfersDownloader - eth *ETHTransferDownloader - client reactorClient - feed *event.Feed - safetyDepth *big.Int + db *Database + accounts []common.Address + chain *big.Int + erc20 *ERC20TransfersDownloader + eth *ETHTransferDownloader + client reactorClient + feed *event.Feed - previous *DBHeader + from, to *DBHeader } func (c *newBlocksTransfersCommand) Command() Command { + // if both blocks are specified we will use this command to verify that lastly synced blocks are still + // in canonical chain + if c.to != nil && c.from != nil { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Verify, + }.Run + } return InfiniteCommand{ Interval: pollingPeriodByChain(c.chain), Runable: c.Run, }.Run } +func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) { + if c.to == nil || c.from == nil { + return errors.New("`from` and `to` blocks must be specified") + } + for c.from.Number.Cmp(c.to.Number) != 0 { + err = c.Run(parent) + if err != nil { + return err + } + } + return nil +} + func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { - if c.previous == nil { - c.previous, err = lastKnownHeader(parent, c.db, c.client, c.safetyDepth) + if c.from == nil { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + from, err := c.client.HeaderByNumber(ctx, nil) + cancel() if err != nil { log.Error("failed to get last known header", "error", err) return err } - log.Debug("initialized downloader for new blocks transfers", "starting at", c.previous.Number) + c.from = toDBHeader(from) + log.Debug("initialized downloader for new blocks transfers", "starting at", c.from.Number) } - num := new(big.Int).Add(c.previous.Number, one) + num := new(big.Int).Add(c.from.Number, one) ctx, cancel := context.WithTimeout(parent, 5*time.Second) latest, err := c.client.HeaderByNumber(ctx, num) cancel() @@ -175,7 +196,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { } log.Debug("reactor received new block", "header", latest.Hash()) ctx, cancel = 
context.WithTimeout(parent, 10*time.Second) - added, removed, err := c.onNewBlock(ctx, c.previous, latest) + added, removed, err := c.onNewBlock(ctx, c.from, latest) cancel() if err != nil { log.Error("failed to process new header", "header", latest, "error", err) @@ -198,7 +219,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { log.Error("failed to persist transfers", "error", err) return err } - c.previous = toDBHeader(latest) + c.from = toDBHeader(latest) if len(added) == 1 && len(removed) == 0 { c.feed.Send(Event{ Type: EventNewBlock, @@ -217,14 +238,14 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { return nil } -func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, previous *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) { - if previous == nil { +func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) { + if from == nil { // first node in the cache - return []*DBHeader{toDBHeader(latest)}, nil, nil + return []*DBHeader{toHead(latest)}, nil, nil } - if previous.Hash == latest.ParentHash { - // parent matching previous node in the cache. on the same chain. - return []*DBHeader{toDBHeader(latest)}, nil, nil + if from.Hash == latest.ParentHash { + // parent matching from node in the cache. on the same chain. + return []*DBHeader{toHead(latest)}, nil, nil } exists, err := c.db.HeaderExists(latest.Hash()) if err != nil { @@ -233,20 +254,20 @@ func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, previous *DB if exists { return nil, nil, nil } - log.Debug("wallet reactor spotted reorg", "last header in db", previous.Hash, "new parent", latest.ParentHash) - for previous != nil && previous.Hash != latest.ParentHash { - removed = append(removed, previous) - added = append(added, toDBHeader(latest)) + log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash) + for from != nil && from.Hash != latest.ParentHash { + removed = append(removed, from) + added = append(added, toHead(latest)) latest, err = c.client.HeaderByHash(ctx, latest.ParentHash) if err != nil { return nil, nil, err } - previous, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one)) + from, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one)) if err != nil { return nil, nil, err } } - added = append(added, toDBHeader(latest)) + added = append(added, toHead(latest)) return added, removed, nil } @@ -266,54 +287,131 @@ func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header return append(ethT, erc20T...), nil } -func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { - accounts := []common.Address{} - unique := map[common.Address]struct{}{} - for i := range transfers { - _, exist := unique[transfers[i].Address] - if exist { - continue +type controlCommand struct { + accounts []common.Address + db *Database + eth *ETHTransferDownloader + erc20 *ERC20TransfersDownloader + chain *big.Int + client *ethclient.Client + feed *event.Feed + safetyDepth *big.Int +} + +// run fast indexing for every accont managed by this command. 
+func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error { + start := time.Now() + group := NewGroup(ctx) + for _, address := range c.accounts { + erc20 := &erc20HistoricalCommand{ + db: c.db, + erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}), + client: c.client, + feed: c.feed, + address: address, + to: to, } - unique[transfers[i].Address] = struct{}{} - accounts = append(accounts, transfers[i].Address) + group.Add(erc20.Command()) + eth := ðHistoricalCommand{ + db: c.db, + client: c.client, + address: address, + eth: ÐTransferDownloader{ + client: c.client, + accounts: []common.Address{address}, + signer: types.NewEIP155Signer(c.chain), + }, + feed: c.feed, + to: to.Number, + } + group.Add(eth.Command()) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-group.WaitAsync(): + log.Debug("fast indexer finished", "in", time.Since(start)) + return nil } - return accounts } -// lastKnownHeader selects last stored header in database. Such header should have atleast safety depth predecessor in our database. -// We don't store every single header in the database. -// Historical downloaders storing only block where transfer was found. -// New block downloaders store every block it downloaded. -// It could happen that historical downloader found transfers in block 15. With a current head set at 20. -// If we will notice reorg at 20 but chain was rewritten starting from 10th block we won't be able to backtrack that transfer -// found in 15 block was removed from chain. -// See tests TestSafetyBufferFailed and TestSafetyBufferSuccess. -func lastKnownHeader(parent context.Context, db *Database, client HeaderReader, safetyLimit *big.Int) (*DBHeader, error) { - headers, err := db.LastHeaders(safetyLimit) +func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error { + log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number) + if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 { + log.Debug("no need to verify. 
last block is close enough to chain head") + return nil + } + header, err := c.client.HeaderByNumber(parent, new(big.Int).Add(last.Number, c.safetyDepth)) if err != nil { - return nil, err + return err } - if int64(len(headers)) > safetyLimit.Int64() && isSequence(headers) { - return headers[0], nil + log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number) + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + accounts: c.accounts, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + + from: last, + to: toDBHeader(header), } - ctx, cancel := context.WithTimeout(parent, 3*time.Second) - header, err := client.HeaderByNumber(ctx, nil) - cancel() + return cmd.Command()(parent) +} + +func (c *controlCommand) Run(parent context.Context) error { + log.Debug("start control command") + head, err := c.client.HeaderByNumber(parent, nil) if err != nil { - return nil, err + return err } - latest := toDBHeader(header) - diff := new(big.Int).Sub(latest.Number, safetyLimit) - if diff.Cmp(zero) <= 0 { - diff = zero + log.Debug("current head is", "block number", head.Number) + last, err := c.db.GetLastHead() + if err != nil { + log.Error("failed to load last head from database", "error", err) + return err } - ctx, cancel = context.WithTimeout(parent, 3*time.Second) - header, err = client.HeaderByNumber(ctx, diff) - cancel() + if last != nil { + err = c.verifyLastSynced(parent, last, head) + if err != nil { + log.Error("failed verification for last header in canonical chain", "error", err) + return err + } + } + target := new(big.Int).Sub(head.Number, c.safetyDepth) + if target.Cmp(zero) <= 0 { + target = zero + } + head, err = c.client.HeaderByNumber(parent, target) if err != nil { - return nil, err + return err } - return toDBHeader(header), nil + log.Debug("run fast indexing for the transfers", "up to", head.Number) + err = c.fastIndex(parent, toDBHeader(head)) + if err != nil { + return err + } + log.Debug("watching new blocks", "start from", head.Number) + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + accounts: c.accounts, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + from: toDBHeader(head), + } + return cmd.Command()(parent) +} + +func (c *controlCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run } func isSequence(headers []*DBHeader) bool { @@ -346,3 +444,17 @@ func headersFromTransfers(transfers []Transfer) []*DBHeader { } return rst } + +func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { + accounts := []common.Address{} + unique := map[common.Address]struct{}{} + for i := range transfers { + _, exist := unique[transfers[i].Address] + if exist { + continue + } + unique[transfers[i].Address] = struct{}{} + accounts = append(accounts, transfers[i].Address) + } + return accounts +} diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go index 5a56f8258f..46f0af01c3 100644 --- a/services/wallet/commands_test.go +++ b/services/wallet/commands_test.go @@ -49,9 +49,8 @@ func (s *NewBlocksSuite) SetupTest() { signer: s.backend.Signer, accounts: []common.Address{s.address}, }, - feed: s.feed, - client: s.backend.Client, - safetyDepth: big.NewInt(15), + feed: s.feed, + client: s.backend.Client, } } @@ -126,6 +125,7 @@ func (s *NewBlocksSuite) TestReorg() { // `not found` returned when we query head+1 block ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) defer cancel() + s.cmd.from = 
toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) s.Require().EqualError(s.runCmdUntilError(ctx), "not found") transfers, err := s.db.GetTransfers(big.NewInt(0), nil) @@ -139,7 +139,7 @@ func (s *NewBlocksSuite) TestReorg() { s.Require().Equal(10, n) s.Require().NoError(err) - // it will be less but even if something wrong we can't get more + // it will be less but even if something went wrong we can't get more events := make(chan Event, 10) sub := s.feed.Subscribe(events) defer sub.Unsubscribe() @@ -181,10 +181,10 @@ func (s *NewBlocksSuite) downloadHistorical() { signer: s.backend.Signer, accounts: []common.Address{s.address}, }, - feed: s.feed, - address: s.address, - client: s.backend.Client, - safetyDepth: big.NewInt(0), + feed: s.feed, + address: s.address, + client: s.backend.Client, + to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(), } s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers") transfers, err := s.db.GetTransfers(big.NewInt(0), nil) @@ -211,7 +211,6 @@ func (s *NewBlocksSuite) reorgHistorical() { func (s *NewBlocksSuite) TestSafetyBufferFailure() { s.downloadHistorical() - s.cmd.safetyDepth = big.NewInt(0) s.reorgHistorical() transfers, err := s.db.GetTransfers(big.NewInt(0), nil) @@ -222,7 +221,8 @@ func (s *NewBlocksSuite) TestSafetyBufferFailure() { func (s *NewBlocksSuite) TestSafetyBufferSuccess() { s.downloadHistorical() - s.cmd.safetyDepth = big.NewInt(10) + safety := new(big.Int).Sub(s.backend.Ethereum.BlockChain().CurrentHeader().Number, big.NewInt(10)) + s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(safety.Uint64())) s.reorgHistorical() transfers, err := s.db.GetTransfers(big.NewInt(0), nil) diff --git a/services/wallet/concurrent.go b/services/wallet/concurrent.go index e8747c1d1b..5472832e89 100644 --- a/services/wallet/concurrent.go +++ b/services/wallet/concurrent.go @@ -61,7 +61,6 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down return nil } if new(big.Int).Sub(high, low).Cmp(one) == 0 { - log.Debug("higher block is a parent", "low", low, "high", high) transfers, err := downloader.GetTransfersByNumber(ctx, high) if err != nil { return err @@ -71,7 +70,7 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down } mid := new(big.Int).Add(low, high) mid = mid.Div(mid, two) - log.Debug("balances are not equal spawn two concurrent downloaders", "low", low, "mid", mid, "high", high) + log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high) downloadEthConcurrently(c, client, downloader, account, low, mid) downloadEthConcurrently(c, client, downloader, account, mid, high) return nil diff --git a/services/wallet/database.go b/services/wallet/database.go index 25d1483579..cbfd699185 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -17,6 +17,8 @@ import ( type DBHeader struct { Number *big.Int Hash common.Hash + // Head is true if the block was a head at the time it was pulled from chain. + Head bool } func toDBHeader(header *types.Header) *DBHeader { @@ -26,6 +28,14 @@ func toDBHeader(header *types.Header) *DBHeader { } } +func toHead(header *types.Header) *DBHeader { + return &DBHeader{ + Hash: header.Hash(), + Number: header.Number, + Head: true, + } +} + // SyncOption is used to specify that application processed transfers for that block. 
type SyncOption uint @@ -283,6 +293,18 @@ func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err er return nil, err } +func (db *Database) GetLastHead() (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1").Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + func (db *Database) GetEarliestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) { rows, err := db.db.Query(` SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number @@ -304,6 +326,20 @@ SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number return nil, nil } +func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow(` +SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number += (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE address = $1 AND sync & $2 = $2)`, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + // statementCreator allows to pass transaction or database to use in consumer. type statementCreator interface { Prepare(query string) (*sql.Stmt, error) @@ -324,12 +360,12 @@ func deleteHeaders(creator statementCreator, headers []*DBHeader) error { } func insertHeaders(creator statementCreator, headers []*DBHeader) error { - insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(hash, number) VALUES (?, ?)") + insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(hash, number, head) VALUES (?, ?, ?)") if err != nil { return err } for _, h := range headers { - _, err = insert.Exec(h.Hash, (*SQLBigInt)(h.Number)) + _, err = insert.Exec(h.Hash, (*SQLBigInt)(h.Number), h.Head) if err != nil { return err } diff --git a/services/wallet/iterative.go b/services/wallet/iterative.go index 8adcfa8c17..0cccabdc29 100644 --- a/services/wallet/iterative.go +++ b/services/wallet/iterative.go @@ -12,27 +12,23 @@ import ( // SetupIterativeDownloader configures IterativeDownloader with last known synced block. 
func SetupIterativeDownloader( db *Database, client HeaderReader, address common.Address, option SyncOption, - downloader BatchDownloader, size *big.Int, limit *big.Int) (*IterativeDownloader, error) { + downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) { + from, err := db.GetLatestSynced(address, option) + if err != nil { + log.Error("failed to get latest synced block", "error", err) + return nil, err + } + if from == nil { + from = &DBHeader{Number: zero} + } + log.Debug("iterative downloader", "address", address, "from", from.Number, "to", to.Number) d := &IterativeDownloader{ client: client, batchSize: size, downloader: downloader, + from: from, + to: to, } - earliest, err := db.GetEarliestSynced(address, option) - log.Info("earleist synced erc20 block", "address", address, "block", earliest) - if err != nil { - log.Error("failed to get earliest synced block", "error", err) - return nil, err - } - if earliest == nil { - previous, err := lastKnownHeader(context.Background(), db, client, limit) - if err != nil { - log.Error("failed to get last known header", "error", err) - return nil, err - } - earliest = previous - } - d.known = earliest return d, nil } @@ -49,41 +45,41 @@ type IterativeDownloader struct { downloader BatchDownloader - known *DBHeader + from, to *DBHeader previous *DBHeader } // Finished true when earliest block with given sync option is zero. func (d *IterativeDownloader) Finished() bool { - return d.known.Number.Cmp(big.NewInt(0)) == 0 + return d.from.Number.Cmp(d.to.Number) == 0 } // Header return last synced header. func (d *IterativeDownloader) Header() *DBHeader { - return d.known + return d.previous } // Next moves closer to the end on every new iteration. func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) { - start := new(big.Int).Sub(d.known.Number, d.batchSize) + to := new(big.Int).Add(d.from.Number, d.batchSize) // if start < 0; start = 0 - if start.Cmp(zero) <= 0 { - start = big.NewInt(0) + if to.Cmp(d.to.Number) == 1 { + to = d.to.Number } - ctx, cancel := context.WithTimeout(parent, 5*time.Second) - from, err := d.client.HeaderByNumber(ctx, start) - cancel() + transfers, err := d.downloader.GetTransfersInRange(parent, d.from.Number, to) if err != nil { - log.Error("failed to get header by number", "number", start, "error", err) + log.Error("failed to get transfer inbetween two bloks", "from", d.from.Number, "to", to, "error", err) return nil, err } - transfers, err := d.downloader.GetTransfersInRange(parent, start, d.known.Number) + // use integers instead of DBHeader + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + header, err := d.client.HeaderByNumber(ctx, to) + cancel() if err != nil { - log.Error("failed to get transfer inbetween two bloks", "from", start, "to", d.known.Number, "error", err) + log.Error("failed to get header by number", "from", d.from.Number, "to", to, "error", err) return nil, err } - // use integers instead of DBHeader - d.previous, d.known = d.known, toDBHeader(from) + d.previous, d.from = d.from, toDBHeader(header) return transfers, nil } @@ -91,6 +87,6 @@ func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) { // For example failed to persist them. 
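With the iterator now walking forward from the last synced block toward a fixed `to` header, a consumer drives it the same way erc20HistoricalCommand does: download a batch, persist it together with the iterator's header, and revert on failure so the batch is retried. A minimal sketch of that loop, assuming the signatures in this patch (exampleIterativeSync and persist are illustrative stand-ins):

```go
package wallet

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
)

// exampleIterativeSync drives the forward-walking IterativeDownloader the way
// erc20HistoricalCommand does: fetch a batch, persist it together with the
// iterator's header, and revert on failure so the same range is retried.
// persist is a stand-in for the real storage step.
func exampleIterativeSync(ctx context.Context, db *Database, client HeaderReader,
	address common.Address, downloader BatchDownloader, to *DBHeader,
	persist func([]Transfer, *DBHeader) error) error {

	iter, err := SetupIterativeDownloader(db, client, address, erc20Sync,
		downloader, erc20BatchSize, to)
	if err != nil {
		return err
	}
	for !iter.Finished() {
		transfers, err := iter.Next(ctx) // moves `from` one batch closer to `to`
		if err != nil {
			return err
		}
		if err := persist(transfers, iter.Header()); err != nil {
			iter.Revert() // step back so this batch is downloaded again next run
			return err
		}
	}
	return nil
}
```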
func (d *IterativeDownloader) Revert() { if d.previous != nil { - d.known = d.previous + d.from = d.previous } } diff --git a/services/wallet/iterative_test.go b/services/wallet/iterative_test.go index 99d435ec86..6739e046d0 100644 --- a/services/wallet/iterative_test.go +++ b/services/wallet/iterative_test.go @@ -24,19 +24,26 @@ func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big } func TestIterFinished(t *testing.T) { - iterator := IterativeDownloader{known: &DBHeader{Number: big.NewInt(0)}} + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(10)}, + to: &DBHeader{Number: big.NewInt(10)}, + } require.True(t, iterator.Finished()) } func TestIterNotFinished(t *testing.T) { - iterator := IterativeDownloader{known: &DBHeader{Number: big.NewInt(2)}} + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(2)}, + to: &DBHeader{Number: big.NewInt(5)}, + } require.False(t, iterator.Finished()) } func TestIterRevert(t *testing.T) { iterator := IterativeDownloader{ - known: &DBHeader{Number: big.NewInt(0)}, - previous: &DBHeader{Number: big.NewInt(10)}, + from: &DBHeader{Number: big.NewInt(12)}, + to: &DBHeader{Number: big.NewInt(12)}, + previous: &DBHeader{Number: big.NewInt(9)}, } require.True(t, iterator.Finished()) iterator.Revert() @@ -58,7 +65,8 @@ func TestIterProgress(t *testing.T) { client: chain, downloader: transfers, batchSize: big.NewInt(5), - known: &DBHeader{Number: big.NewInt(9)}, + from: &DBHeader{Number: big.NewInt(0)}, + to: &DBHeader{Number: big.NewInt(9)}, } batch, err := iter.Next(context.TODO()) require.NoError(t, err) diff --git a/services/wallet/migrations/bindata.go b/services/wallet/migrations/bindata.go index e9522dfb1e..88abbccca0 100644 --- a/services/wallet/migrations/bindata.go +++ b/services/wallet/migrations/bindata.go @@ -34,7 +34,7 @@ func _0001_transfers_down_db_sql() ([]byte, error) { ) } -var __0001_transfers_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x91\x51\x6f\x82\x30\x14\x85\xdf\xfb\x2b\xee\xa3\x24\xfd\x07\x7b\x2a\x78\xd5\x66\xac\xdd\x4a\x99\xf3\x89\x20\x76\xd3\xa8\x85\x51\x48\xe6\xbf\x5f\x10\xaa\x73\x33\x64\x8f\xdc\xc3\xbd\xe7\x9c\xaf\x91\x42\xa6\x11\x34\x0b\x63\x04\x3e\x03\x21\x35\xe0\x1b\x4f\x74\x02\x4d\x9d\x5b\xf7\x6e\x6a\x07\x13\xb2\xcd\xdd\x16\x5e\x99\x8a\x16\x4c\x41\x2a\xf8\x4b\x8a\x94\xe4\x9b\x4d\x6d\x9c\xbb\xcc\xbb\x5d\x91\xc6\x31\x25\xeb\xc3\x3e\xbb\x59\xb9\x4a\xcd\x17\x84\xb1\x0c\x29\xa9\x4d\x61\x76\x55\x33\x7c\x35\xa7\xca\xdc\xf9\x7b\x26\x15\xf2\xb9\x80\x47\x5c\x4d\xfc\xd1\x00\x14\xce\x50\xa1\x88\x30\x81\xf5\xa1\x2c\xf6\x6e\xd2\xcf\xa5\x80\x29\xc6\xa8\x11\x22\x96\x44\x6c\x8a\x94\x44\x52\x24\x5a\x31\x2e\x34\xb4\x76\xf7\xd9\x9a\xcc\xd7\xca\x4a\x7b\x3e\x97\xf9\x1a\x7d\x2d\x38\xdf\xa2\xc3\x30\x20\xc1\x03\x21\x23\x90\x7a\xff\xdf\x84\x9e\x15\x7f\x62\x6a\xd5\xc5\xa6\xc4\xb6\xc7\xb5\xa9\x21\xe4\xf3\x2e\xc5\xe0\xe2\x2b\x92\x00\x96\x5c\x2f\x64\xaa\x41\xc9\x25\x9f\x8e\xbb\xe5\x45\x51\xb6\xb6\x71\x59\x53\x66\x17\xe7\xf1\x67\xb8\x75\xbf\x6a\xee\x64\x0b\xe0\x42\xff\x65\xdc\x6f\xdc\xa3\xec\x95\x7f\x71\x3e\xe6\x55\xb5\xb3\x1f\x1d\xe6\x21\x61\x1f\xd9\x27\xf2\xb8\x07\x91\xfe\xb0\xee\xa0\x7f\x07\x00\x00\xff\xff\xd1\xf0\x46\xeb\x99\x02\x00\x00") +var __0001_transfers_up_db_sql = 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x91\xd1\x72\xaa\x30\x10\x86\xef\xf3\x14\x7b\x29\x33\x79\x83\x73\x15\x60\xd1\xcc\xc9\x49\x4e\x43\xa8\xf5\x8a\x41\x4c\xab\xa3\x06\x4a\x60\xa6\xbe\x7d\x07\x01\xad\xad\xe3\xf4\x32\xbb\xd9\xdd\xef\xff\xff\x48\x23\x33\x08\x86\x85\x02\x81\x27\x20\x95\x01\x7c\xe1\xa9\x49\xa1\x6d\x0a\xe7\x5f\x6d\xe3\x61\x46\xb6\x85\xdf\xc2\x33\xd3\xd1\x82\x69\xc8\x24\x7f\xca\x90\x92\x62\xb3\x69\xac\xf7\x97\x7a\x3f\x2b\x33\x21\x28\x59\x1f\xf6\xf9\xcd\xc8\xb5\xd5\x7e\x40\x28\x54\x48\x49\x63\x4b\xbb\xab\xdb\xf1\xd5\x9e\x6a\x7b\xe7\x77\xa2\x34\xf2\xb9\x84\xbf\xb8\x9a\x4d\x4b\x03\xd0\x98\xa0\x46\x19\x61\x0a\xeb\x43\x55\xee\xfd\x6c\xa8\x2b\x09\x31\x0a\x34\x08\x11\x4b\x23\x16\x23\x25\x91\x92\xa9\xd1\x8c\x4b\x03\x9d\xdb\xbd\x77\x36\x9f\x64\xe5\x95\x3b\xaf\xcb\x27\x19\x83\x2c\x38\xef\xa2\x63\x31\x20\xc1\x1f\x42\x1e\x98\x34\xdc\xff\xee\xd0\x7f\xcd\xff\x31\xbd\xea\xb1\x29\x71\xdd\x71\x6d\x1b\x08\xf9\xbc\xa7\x18\xaf\x5c\x25\x6e\x6d\xb1\x81\x50\x29\x01\x31\x26\x2c\x13\x06\x12\x26\x52\x24\x01\x2c\xb9\x59\xa8\xcc\x80\x56\x4b\x1e\x3f\xc6\x28\xca\xb2\xea\x5c\xeb\xf3\xb6\xca\x2f\x48\x8f\xf3\xb9\xc5\xba\xf6\xfc\xc9\x95\xc0\xa5\xf9\x69\xfe\x30\x71\xcf\xfe\xa9\xf3\xab\x00\x8e\x45\x5d\xef\xdc\x5b\xef\xff\x48\x38\x20\x4f\x44\x53\x0e\x63\x93\x7e\x39\xdd\xa7\xf1\x19\x00\x00\xff\xff\xfc\x91\xad\x60\xb2\x02\x00\x00") func _0001_transfers_up_db_sql() ([]byte, error) { return bindata_read( diff --git a/services/wallet/migrations/sql/0001_transfers.up.db.sql b/services/wallet/migrations/sql/0001_transfers.up.db.sql index e8e4e00523..9b5a380aca 100644 --- a/services/wallet/migrations/sql/0001_transfers.up.db.sql +++ b/services/wallet/migrations/sql/0001_transfers.up.db.sql @@ -11,7 +11,8 @@ CONSTRAINT unique_transfer_on_hash_address UNIQUE (hash,address) CREATE TABLE IF NOT EXISTS blocks ( hash VARCHAR PRIMARY KEY, -number BIGINT UNIQUE NOT NULL +number BIGINT UNIQUE NOT NULL, +head BOOL DEFAULT FALSE ) WITHOUT ROWID; CREATE TABLE IF NOT EXISTS accounts_to_blocks ( diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index ba53b0e0db..abb412d21a 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -30,7 +30,7 @@ func pollingPeriodByChain(chain *big.Int) time.Duration { var ( reorgSafetyDepth = big.NewInt(15) - erc20BatchSize = big.NewInt(50000) + erc20BatchSize = big.NewInt(100000) ) // HeaderReader interface for reading headers using block number or hash. 
@@ -79,35 +79,11 @@ func (r *Reactor) Start() error { if r.group != nil { return errors.New("already running") } - r.group = NewGroup() + r.group = NewGroup(context.Background()) // TODO(dshulyak) to support adding accounts in runtime implement keyed group // and export private api to start downloaders from accounts // private api should have access only to reactor - for _, address := range r.accounts { - erc20 := &erc20HistoricalCommand{ - db: r.db, - erc20: NewERC20TransfersDownloader(r.client, []common.Address{address}), - client: r.client, - feed: r.feed, - safetyDepth: reorgSafetyDepth, - address: address, - } - r.group.Add(erc20.Command()) - eth := ðHistoricalCommand{ - db: r.db, - client: r.client, - address: address, - eth: ÐTransferDownloader{ - client: r.client, - accounts: []common.Address{address}, - signer: types.NewEIP155Signer(r.chain), - }, - feed: r.feed, - safetyDepth: reorgSafetyDepth, - } - r.group.Add(eth.Command()) - } - newBlocks := &newBlocksTransfersCommand{ + ctl := &controlCommand{ db: r.db, chain: r.chain, client: r.client, @@ -121,7 +97,7 @@ func (r *Reactor) Start() error { feed: r.feed, safetyDepth: reorgSafetyDepth, } - r.group.Add(newBlocks.Command()) + r.group.Add(ctl.Command()) return nil } diff --git a/services/wallet/service.go b/services/wallet/service.go index 7082d75e75..17670884f7 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -76,7 +76,7 @@ func (s *Service) APIs() []rpc.API { { Namespace: "wallet", Version: "0.1.0", - Service: &API{s}, + Service: NewAPI(s), Public: true, }, } From c9ace41712391b5b4e73412fc227461844348168 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 10 Jun 2019 14:58:49 +0300 Subject: [PATCH 10/12] Remove ununes isSequence --- services/wallet/commands.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/services/wallet/commands.go b/services/wallet/commands.go index 0c770bed28..2484dc8541 100644 --- a/services/wallet/commands.go +++ b/services/wallet/commands.go @@ -414,21 +414,6 @@ func (c *controlCommand) Command() Command { }.Run } -func isSequence(headers []*DBHeader) bool { - if len(headers) == 0 { - return false - } - child := headers[0] - diff := big.NewInt(0) - for _, parent := range headers[1:] { - if diff.Sub(child.Number, parent.Number).Cmp(one) != 0 { - return false - } - child = parent - } - return true -} - func headersFromTransfers(transfers []Transfer) []*DBHeader { byHash := map[common.Hash]struct{}{} rst := []*DBHeader{} From 20c074a105ffb52219ac7736fe68949cc6f9d115 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 10 Jun 2019 16:46:16 +0300 Subject: [PATCH 11/12] Increase timeout in test: bootstrap procedure may take longer now --- t/devtests/tranfers_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/devtests/tranfers_test.go b/t/devtests/tranfers_test.go index ecbc68fa64..097d5271f0 100644 --- a/t/devtests/tranfers_test.go +++ b/t/devtests/tranfers_test.go @@ -77,7 +77,7 @@ func (s *TransfersSuite) TestNewTransfers() { return fmt.Errorf("waiting for one transfer") } return nil - }, 10*time.Second, 1*time.Second)) + }, 20*time.Second, 1*time.Second)) go func() { for i := 1; i < 10; i++ { From eaff713b2202339fea4d26778757e9a755dbc02c Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 11 Jun 2019 08:48:03 +0300 Subject: [PATCH 12/12] Remove unused code in database and adjust documentation --- services/wallet/README.md | 20 +---- services/wallet/commands.go | 41 +++++++--- services/wallet/database.go | 62 +------------- 
 services/wallet/database_test.go | 80 ++++++-------------
 .../migrations/sql/0001_transfers.down.db.sql | 1 +
 5 files changed, 58 insertions(+), 146 deletions(-)

diff --git a/services/wallet/README.md b/services/wallet/README.md
index 2ee0be2af9..811ef64c9a 100644
--- a/services/wallet/README.md
+++ b/services/wallet/README.md
@@ -159,7 +159,7 @@ Objects in the same format.
 
 Signals
 -------
 
-Three signals will are emitted:
+Two signals can be emitted:
 
 1. `newblock` signal
@@ -197,21 +197,3 @@ Client expected to request new transfers from received block and replace transfe
   }
 }
 ```
-
-3. `history` signal.
-
-Emmited if new transfer in old block was found.
-Client expected to request transfers starting from this new block till the earliest known block.
-
-```json
-{
-  "type": "wallet",
-  "event": {
-    "type": "history",
-    "blockNumber": 0,
-    "accounts": [
-      "0x42c8f505b4006d417dd4e0ba0e880692986adbd8"
-    ]
-  }
-}
-```
\ No newline at end of file
diff --git a/services/wallet/commands.go b/services/wallet/commands.go
index 2484dc8541..b6bd6dcfaf 100644
--- a/services/wallet/commands.go
+++ b/services/wallet/commands.go
@@ -60,7 +60,6 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
 	}
 	transfers := concurrent.Get()
 	log.Info("eth historical downloader finished succesfully", "total transfers", len(transfers), "time", time.Since(start))
-	// TODO(dshulyak) insert 0 block number with transfers
 	err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
 	if err != nil {
 		log.Error("failed to save downloaded erc20 transfers", "error", err)
@@ -202,6 +201,10 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
 		log.Error("failed to process new header", "header", latest, "error", err)
 		return err
 	}
+	if len(added) == 0 && len(removed) == 0 {
+		log.Debug("new block already in the database", "block", latest.Number)
+		return nil
+	}
 	// for each added block get tranfers from downloaders
 	all := []Transfer{}
 	for i := range added {
@@ -287,6 +290,10 @@ func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header
 	return append(ethT, erc20T...), nil
 }
 
+// controlCommand implements the following procedure (the following parts are executed sequentially):
+// - verifies that the last header that was synced is still in the canonical chain
+// - runs fast indexing for each account separately
+// - starts listening to new blocks and watches for reorgs
 type controlCommand struct {
 	accounts    []common.Address
 	db          *Database
@@ -298,7 +305,8 @@ type controlCommand struct {
 	safetyDepth *big.Int
 }
 
-// run fast indexing for every accont managed by this command.
+// run fast indexing for every account up to canonical chain head minus safety depth.
+// every account will run it from last synced header.
 func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
 	start := time.Now()
 	group := NewGroup(ctx)
@@ -335,25 +343,30 @@ func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
 	}
 }
 
+// verifyLastSynced verifies that the last header that was added to the database is still in the canonical chain.
+// it is done by downloading a configured number of parents for the last header in the db.
 func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error {
 	log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number)
 	if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 {
 		log.Debug("no need to verify. last block is close enough to chain head")
 		return nil
 	}
-	header, err := c.client.HeaderByNumber(parent, new(big.Int).Add(last.Number, c.safetyDepth))
+	ctx, cancel := context.WithTimeout(parent, 3*time.Second)
+	header, err := c.client.HeaderByNumber(ctx, new(big.Int).Add(last.Number, c.safetyDepth))
+	cancel()
 	if err != nil {
 		return err
 	}
 	log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number)
+	// TODO(dshulyak) make a standalone command that
+	// doesn't manage transfers and has an upper limit
 	cmd := &newBlocksTransfersCommand{
-		db:       c.db,
-		chain:    c.chain,
-		client:   c.client,
-		accounts: c.accounts,
-		eth:      c.eth,
-		erc20:    c.erc20,
-		feed:     c.feed,
+		db:     c.db,
+		chain:  c.chain,
+		client: c.client,
+		eth:    c.eth,
+		erc20:  c.erc20,
+		feed:   c.feed,
 
 		from: last,
 		to:   toDBHeader(header),
@@ -363,7 +376,9 @@ func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader
 
 func (c *controlCommand) Run(parent context.Context) error {
 	log.Debug("start control command")
-	head, err := c.client.HeaderByNumber(parent, nil)
+	ctx, cancel := context.WithTimeout(parent, 3*time.Second)
+	head, err := c.client.HeaderByNumber(ctx, nil)
+	cancel()
 	if err != nil {
 		return err
 	}
@@ -384,7 +399,9 @@ func (c *controlCommand) Run(parent context.Context) error {
 	if target.Cmp(zero) <= 0 {
 		target = zero
 	}
-	head, err = c.client.HeaderByNumber(parent, target)
+	ctx, cancel = context.WithTimeout(parent, 3*time.Second)
+	head, err = c.client.HeaderByNumber(ctx, target)
+	cancel()
 	if err != nil {
 		return err
 	}
diff --git a/services/wallet/database.go b/services/wallet/database.go
index cbfd699185..936d731599 100644
--- a/services/wallet/database.go
+++ b/services/wallet/database.go
@@ -232,44 +232,6 @@ func (db *Database) SaveSyncedHeader(address common.Address, header *types.Heade
 	return err
 }
 
-// LastHeader selects last header by block number.
-func (db *Database) LastHeader() (header *DBHeader, err error) {
-	rows, err := db.db.Query("SELECT hash,number FROM blocks WHERE number = (SELECT MAX(number) FROM blocks)")
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-	for rows.Next() {
-		header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
-		err = rows.Scan(&header.Hash, (*SQLBigInt)(header.Number))
-		if err != nil {
-			return nil, err
-		}
-		if header != nil {
-			return header, nil
-		}
-	}
-	return nil, nil
-}
-
-func (db *Database) LastHeaders(limit *big.Int) ([]*DBHeader, error) {
-	rows, err := db.db.Query("SELECT hash,number FROM blocks WHERE number ORDER BY number DESC LIMIT ?", (*SQLBigInt)(limit))
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-	headers := []*DBHeader{}
-	for rows.Next() {
-		header := &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
-		err = rows.Scan(&header.Hash, (*SQLBigInt)(header.Number))
-		if err != nil {
-			return nil, err
-		}
-		headers = append(headers, header)
-	}
-	return headers, nil
-}
-
 // HeaderExists checks if header with hash exists in db.
 func (db *Database) HeaderExists(hash common.Hash) (bool, error) {
 	var val sql.NullBool
@@ -295,7 +257,7 @@ func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err er
 
 func (db *Database) GetLastHead() (header *DBHeader, err error) {
 	header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
-	err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1").Scan(&header.Hash, (*SQLBigInt)(header.Number))
+	err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1 AND number = (SELECT MAX(number) FROM blocks)").Scan(&header.Hash, (*SQLBigInt)(header.Number))
 	if err == nil {
 		return header, nil
 	}
@@ -305,27 +267,7 @@ func (db *Database) GetLastHead() (header *DBHeader, err error) {
 	return nil, err
 }
 
-func (db *Database) GetEarliestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) {
-	rows, err := db.db.Query(`
-SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number
-= (SELECT MIN(blk_number) FROM accounts_to_blocks WHERE address = $1 AND sync & $2 = $2)`, address, option)
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-	for rows.Next() {
-		header = &DBHeader{Number: new(big.Int)}
-		err = rows.Scan(&header.Hash, (*SQLBigInt)(header.Number))
-		if err != nil {
-			return nil, err
-		}
-		if header != nil {
-			return header, nil
-		}
-	}
-	return nil, nil
-}
-
+// GetLatestSynced returns the last synced block for a given sync option.
 func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) {
 	header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)}
 	err = db.db.QueryRow(`
diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go
index 9770846e4d..1197324299 100644
--- a/services/wallet/database_test.go
+++ b/services/wallet/database_test.go
@@ -4,7 +4,6 @@ import (
 	"io/ioutil"
 	"math/big"
 	"os"
-	"sort"
 	"testing"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -68,34 +67,6 @@ func TestDBHeaderDoesntExist(t *testing.T) {
 	require.False(t, rst)
 }
 
-func TestDBLastHeader(t *testing.T) {
-	db, stop := setupTestDB(t)
-	defer stop()
-
-	template := types.Header{
-		Difficulty: big.NewInt(1),
-		Time:       big.NewInt(1),
-	}
-	first := template
-	first.Number = big.NewInt(10)
-	second := template
-	second.Number = big.NewInt(11)
-	require.NoError(t, db.SaveHeader(&second))
-	require.NoError(t, db.SaveHeader(&first))
-
-	rst, err := db.LastHeader()
-	require.NoError(t, err)
-	require.Equal(t, second.Hash(), rst.Hash)
-}
-
-func TestDBNoLastHeader(t *testing.T) {
-	db, stop := setupTestDB(t)
-	defer stop()
-	header, err := db.LastHeader()
-	require.NoError(t, err)
-	require.Nil(t, header)
-}
-
 func TestDBProcessTransfer(t *testing.T) {
 	db, stop := setupTestDB(t)
 	defer stop()
@@ -180,7 +151,7 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
 
 }
 
-func TestDBEarliestSynced(t *testing.T) {
+func TestDBLatestSynced(t *testing.T) {
 	db, stop := setupTestDB(t)
 	defer stop()
@@ -200,20 +171,20 @@ func TestDBEarliestSynced(t *testing.T) {
 	require.NoError(t, db.SaveSyncedHeader(address, h1, ethSync))
 	require.NoError(t, db.SaveSyncedHeader(address, h2, ethSync))
 
-	earliest, err := db.GetEarliestSynced(address, ethSync)
+	latest, err := db.GetLatestSynced(address, ethSync)
 	require.NoError(t, err)
-	require.NotNil(t, earliest)
-	require.Equal(t, h2.Number, earliest.Number)
-	require.Equal(t, h2.Hash(), earliest.Hash)
+	require.NotNil(t, latest)
+	require.Equal(t, h1.Number, latest.Number)
+	require.Equal(t, h1.Hash(), latest.Hash)
 }
 
-func TestDBEarliestSyncedDoesntExist(t *testing.T) {
+func TestDBLatestSyncedDoesntExist(t *testing.T) {
 	db, stop := setupTestDB(t)
 	defer stop()
-	earliest, err := db.GetEarliestSynced(common.Address{1}, ethSync)
+	latest, err := db.GetLatestSynced(common.Address{1}, ethSync)
 	require.NoError(t, err)
-	require.Nil(t, earliest)
+	require.Nil(t, latest)
 }
 
 func TestDBProcessTransfersUpdate(t *testing.T) {
@@ -235,31 +206,30 @@ func TestDBProcessTransfersUpdate(t *testing.T) {
 	require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync))
 	require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync))
 
-	earliest, err := db.GetEarliestSynced(address, ethSync|erc20Sync)
+	latest, err := db.GetLatestSynced(address, ethSync|erc20Sync)
 	require.NoError(t, err)
-	require.Equal(t, header.Hash, earliest.Hash)
+	require.Equal(t, header.Hash, latest.Hash)
 }
 
-func TestDBLastHeadersReverseSorted(t *testing.T) {
+func TestDBLastHeadExist(t *testing.T) {
 	db, stop := setupTestDB(t)
 	defer stop()
-	headers := make([]*DBHeader, 10)
-	for i := range headers {
-		headers[i] = &DBHeader{Hash: common.Hash{byte(i)}, Number: big.NewInt(int64(i))}
+	headers := []*DBHeader{
+		{Number: big.NewInt(1), Hash: common.Hash{1}, Head: true},
+		{Number: big.NewInt(2), Hash: common.Hash{2}, Head: true},
+		{Number: big.NewInt(3), Hash: common.Hash{3}, Head: true},
 	}
-	require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, ethSync))
-
-	headers, err := db.LastHeaders(big.NewInt(5))
+	require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, 0))
+	last, err := db.GetLastHead()
 	require.NoError(t, err)
-	require.Len(t, headers, 5)
+	require.Equal(t, headers[2].Hash, last.Hash)
+}
 
-	sorted := make([]*DBHeader, len(headers))
-	copy(sorted, headers)
-	sort.Slice(sorted, func(i, j int) bool {
-		return sorted[i].Number.Cmp(sorted[j].Number) > 0
-	})
-	for i := range headers {
-		require.Equal(t, sorted[i], headers[i])
-	}
+func TestDBLastHeadDoesntExist(t *testing.T) {
+	db, stop := setupTestDB(t)
+	defer stop()
+	last, err := db.GetLastHead()
+	require.NoError(t, err)
+	require.Nil(t, last)
 }
diff --git a/services/wallet/migrations/sql/0001_transfers.down.db.sql b/services/wallet/migrations/sql/0001_transfers.down.db.sql
index 28d1120889..d82814a90f 100644
--- a/services/wallet/migrations/sql/0001_transfers.down.db.sql
+++ b/services/wallet/migrations/sql/0001_transfers.down.db.sql
@@ -1,2 +1,3 @@
 DROP TABLE transfers;
 DROP TABLE blocks;
+DROP TABLE accounts_to_blocks;
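
Editorial note (not part of the patch series): the comments added to `controlCommand` above describe a sequential procedure: verify that the last synced header is still canonical, fast-index each account up to the chain head minus the reorg safety depth, then start watching new blocks. The sketch below is a minimal, self-contained illustration of how those phases could be wired together. Every identifier in it (`runControl`, `miniHeader`, `chainReader`, `fakeChain`) is an illustrative assumption rather than status-go code; only the 3-second per-request timeout and the safety depth of 15 follow the diffs above.

```go
// Illustrative sketch only; all names here are hypothetical stand-ins.
package main

import (
	"context"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// miniHeader is a simplified stand-in for DBHeader / types.Header.
type miniHeader struct {
	Number *big.Int
}

// chainReader is a stand-in for a header reader; a nil number means "latest",
// mirroring how ethclient's HeaderByNumber behaves.
type chainReader interface {
	HeaderByNumber(ctx context.Context, number *big.Int) (*miniHeader, error)
}

// runControl mirrors the documented sequence: verify the last synced header,
// fast-index every account up to head minus the safety depth, watch new blocks.
func runControl(
	parent context.Context,
	client chainReader,
	safetyDepth *big.Int,
	verifyLastSynced func(context.Context, *miniHeader) error,
	fastIndex func(context.Context, *miniHeader) error,
	watchNewBlocks func(context.Context) error,
) error {
	// Each RPC gets its own short timeout, as the patch does with
	// context.WithTimeout(parent, 3*time.Second) around HeaderByNumber.
	ctx, cancel := context.WithTimeout(parent, 3*time.Second)
	head, err := client.HeaderByNumber(ctx, nil)
	cancel()
	if err != nil {
		return err
	}
	if err := verifyLastSynced(parent, head); err != nil {
		return err
	}
	// Fast indexing stops safetyDepth blocks below the head so that a
	// shallow reorg cannot invalidate what it has written.
	target := new(big.Int).Sub(head.Number, safetyDepth)
	if target.Sign() < 0 {
		target = big.NewInt(0)
	}
	ctx, cancel = context.WithTimeout(parent, 3*time.Second)
	head, err = client.HeaderByNumber(ctx, target)
	cancel()
	if err != nil {
		return err
	}
	if err := fastIndex(parent, head); err != nil {
		return err
	}
	// Finally hand over to the new-blocks watcher, which also handles reorgs.
	return watchNewBlocks(parent)
}

// fakeChain returns headers from a fixed head; it exists only so the example runs on its own.
type fakeChain struct{ head *big.Int }

func (f fakeChain) HeaderByNumber(_ context.Context, n *big.Int) (*miniHeader, error) {
	if n == nil {
		return &miniHeader{Number: new(big.Int).Set(f.head)}, nil
	}
	if n.Cmp(f.head) > 0 {
		return nil, errors.New("unknown block")
	}
	return &miniHeader{Number: new(big.Int).Set(n)}, nil
}

func main() {
	err := runControl(
		context.Background(),
		fakeChain{head: big.NewInt(100)},
		big.NewInt(15), // same value as reorgSafetyDepth in the patch
		func(ctx context.Context, head *miniHeader) error { fmt.Println("verify up to", head.Number); return nil },
		func(ctx context.Context, to *miniHeader) error { fmt.Println("fast index up to", to.Number); return nil },
		func(ctx context.Context) error { fmt.Println("watching new blocks"); return nil },
	)
	if err != nil {
		fmt.Println("control command failed:", err)
	}
}
```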