diff --git a/api/backend.go b/api/backend.go index 8ee9531423..4a65a90112 100644 --- a/api/backend.go +++ b/api/backend.go @@ -510,7 +510,7 @@ func (b *StatusBackend) reSelectAccount() error { default: return err } - return b.startWallet() + return nil } // SelectAccount selects current wallet and chat accounts, by verifying that each address has corresponding account which can be decrypted @@ -551,10 +551,10 @@ func (b *StatusBackend) SelectAccount(walletAddress, chatAddress, password strin return err } } - return b.startWallet() + return b.startWallet(password) } -func (b *StatusBackend) startWallet() error { +func (b *StatusBackend) startWallet(password string) error { if !b.statusNode.Config().WalletConfig.Enabled { return nil } @@ -567,9 +567,9 @@ func (b *StatusBackend) startWallet() error { return err } path := path.Join(b.statusNode.Config().DataDir, fmt.Sprintf("wallet-%x.sql", account.Address)) - return wallet.StartReactor(path, + return wallet.StartReactor(path, password, b.statusNode.RPCClient().Ethclient(), - account.Address, + []common.Address{account.Address}, new(big.Int).SetUint64(b.statusNode.Config().NetworkID)) } diff --git a/services/wallet/README.md b/services/wallet/README.md index b2582c7e65..811ef64c9a 100644 --- a/services/wallet/README.md +++ b/services/wallet/README.md @@ -34,6 +34,14 @@ Returns avaiable transfers in a given range. - `start`: `BIGINT` - start of the range - `end`: `BIGINT` - end of the range. if nil query will return all transfers from start. 
+##### Examples + +```json +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,20]} +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,null]} +{"jsonrpc":"2.0","id":13,"method":"wallet_getTransfers","params":[0]} +``` + ##### Returns List of objects like: @@ -42,55 +50,116 @@ List of objects like: ```json [ { - "Type": "eth", - "Header": { - "parentHash": "0xd2130443688b760cb6710a8550ffe68106238a3103bf6e62f0784c9cb3e18591", - "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "miner": "0x0000000000000000000000000000000000000000", - "stateRoot": "0x420b2a421e99c28c3c07825c1ddac37e84cb55561b01c3702eb36a733499defe", - "transactionsRoot": "0x00a1aaef3eac0928cd2ecd7c00a4562e5db92cf4763db9840ba449904616a2c8", - "receiptsRoot": "0x056b23fbba480696b65fe5a59b8f2148a1299103c4f57df839233af2cf4ca2d2", - "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "difficulty": "0x2", - "number": "0x1", - "gasLimit": "0x5ff7a7", - "gasUsed": "0x5208", - "timestamp": "0x5cebc0ed", - "extraData": "0xd883010817846765746888676f312e31322e35856c696e75780000000000000064b57fc2cdf3d0318f02602a90160b749dd2813908a58d6428e695d438e749fc71f7528bee286fe8827db3bb3fccb1aa76593e81c4d9d5f1f7387f61f8b2854000", - "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", - "nonce": "0x0000000000000000", - "hash": "0x576123ac3d561b618d2c9730e5c4432af8d18ed151caeac8b166757ea3b060c7" + "type": "erc20", + "address": 
"0x5dc6108dc6296b052bbd33000553afe0ea576b5e", + "blockNumber": 5687981, + "blockhash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "transaction": { + "nonce": "0x57", + "gasPrice": "0x3b9aca00", + "gas": "0x44ba8", + "to": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162", + "value": "0x0", + "input": "0xcae9ca5100000000000000000000000039d16cdb56b5a6a89e1a397a13fe48034694316e0000000000000000000000000000000000000000000000015af1d78b58c40000000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000449134709e00000000000000000000000000000000000000000000000000000000000000010000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e00000000000000000000000000000000000000000000000000000000", + "v": "0x29", + "r": "0x124587e9c1d16d8bd02fda1221aefbfca8e2f4cd6300ed2077ebf736789179ab", + "s": "0x4309fddc1226dacb877488221a439c4f97d77dc2c3f5c8ea51f34f42417d3bda", + "hash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00" }, - "Transaction": { - "nonce": "0x0", - "gasPrice": "0xa", - "gas": "0xf4240", - "to": "0x72058a9d5a8194078ed372b34fa1fb8b8e5b7720", - "value": "0xde0b6b3a7640000", - "input": "0x", - "v": "0xa95", - "r": "0xd171c582139765f44fa1401edcdd377ca1b350ee10c0685073a5d470fc3625c6", - "s": "0x3392d24da2a13449345fbe8210d5af89f01ee21b456dc3c226e7cd7a0509fed", - "hash": "0x56be9c3cf40243e62428a203e37ce2104cfa12bef7d9e47988fd769617d361c6" - }, - "Receipt": { + "receipt": { "root": "0x", "status": "0x1", - "cumulativeGasUsed": "0x5208", - "logsBloom": 
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "logs": [], - "transactionHash": "0x56be9c3cf40243e62428a203e37ce2104cfa12bef7d9e47988fd769617d361c6", + "cumulativeGasUsed": "0x389e1e", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000200000000020000000000000000000000000000000000004000000000000000200000000000000020000000000008000000000000000000000000000000000000000000000000020000000000002000000800000000100000000000000010000000000000000000400000000000000001000000000040000000400000000400000000020000000000000008000000000020000000010000000002000000000000020000000002000000000000000000000000000000000200000000000000000020000010000000000000000000000400000000000000000000000000000000000000", + "logs": [ + { + "address": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162", + "topics": [ + "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", + "0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e", + "0x00000000000000000000000039d16cdb56b5a6a89e1a397a13fe48034694316e" + ], + "data": "0x0000000000000000000000000000000000000000000000015af1d78b58c40000", + "blockNumber": "0x56caad", + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "transactionIndex": "0x10", + "blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "logIndex": "0xd", + "removed": false + }, + { + "address": "0xc55cf4b03948d7ebc8b9e8bad92643703811d162", + "topics": [ + 
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e", + "0x000000000000000000000000ee55b1661fd24c4760d92026cedb252a5a0f2a4e" + ], + "data": "0x0000000000000000000000000000000000000000000000015af1d78b58c40000", + "blockNumber": "0x56caad", + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "transactionIndex": "0x10", + "blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "logIndex": "0xe", + "removed": false + }, + { + "address": "0x39d16cdb56b5a6a89e1a397a13fe48034694316e", + "topics": [ + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + "0x0000000000000000000000000000000000000000000000000000000000000000", + "0x0000000000000000000000005dc6108dc6296b052bbd33000553afe0ea576b5e", + "0x0000000000000000000000000000000000000000000000000000000000000044" + ], + "data": "0x", + "blockNumber": "0x56caad", + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", + "transactionIndex": "0x10", + "blockHash": "0xcc4553f125be0bc6cc974518368145fcf1344f41e5de238205db0a1c185ea2fc", + "logIndex": "0xf", + "removed": false + } + ], + "transactionHash": "0x259dd45c9c4d52137f32b7787e6e1fb6c9faf70ba40b8137bf66ba03abc0da00", "contractAddress": "0x0000000000000000000000000000000000000000", - "gasUsed": "0x5208" + "gasUsed": "0x34f42" } } ] ``` +##### Examples + +```json +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,20]} +{"jsonrpc":"2.0","id":14,"method":"wallet_getTransfers","params":[0,null]} +{"jsonrpc":"2.0","id":13,"method":"wallet_getTransfers","params":[0]} +``` + +#### wallet_getTransfersByAddress + +Returns avaiable transfers in a given range. + +##### Parameters + +- `address`: `HEX` - ethereum address encoded in hex +- `start`: `BIGINT` - start of the range +- `end`: `BIGINT` - end of the range. 
If nil, the query will return all transfers from start. + +##### Examples + +```json +{"jsonrpc":"2.0","id":7,"method":"wallet_getTransfersByAddress","params":["0xb81a6845649fa8c042dfaceb3f7a684873406993","0x0"]} + +##### Returns + +A list of transfer objects in the same format as returned by `wallet_getTransfers`. + + Signals ------- -Two signals will are emitted: +Two signals can be emitted: 1. `newblock` signal @@ -101,8 +170,12 @@ Client expected to request transfers starting from received block. { "type": "wallet", "event": { - "Type": "newblock", - "BlockNumber": 10 + "type": "newblock", + "blockNumber": 0, + "accounts": [ + "0x42c8f505b4006d417dd4e0ba0e880692986adbd8", + "0x3129mdasmeo132128391fml1130410k312312mll" + ] } } ``` @@ -116,8 +189,11 @@ Client expected to request new transfers from received block and replace transfe { "type": "wallet", "event": { - "Type": "reorg", - "BlockNumber": 10 + "type": "reorg", + "blockNumber": 0, + "accounts": [ + "0x42c8f505b4006d417dd4e0ba0e880692986adbd8" + ] } } -``` \ No newline at end of file +``` diff --git a/services/wallet/api.go b/services/wallet/api.go index 078551c342..535c52cdda 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -5,9 +5,15 @@ import ( "errors" "math/big" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" ) +func NewAPI(s *Service) *API { + return &API{s} +} + // API is class with methods available over RPC. type API struct { s *Service } // GetTransfers returns transfers in range of blocks. If `end` is nil all transfers from `start` will be returned. // TODO(dshulyak) benchmark loading many transfers from database. We can avoid json unmarshal/marshal if we will // read header, tx and receipt as a raw json. 
-func (api *API) GetTransfers(ctx context.Context, start, end *big.Int) ([]Transfer, error) { +func (api *API) GetTransfers(ctx context.Context, start, end *hexutil.Big) ([]Transfer, error) { log.Debug("call to get transfers", "start", start, "end", end) + if start == nil { + return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers") + } if api.s.db == nil { return nil, errors.New("wallet service is not initialized") } - rst, err := api.s.db.GetTransfers(start, end) + rst, err := api.s.db.GetTransfers((*big.Int)(start), (*big.Int)(end)) if err != nil { return nil, err } log.Debug("result from database for transfers", "start", start, "end", end, "len", len(rst)) return rst, nil } + +// GetTransfersByAddress returns transfers for a single address between two blocks. +func (api *API) GetTransfersByAddress(ctx context.Context, address common.Address, start, end *hexutil.Big) ([]Transfer, error) { + log.Debug("call to get transfers for an address", "address", address, "start", start, "end", end) + if start == nil { + return nil, errors.New("start of the query must be provided. use 0 if you want to load all transfers") + } + if api.s.db == nil { + return nil, errors.New("wallet service is not initialized") + } + rst, err := api.s.db.GetTransfersByAddress(address, (*big.Int)(start), (*big.Int)(end)) + if err != nil { + return nil, err + } + log.Debug("result from database for address", "address", address, "start", start, "end", end, "len", len(rst)) + return rst, nil +} diff --git a/services/wallet/async.go b/services/wallet/async.go new file mode 100644 index 0000000000..253e1def0a --- /dev/null +++ b/services/wallet/async.go @@ -0,0 +1,152 @@ +package wallet + +import ( + "context" + "sync" + "time" +) + +type Command func(context.Context) error + +// FiniteCommand terminates when error is nil. 
+type FiniteCommand struct { + Interval time.Duration + Runable func(context.Context) error +} + +func (c FiniteCommand) Run(ctx context.Context) error { + ticker := time.NewTicker(c.Interval) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + err := c.Runable(ctx) + if err == nil { + return nil + } + } + } +} + +// InfiniteCommand runs until context is closed. +type InfiniteCommand struct { + Interval time.Duration + Runable func(context.Context) error +} + +func (c InfiniteCommand) Run(ctx context.Context) error { + ticker := time.NewTicker(c.Interval) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + _ = c.Runable(ctx) + } + } +} + +func NewGroup(parent context.Context) *Group { + ctx, cancel := context.WithCancel(parent) + return &Group{ + ctx: ctx, + cancel: cancel, + } +} + +type Group struct { + ctx context.Context + cancel func() + wg sync.WaitGroup +} + +func (g *Group) Add(cmd Command) { + g.wg.Add(1) + go func() { + _ = cmd(g.ctx) + g.wg.Done() + }() +} + +func (g *Group) Stop() { + g.cancel() +} + +func (g *Group) Wait() { + g.wg.Wait() +} + +func (g *Group) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + g.Wait() + close(ch) + }() + return ch +} + +func NewAtomicGroup(parent context.Context) *AtomicGroup { + ctx, cancel := context.WithCancel(parent) + return &AtomicGroup{ctx: ctx, cancel: cancel} +} + +// AtomicGroup terminates as soon as first goroutine terminates.. +type AtomicGroup struct { + ctx context.Context + cancel func() + wg sync.WaitGroup + + mu sync.Mutex + error error +} + +// Go spawns function in a goroutine and stores results or errors. 
+func (d *AtomicGroup) Add(cmd Command) { + d.wg.Add(1) + go func() { + defer d.wg.Done() + err := cmd(d.ctx) + d.mu.Lock() + defer d.mu.Unlock() + if err != nil { + // do not overwrite original error by context errors + if d.error != nil { + return + } + d.error = err + d.cancel() + return + } + }() +} + +// Wait for all downloaders to finish. +func (d *AtomicGroup) Wait() { + d.wg.Wait() + if d.Error() == nil { + d.mu.Lock() + defer d.mu.Unlock() + d.cancel() + } +} + +func (d *AtomicGroup) WaitAsync() <-chan struct{} { + ch := make(chan struct{}) + go func() { + d.Wait() + close(ch) + }() + return ch +} + +// Error stores an error that was reported by any of the downloader. Should be called after Wait. +func (d *AtomicGroup) Error() error { + d.mu.Lock() + defer d.mu.Unlock() + return d.error +} + +func (d *AtomicGroup) Stop() { + d.cancel() +} diff --git a/services/wallet/commands.go b/services/wallet/commands.go new file mode 100644 index 0000000000..b6bd6dcfaf --- /dev/null +++ b/services/wallet/commands.go @@ -0,0 +1,462 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +type ethHistoricalCommand struct { + db *Database + eth TransferDownloader + address common.Address + client reactorClient + feed *event.Feed + + from, to *big.Int +} + +func (c *ethHistoricalCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { + if c.from == nil { + from, err := c.db.GetLatestSynced(c.address, ethSync) + if err != nil { + return err + } + if from == nil { + c.from = zero + } else { + c.from = from.Number + } + log.Debug("initialized downloader for eth historical transfers", "address", c.address, 
"starting at", c.from, "up to", c.to) + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + concurrent := NewConcurrentDownloader(ctx) + start := time.Now() + downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to) + select { + case <-concurrent.WaitAsync(): + case <-ctx.Done(): + log.Error("eth downloader is stuck") + return errors.New("eth downloader is stuck") + } + if concurrent.Error() != nil { + log.Error("failed to dowload transfers using concurrent downloader", "error", err) + return concurrent.Error() + } + transfers := concurrent.Get() + log.Info("eth historical downloader finished succesfully", "total transfers", len(transfers), "time", time.Since(start)) + err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync) + if err != nil { + log.Error("failed to save downloaded erc20 transfers", "error", err) + return err + } + if len(transfers) > 0 { + // we download all or nothing + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: c.from, + Accounts: []common.Address{c.address}, + }) + } + log.Debug("eth transfers were persisted. 
command is closed") + return nil +} + +type erc20HistoricalCommand struct { + db *Database + erc20 BatchDownloader + address common.Address + client reactorClient + feed *event.Feed + + iterator *IterativeDownloader + to *DBHeader +} + +func (c *erc20HistoricalCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { + if c.iterator == nil { + c.iterator, err = SetupIterativeDownloader( + c.db, c.client, c.address, erc20Sync, + c.erc20, erc20BatchSize, c.to) + if err != nil { + log.Error("failed to setup historical downloader for erc20") + return err + } + } + for !c.iterator.Finished() { + start := time.Now() + transfers, err := c.iterator.Next(ctx) + if err != nil { + log.Error("failed to get next batch", "error", err) + break + } + headers := headersFromTransfers(transfers) + headers = append(headers, c.iterator.Header()) + err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync) + if err != nil { + c.iterator.Revert() + log.Error("failed to save downloaded erc20 transfers", "error", err) + return err + } + if len(transfers) > 0 { + log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start)) + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: c.iterator.Header().Number, + Accounts: []common.Address{c.address}, + }) + } + } + log.Info("wallet historical downloader for erc20 transfers finished") + return nil +} + +type newBlocksTransfersCommand struct { + db *Database + accounts []common.Address + chain *big.Int + erc20 *ERC20TransfersDownloader + eth *ETHTransferDownloader + client reactorClient + feed *event.Feed + + from, to *DBHeader +} + +func (c *newBlocksTransfersCommand) Command() Command { + // if both blocks are specified we will use this command to verify that lastly synced blocks are still + // in canonical chain + if c.to != nil && c.from != nil 
{ + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Verify, + }.Run + } + return InfiniteCommand{ + Interval: pollingPeriodByChain(c.chain), + Runable: c.Run, + }.Run +} + +func (c *newBlocksTransfersCommand) Verify(parent context.Context) (err error) { + if c.to == nil || c.from == nil { + return errors.New("`from` and `to` blocks must be specified") + } + for c.from.Number.Cmp(c.to.Number) != 0 { + err = c.Run(parent) + if err != nil { + return err + } + } + return nil +} + +func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) { + if c.from == nil { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + from, err := c.client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + log.Error("failed to get last known header", "error", err) + return err + } + c.from = toDBHeader(from) + log.Debug("initialized downloader for new blocks transfers", "starting at", c.from.Number) + } + num := new(big.Int).Add(c.from.Number, one) + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + latest, err := c.client.HeaderByNumber(ctx, num) + cancel() + if err != nil { + log.Warn("failed to get latest block", "number", num, "error", err) + return err + } + log.Debug("reactor received new block", "header", latest.Hash()) + ctx, cancel = context.WithTimeout(parent, 10*time.Second) + added, removed, err := c.onNewBlock(ctx, c.from, latest) + cancel() + if err != nil { + log.Error("failed to process new header", "header", latest, "error", err) + return err + } + if len(added) == 0 && len(removed) == 0 { + log.Debug("new block already in the database", "block", latest.Number) + return nil + } + // for each added block get tranfers from downloaders + all := []Transfer{} + for i := range added { + log.Debug("reactor get transfers", "block", added[i].Hash, "number", added[i].Number) + transfers, err := c.getTransfers(parent, added[i]) + if err != nil { + log.Error("failed to get transfers", "header", added[i].Hash, "error", err) + 
continue + } + log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers)) + all = append(all, transfers...) + } + err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync) + if err != nil { + log.Error("failed to persist transfers", "error", err) + return err + } + c.from = toDBHeader(latest) + if len(added) == 1 && len(removed) == 0 { + c.feed.Send(Event{ + Type: EventNewBlock, + BlockNumber: added[0].Number, + Accounts: uniqueAccountsFromTransfers(all), + }) + } + if len(removed) != 0 { + lth := len(removed) + c.feed.Send(Event{ + Type: EventReorg, + BlockNumber: removed[lth-1].Number, + Accounts: uniqueAccountsFromTransfers(all), + }) + } + return nil +} + +func (c *newBlocksTransfersCommand) onNewBlock(ctx context.Context, from *DBHeader, latest *types.Header) (added, removed []*DBHeader, err error) { + if from == nil { + // first node in the cache + return []*DBHeader{toHead(latest)}, nil, nil + } + if from.Hash == latest.ParentHash { + // parent matching from node in the cache. on the same chain. 
+ return []*DBHeader{toHead(latest)}, nil, nil + } + exists, err := c.db.HeaderExists(latest.Hash()) + if err != nil { + return nil, nil, err + } + if exists { + return nil, nil, nil + } + log.Debug("wallet reactor spotted reorg", "last header in db", from.Hash, "new parent", latest.ParentHash) + for from != nil && from.Hash != latest.ParentHash { + removed = append(removed, from) + added = append(added, toHead(latest)) + latest, err = c.client.HeaderByHash(ctx, latest.ParentHash) + if err != nil { + return nil, nil, err + } + from, err = c.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one)) + if err != nil { + return nil, nil, err + } + } + added = append(added, toHead(latest)) + return added, removed, nil +} + +func (c *newBlocksTransfersCommand) getTransfers(parent context.Context, header *DBHeader) ([]Transfer, error) { + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + ethT, err := c.eth.GetTransfers(ctx, header) + cancel() + if err != nil { + return nil, err + } + ctx, cancel = context.WithTimeout(parent, 5*time.Second) + erc20T, err := c.erc20.GetTransfers(ctx, header) + cancel() + if err != nil { + return nil, err + } + return append(ethT, erc20T...), nil +} + +// controlCommand implements following procedure (following parts are executed sequeantially): +// - verifies that the last header that was synced is still in the canonical chain +// - runs fast indexing for each account separately +// - starts listening to new blocks and watches for reorgs +type controlCommand struct { + accounts []common.Address + db *Database + eth *ETHTransferDownloader + erc20 *ERC20TransfersDownloader + chain *big.Int + client *ethclient.Client + feed *event.Feed + safetyDepth *big.Int +} + +// run fast indexing for every accont up to canonical chain head minus safety depth. +// every account will run it from last synced header. 
+func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error { + start := time.Now() + group := NewGroup(ctx) + for _, address := range c.accounts { + erc20 := &erc20HistoricalCommand{ + db: c.db, + erc20: NewERC20TransfersDownloader(c.client, []common.Address{address}), + client: c.client, + feed: c.feed, + address: address, + to: to, + } + group.Add(erc20.Command()) + eth := ðHistoricalCommand{ + db: c.db, + client: c.client, + address: address, + eth: ÐTransferDownloader{ + client: c.client, + accounts: []common.Address{address}, + signer: types.NewEIP155Signer(c.chain), + }, + feed: c.feed, + to: to.Number, + } + group.Add(eth.Command()) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-group.WaitAsync(): + log.Debug("fast indexer finished", "in", time.Since(start)) + return nil + } +} + +// verifyLastSynced verifies that last header that was added to the database is still in the canonical chain. +// it is done by downloading configured number of parents for the last header in the db. +func (c *controlCommand) verifyLastSynced(parent context.Context, last *DBHeader, head *types.Header) error { + log.Debug("verifying that previous header is still in canonical chan", "from", last.Number, "chain head", head.Number) + if new(big.Int).Sub(head.Number, last.Number).Cmp(c.safetyDepth) <= 0 { + log.Debug("no need to verify. 
last block is close enough to chain head") + return nil + } + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + header, err := c.client.HeaderByNumber(ctx, new(big.Int).Add(last.Number, c.safetyDepth)) + cancel() + if err != nil { + return err + } + log.Debug("spawn reorg verifier", "from", last.Number, "to", header.Number) + // TODO(dshulyak) make a standalone command that + // doesn't manage transfers and has an upper limit + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + + from: last, + to: toDBHeader(header), + } + return cmd.Command()(parent) +} + +func (c *controlCommand) Run(parent context.Context) error { + log.Debug("start control command") + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + head, err := c.client.HeaderByNumber(ctx, nil) + cancel() + if err != nil { + return err + } + log.Debug("current head is", "block number", head.Number) + last, err := c.db.GetLastHead() + if err != nil { + log.Error("failed to load last head from database", "error", err) + return err + } + if last != nil { + err = c.verifyLastSynced(parent, last, head) + if err != nil { + log.Error("failed verification for last header in canonical chain", "error", err) + return err + } + } + target := new(big.Int).Sub(head.Number, c.safetyDepth) + if target.Cmp(zero) <= 0 { + target = zero + } + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + head, err = c.client.HeaderByNumber(ctx, target) + cancel() + if err != nil { + return err + } + log.Debug("run fast indexing for the transfers", "up to", head.Number) + err = c.fastIndex(parent, toDBHeader(head)) + if err != nil { + return err + } + log.Debug("watching new blocks", "start from", head.Number) + cmd := &newBlocksTransfersCommand{ + db: c.db, + chain: c.chain, + client: c.client, + accounts: c.accounts, + eth: c.eth, + erc20: c.erc20, + feed: c.feed, + from: toDBHeader(head), + } + return cmd.Command()(parent) +} + 
+func (c *controlCommand) Command() Command { + return FiniteCommand{ + Interval: 5 * time.Second, + Runable: c.Run, + }.Run +} + +func headersFromTransfers(transfers []Transfer) []*DBHeader { + byHash := map[common.Hash]struct{}{} + rst := []*DBHeader{} + for i := range transfers { + _, exists := byHash[transfers[i].BlockHash] + if exists { + continue + } + rst = append(rst, &DBHeader{ + Hash: transfers[i].BlockHash, + Number: transfers[i].BlockNumber, + }) + } + return rst +} + +func uniqueAccountsFromTransfers(transfers []Transfer) []common.Address { + accounts := []common.Address{} + unique := map[common.Address]struct{}{} + for i := range transfers { + _, exist := unique[transfers[i].Address] + if exist { + continue + } + unique[transfers[i].Address] = struct{}{} + accounts = append(accounts, transfers[i].Address) + } + return accounts +} diff --git a/services/wallet/commands_test.go b/services/wallet/commands_test.go new file mode 100644 index 0000000000..46f0af01c3 --- /dev/null +++ b/services/wallet/commands_test.go @@ -0,0 +1,231 @@ +package wallet + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-go/t/devtests/testchain" + "github.com/stretchr/testify/suite" +) + +func TestNewBlocksSuite(t *testing.T) { + suite.Run(t, new(NewBlocksSuite)) +} + +type NewBlocksSuite struct { + suite.Suite + backend *testchain.Backend + cmd *newBlocksTransfersCommand + address common.Address + db *Database + dbStop func() + feed *event.Feed +} + +func (s *NewBlocksSuite) SetupTest() { + var err error + db, stop := setupTestDB(s.Suite.T()) + s.db = db + s.dbStop = stop + s.backend, err = testchain.NewBackend() + s.Require().NoError(err) + account, err := crypto.GenerateKey() + s.Require().NoError(err) + s.address = 
crypto.PubkeyToAddress(account.PublicKey) + s.feed = &event.Feed{} + s.cmd = &newBlocksTransfersCommand{ + db: s.db, + accounts: []common.Address{s.address}, + erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}), + eth: ÐTransferDownloader{ + client: s.backend.Client, + signer: s.backend.Signer, + accounts: []common.Address{s.address}, + }, + feed: s.feed, + client: s.backend.Client, + } +} + +func (s *NewBlocksSuite) TearDownTest() { + s.dbStop() + s.Require().NoError(s.backend.Stop()) +} + +func (s *NewBlocksSuite) TestOneBlock() { + ctx := context.Background() + s.Require().EqualError(s.cmd.Run(ctx), "not found") + tx := types.NewTransaction(0, s.address, big.NewInt(1e17), 21000, big.NewInt(1), nil) + tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) + s.Require().NoError(err) + blocks := s.backend.GenerateBlocks(1, 0, func(n int, gen *core.BlockGen) { + gen.AddTx(tx) + }) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(1, n) + s.Require().NoError(err) + + events := make(chan Event, 1) + sub := s.feed.Subscribe(events) + defer sub.Unsubscribe() + + s.Require().NoError(s.cmd.Run(ctx)) + + select { + case ev := <-events: + s.Require().Equal(ev.Type, EventNewBlock) + s.Require().Equal(ev.BlockNumber, big.NewInt(1)) + default: + s.Require().FailNow("event wasn't emitted") + } + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 1) + s.Require().Equal(tx.Hash(), transfers[0].ID) +} + +func (s *NewBlocksSuite) genTx(nonce int) *types.Transaction { + tx := types.NewTransaction(uint64(nonce), s.address, big.NewInt(1e10), 21000, big.NewInt(1), nil) + tx, err := types.SignTx(tx, s.backend.Signer, s.backend.Faucet) + s.Require().NoError(err) + return tx +} + +func (s *NewBlocksSuite) runCmdUntilError(ctx context.Context) (err error) { + for err == nil { + err = s.cmd.Run(ctx) + } + return err +} + +func (s *NewBlocksSuite) TestReorg() 
{ + blocks := s.backend.GenerateBlocks(20, 0, nil) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(20, n) + s.Require().NoError(err) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + blocks = s.backend.GenerateBlocks(3, 20, func(n int, gen *core.BlockGen) { + gen.AddTx(s.genTx(n)) + }) + n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(3, n) + s.Require().NoError(err) + + // `not found` returned when we query head+1 block + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(15)) + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 3) + + blocks = s.backend.GenerateBlocks(10, 15, func(n int, gen *core.BlockGen) { + gen.AddTx(s.genTx(n)) + }) + n, err = s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(10, n) + s.Require().NoError(err) + + // it will be less but even if something went wrong we can't get more + events := make(chan Event, 10) + sub := s.feed.Subscribe(events) + defer sub.Unsubscribe() + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + close(events) + expected := []Event{{Type: EventReorg, BlockNumber: big.NewInt(16)}, {Type: EventNewBlock, BlockNumber: big.NewInt(25)}} + i := 0 + for ev := range events { + s.Require().Equal(expected[i].Type, ev.Type) + s.Require().Equal(expected[i].BlockNumber, ev.BlockNumber) + i++ + } + + transfers, err = s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 10) +} + +func (s *NewBlocksSuite) downloadHistorical() { + blocks := 
s.backend.GenerateBlocks(40, 0, func(n int, gen *core.BlockGen) { + if n == 36 { + gen.AddTx(s.genTx(0)) + } else if n == 39 { + gen.AddTx(s.genTx(1)) + } + }) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(40, n) + s.Require().NoError(err) + + eth := ðHistoricalCommand{ + db: s.db, + eth: ÐTransferDownloader{ + client: s.backend.Client, + signer: s.backend.Signer, + accounts: []common.Address{s.address}, + }, + feed: s.feed, + address: s.address, + client: s.backend.Client, + to: s.backend.Ethereum.BlockChain().CurrentBlock().Number(), + } + s.Require().NoError(eth.Run(context.Background()), "eth historical command failed to sync transfers") + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 2) +} + +func (s *NewBlocksSuite) reorgHistorical() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + + blocks := s.backend.GenerateBlocks(10, 35, nil) + n, err := s.backend.Ethereum.BlockChain().InsertChain(blocks) + s.Require().Equal(10, n) + s.Require().NoError(err) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s.Require().EqualError(s.runCmdUntilError(ctx), "not found") + +} + +func (s *NewBlocksSuite) TestSafetyBufferFailure() { + s.downloadHistorical() + + s.reorgHistorical() + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 1) +} + +func (s *NewBlocksSuite) TestSafetyBufferSuccess() { + s.downloadHistorical() + + safety := new(big.Int).Sub(s.backend.Ethereum.BlockChain().CurrentHeader().Number, big.NewInt(10)) + s.cmd.from = toDBHeader(s.backend.Ethereum.BlockChain().GetHeaderByNumber(safety.Uint64())) + s.reorgHistorical() + + transfers, err := s.db.GetTransfers(big.NewInt(0), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 0) +} diff --git 
a/services/wallet/concurrent.go b/services/wallet/concurrent.go new file mode 100644 index 0000000000..5472832e89 --- /dev/null +++ b/services/wallet/concurrent.go @@ -0,0 +1,78 @@ +package wallet + +import ( + "context" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// NewConcurrentDownloader creates ConcurrentDownloader instance. +func NewConcurrentDownloader(ctx context.Context) *ConcurrentDownloader { + runner := NewAtomicGroup(ctx) + result := &Result{} + return &ConcurrentDownloader{runner, result} +} + +type ConcurrentDownloader struct { + *AtomicGroup + *Result +} + +type Result struct { + mu sync.Mutex + transfers []Transfer +} + +func (r *Result) Push(transfers ...Transfer) { + r.mu.Lock() + defer r.mu.Unlock() + r.transfers = append(r.transfers, transfers...) +} + +func (r *Result) Get() []Transfer { + r.mu.Lock() + defer r.mu.Unlock() + rst := make([]Transfer, len(r.transfers)) + copy(rst, r.transfers) + return rst +} + +// TransferDownloader downloads transfers from single block using number. +type TransferDownloader interface { + GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error) +} + +func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) { + c.Add(func(ctx context.Context) error { + log.Debug("eth transfers comparing blocks", "low", low, "high", high) + lb, err := client.BalanceAt(ctx, account, low) + if err != nil { + return err + } + hb, err := client.BalanceAt(ctx, account, high) + if err != nil { + return err + } + if lb.Cmp(hb) == 0 { + log.Debug("balances are equal", "low", low, "high", high) + return nil + } + if new(big.Int).Sub(high, low).Cmp(one) == 0 { + transfers, err := downloader.GetTransfersByNumber(ctx, high) + if err != nil { + return err + } + c.Push(transfers...) 
+ return nil + } + mid := new(big.Int).Add(low, high) + mid = mid.Div(mid, two) + log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high) + downloadEthConcurrently(c, client, downloader, account, low, mid) + downloadEthConcurrently(c, client, downloader, account, mid, high) + return nil + }) +} diff --git a/services/wallet/concurrent_test.go b/services/wallet/concurrent_test.go new file mode 100644 index 0000000000..d60be339cd --- /dev/null +++ b/services/wallet/concurrent_test.go @@ -0,0 +1,126 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "sort" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestConcurrentErrorInterrupts(t *testing.T) { + concurrent := NewConcurrentDownloader(context.Background()) + var interrupted bool + concurrent.Add(func(ctx context.Context) error { + select { + case <-ctx.Done(): + interrupted = true + case <-time.After(10 * time.Second): + } + return nil + }) + err := errors.New("interrupt") + concurrent.Add(func(ctx context.Context) error { + return err + }) + concurrent.Wait() + require.True(t, interrupted) + require.Equal(t, err, concurrent.Error()) +} + +func TestConcurrentCollectsTransfers(t *testing.T) { + concurrent := NewConcurrentDownloader(context.Background()) + concurrent.Add(func(context.Context) error { + concurrent.Push(Transfer{}) + return nil + }) + concurrent.Add(func(context.Context) error { + concurrent.Push(Transfer{}) + return nil + }) + concurrent.Wait() + require.Len(t, concurrent.Get(), 2) +} + +type balancesFixture []*big.Int + +func (f balancesFixture) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + index := int(blockNumber.Int64()) + if index > len(f)-1 { + return nil, errors.New("balance unknown") + } + return f[index], nil +} + +type batchesFixture [][]Transfer + +func (f batchesFixture) GetTransfersByNumber(ctx 
context.Context, number *big.Int) (rst []Transfer, err error) { + index := int(number.Int64()) + if index > len(f)-1 { + return nil, errors.New("unknown block") + } + return f[index], nil +} + +func TestConcurrentEthDownloader(t *testing.T) { + type options struct { + balances balancesFixture + batches batchesFixture + result []Transfer + last *big.Int + } + type testCase struct { + desc string + options options + } + for _, tc := range []testCase{ + { + desc: "NoBalances", + options: options{ + last: big.NewInt(3), + balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0)}, + }, + }, + { + desc: "LastBlock", + options: options{ + last: big.NewInt(3), + balances: balancesFixture{big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(10)}, + batches: batchesFixture{{}, {}, {}, {{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}}, + result: []Transfer{{BlockNumber: big.NewInt(3)}, {BlockNumber: big.NewInt(3)}}, + }, + }, + { + desc: "ChangesInEveryBlock", + options: options{ + last: big.NewInt(3), + balances: balancesFixture{big.NewInt(0), big.NewInt(3), big.NewInt(7), big.NewInt(10)}, + batches: batchesFixture{{}, {{BlockNumber: big.NewInt(1)}}, {{BlockNumber: big.NewInt(2)}}, {{BlockNumber: big.NewInt(3)}}}, + result: []Transfer{{BlockNumber: big.NewInt(1)}, {BlockNumber: big.NewInt(2)}, {BlockNumber: big.NewInt(3)}}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + concurrent := NewConcurrentDownloader(ctx) + downloadEthConcurrently( + concurrent, tc.options.balances, tc.options.batches, + common.Address{}, zero, tc.options.last) + concurrent.Wait() + require.NoError(t, concurrent.Error()) + rst := concurrent.Get() + require.Len(t, rst, len(tc.options.result)) + sort.Slice(rst, func(i, j int) bool { + return rst[i].BlockNumber.Cmp(rst[j].BlockNumber) < 0 + }) + for i := range rst { + require.Equal(t, 
tc.options.result[i].BlockNumber, rst[i].BlockNumber) + } + }) + } +} diff --git a/services/wallet/database.go b/services/wallet/database.go index 6fc9ab4a12..936d731599 100644 --- a/services/wallet/database.go +++ b/services/wallet/database.go @@ -13,13 +13,41 @@ import ( "github.com/status-im/status-go/sqlite" ) +// DBHeader fields from header that are stored in database. +type DBHeader struct { + Number *big.Int + Hash common.Hash + // Head is true if the block was a head at the time it was pulled from chain. + Head bool +} + +func toDBHeader(header *types.Header) *DBHeader { + return &DBHeader{ + Hash: header.Hash(), + Number: header.Number, + } +} + +func toHead(header *types.Header) *DBHeader { + return &DBHeader{ + Hash: header.Hash(), + Number: header.Number, + Head: true, + } +} + +// SyncOption is used to specify that application processed transfers for that block. +type SyncOption uint + const ( - errNoRows = "sql: no rows in result set" + // sync options + ethSync SyncOption = 1 + erc20Sync SyncOption = 2 ) // InitializeDB creates db file at a given path and applies migrations. -func InitializeDB(path string) (*Database, error) { - db, err := sqlite.OpenDB(path) +func InitializeDB(path, password string) (*Database, error) { + db, err := sqlite.OpenDB(path, password) if err != nil { return nil, err } @@ -87,12 +115,9 @@ func (db Database) Close() error { } // ProcessTranfers atomically adds/removes blocks and adds new tranfers. 
-func (db Database) ProcessTranfers(transfers []Transfer, added, removed []*types.Header) (err error) { +func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) { var ( - tx *sql.Tx - insert *sql.Stmt - blocks *sql.Stmt - delete *sql.Stmt + tx *sql.Tx ) tx, err = db.db.Begin() if err != nil { @@ -105,77 +130,47 @@ func (db Database) ProcessTranfers(transfers []Transfer, added, removed []*types } _ = tx.Rollback() }() - - insert, err = tx.Prepare("INSERT INTO transfers(hash, blk_hash, tx, receipt, type) VALUES (?, ?, ?, ?, ?)") + err = deleteHeaders(tx, removed) if err != nil { - return err + return } - delete, err = tx.Prepare("DELETE FROM blocks WHERE hash = ?") + err = insertHeaders(tx, added) if err != nil { - return err + return } - blocks, err = tx.Prepare("INSERT INTO blocks(hash, number, header) VALUES (?, ?, ?)") + err = insertTransfers(tx, transfers) if err != nil { - return err - } - for _, header := range removed { - _, err = delete.Exec(header.Hash()) - if err != nil { - return err - } - } - for _, header := range added { - _, err = blocks.Exec(header.Hash(), (*SQLBigInt)(header.Number), &JSONBlob{header}) - if err != nil { - return err - } + return } + err = updateAccounts(tx, accounts, added, option) + return +} - for _, t := range transfers { - _, err = insert.Exec(t.Transaction.Hash(), t.Header.Hash(), &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type) - if err != nil { - return err - } +// GetTransfersByAddress loads transfers for a given address between two blocks. +func (db *Database) GetTransfersByAddress(address common.Address, start, end *big.Int) (rst []Transfer, err error) { + query := newTransfersQuery().FilterAddress(address).FilterStart(start).FilterEnd(end) + rows, err := db.db.Query(query.String(), query.Args()...) 
+ if err != nil { + return } - return err + defer rows.Close() + return query.Scan(rows) } // GetTransfers load transfers transfer betweeen two blocks. func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error) { - query := "SELECT type, blocks.header, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash WHERE blocks.number >= ?" - var ( - rows *sql.Rows - ) - if end != nil { - query += " AND blocks.number <= ?" - rows, err = db.db.Query(query, (*SQLBigInt)(start), (*SQLBigInt)(end)) - } else { - rows, err = db.db.Query(query, (*SQLBigInt)(start)) - } + query := newTransfersQuery().FilterStart(start).FilterEnd(end) + rows, err := db.db.Query(query.String(), query.Args()...) if err != nil { return } defer rows.Close() - for rows.Next() { - transfer := Transfer{ - Header: &types.Header{}, - Transaction: &types.Transaction{}, - Receipt: &types.Receipt{}, - } - err = rows.Scan( - &transfer.Type, &JSONBlob{transfer.Header}, - &JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt}) - if err != nil { - return nil, err - } - rst = append(rst, transfer) - } - return + return query.Scan(rows) } // SaveHeader stores a single header. 
func (db *Database) SaveHeader(header *types.Header) error { - _, err := db.db.Exec("INSERT INTO blocks(number, hash, header) VALUES (?, ?, ?)", (*SQLBigInt)(header.Number), header.Hash(), &JSONBlob{header}) + _, err := db.db.Exec("INSERT INTO blocks(number, hash) VALUES (?, ?)", (*SQLBigInt)(header.Number), header.Hash()) return err } @@ -189,7 +184,7 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) { if err != nil { return } - insert, err = tx.Prepare("INSERT INTO blocks(number, hash, header) VALUES (?,?,?)") + insert, err = tx.Prepare("INSERT INTO blocks(number, hash) VALUES (?,?)") if err != nil { return } @@ -202,7 +197,7 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) { }() for _, h := range headers { - _, err = insert.Exec((*SQLBigInt)(h.Number), h.Hash(), &JSONBlob{h}) + _, err = insert.Exec((*SQLBigInt)(h.Number), h.Hash()) if err != nil { return } @@ -210,24 +205,31 @@ func (db *Database) SaveHeaders(headers []*types.Header) (err error) { return } -// LastHeader selects last header by block number. 
-func (db *Database) LastHeader() (header *types.Header, err error) { - rows, err := db.db.Query("SELECT header FROM blocks WHERE number = (SELECT MAX(number) FROM blocks)") +func (db *Database) SaveSyncedHeader(address common.Address, header *types.Header, option SyncOption) (err error) { + var ( + tx *sql.Tx + insert *sql.Stmt + ) + tx, err = db.db.Begin() if err != nil { - return nil, err + return } - defer rows.Close() - for rows.Next() { - header = &types.Header{} - err = rows.Scan(&JSONBlob{header}) - if err != nil { - return nil, err - } - if header != nil { - return header, nil + insert, err = tx.Prepare("INSERT INTO accounts_to_blocks(address, blk_number, sync) VALUES (?,?,?)") + if err != nil { + return + } + defer func() { + if err == nil { + err = tx.Commit() + } else { + _ = tx.Rollback() } + }() + _, err = insert.Exec(address, (*SQLBigInt)(header.Number), option) + if err != nil { + return } - return nil, nil + return err } // HeaderExists checks if header with hash exists in db. @@ -241,14 +243,119 @@ func (db *Database) HeaderExists(hash common.Hash) (bool, error) { } // GetHeaderByNumber selects header using block number. 
-func (db *Database) GetHeaderByNumber(number *big.Int) (*types.Header, error) { - header := &types.Header{} - err := db.db.QueryRow("SELECT header FROM blocks WHERE number = ?", (*SQLBigInt)(number)).Scan(&JSONBlob{header}) +func (db *Database) GetHeaderByNumber(number *big.Int) (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE number = ?", (*SQLBigInt)(number)).Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + +func (db *Database) GetLastHead() (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow("SELECT hash,number FROM blocks WHERE head = 1 AND number = (SELECT MAX(number) FROM blocks)").Scan(&header.Hash, (*SQLBigInt)(header.Number)) + if err == nil { + return header, nil + } + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err +} + +// GetLatestSynced downloads last synced block with a given option. +func (db *Database) GetLatestSynced(address common.Address, option SyncOption) (header *DBHeader, err error) { + header = &DBHeader{Hash: common.Hash{}, Number: new(big.Int)} + err = db.db.QueryRow(` +SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number = blocks.number WHERE address = $1 AND blk_number += (SELECT MAX(blk_number) FROM accounts_to_blocks WHERE address = $1 AND sync & $2 = $2)`, address, option).Scan(&header.Hash, (*SQLBigInt)(header.Number)) if err == nil { return header, nil } - if err.Error() == errNoRows { + if err == sql.ErrNoRows { return nil, nil } return nil, err } + +// statementCreator allows to pass transaction or database to use in consumer. 
+type statementCreator interface { + Prepare(query string) (*sql.Stmt, error) +} + +func deleteHeaders(creator statementCreator, headers []*DBHeader) error { + delete, err := creator.Prepare("DELETE FROM blocks WHERE hash = ?") + if err != nil { + return err + } + for _, h := range headers { + _, err = delete.Exec(h.Hash) + if err != nil { + return err + } + } + return nil +} + +func insertHeaders(creator statementCreator, headers []*DBHeader) error { + insert, err := creator.Prepare("INSERT OR IGNORE INTO blocks(hash, number, head) VALUES (?, ?, ?)") + if err != nil { + return err + } + for _, h := range headers { + _, err = insert.Exec(h.Hash, (*SQLBigInt)(h.Number), h.Head) + if err != nil { + return err + } + } + return nil +} + +func insertTransfers(creator statementCreator, transfers []Transfer) error { + insert, err := creator.Prepare("INSERT OR IGNORE INTO transfers(hash, blk_hash, address, tx, receipt, type) VALUES (?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + for _, t := range transfers { + _, err = insert.Exec(t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type) + if err != nil { + return err + } + } + return nil +} + +func updateAccounts(creator statementCreator, accounts []common.Address, headers []*DBHeader, option SyncOption) error { + update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? 
AND blk_number=?") + if err != nil { + return err + } + insert, err := creator.Prepare("INSERT OR IGNORE INTO accounts_to_blocks(address,blk_number,sync) VALUES(?,?,?)") + if err != nil { + return err + } + for _, acc := range accounts { + for _, h := range headers { + rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number)) + if err != nil { + return err + } + affected, err := rst.RowsAffected() + if err != nil { + return err + } + if affected > 0 { + continue + } + _, err = insert.Exec(acc, (*SQLBigInt)(h.Number), option) + if err != nil { + return err + } + } + } + return nil +} diff --git a/services/wallet/database_test.go b/services/wallet/database_test.go index c643f8a705..1197324299 100644 --- a/services/wallet/database_test.go +++ b/services/wallet/database_test.go @@ -14,7 +14,7 @@ import ( func setupTestDB(t *testing.T) (*Database, func()) { tmpfile, err := ioutil.TempFile("", "wallet-tests-") require.NoError(t, err) - db, err := InitializeDB(tmpfile.Name()) + db, err := InitializeDB(tmpfile.Name(), "wallet-tests") require.NoError(t, err) return db, func() { require.NoError(t, db.Close()) @@ -22,7 +22,7 @@ func setupTestDB(t *testing.T) (*Database, func()) { } } -func TestGetHeaderByNumber(t *testing.T) { +func TestDBGetHeaderByNumber(t *testing.T) { db, stop := setupTestDB(t) defer stop() header := &types.Header{ @@ -33,10 +33,10 @@ func TestGetHeaderByNumber(t *testing.T) { require.NoError(t, db.SaveHeader(header)) rst, err := db.GetHeaderByNumber(header.Number) require.NoError(t, err) - require.Equal(t, header.Hash(), rst.Hash()) + require.Equal(t, header.Hash(), rst.Hash) } -func TestGetHeaderByNumberNoRows(t *testing.T) { +func TestDBGetHeaderByNumberNoRows(t *testing.T) { db, stop := setupTestDB(t) defer stop() rst, err := db.GetHeaderByNumber(big.NewInt(1)) @@ -44,7 +44,7 @@ func TestGetHeaderByNumberNoRows(t *testing.T) { require.Nil(t, rst) } -func TestHeaderExists(t *testing.T) { +func TestDBHeaderExists(t *testing.T) { db, stop := 
setupTestDB(t) defer stop() header := &types.Header{ @@ -58,7 +58,7 @@ func TestHeaderExists(t *testing.T) { require.True(t, rst) } -func TestHeaderDoesntExist(t *testing.T) { +func TestDBHeaderDoesntExist(t *testing.T) { db, stop := setupTestDB(t) defer stop() @@ -67,77 +67,48 @@ func TestHeaderDoesntExist(t *testing.T) { require.False(t, rst) } -func TestLastHeader(t *testing.T) { +func TestDBProcessTransfer(t *testing.T) { db, stop := setupTestDB(t) defer stop() - - template := types.Header{ - Difficulty: big.NewInt(1), - Time: big.NewInt(1), - } - first := template - first.Number = big.NewInt(10) - second := template - second.Number = big.NewInt(11) - require.NoError(t, db.SaveHeader(&second)) - require.NoError(t, db.SaveHeader(&first)) - - rst, err := db.LastHeader() - require.NoError(t, err) - require.Equal(t, second.Hash(), rst.Hash()) -} - -func TestNoLastHeader(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - header, err := db.LastHeader() - require.NoError(t, err) - require.Nil(t, header) -} - -func TestProcessTransfer(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - header := &types.Header{ - Number: big.NewInt(1), - Difficulty: big.NewInt(1), - Time: big.NewInt(1), + header := &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{1}, } tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) transfers := []Transfer{ { + ID: common.Hash{1}, Type: ethTransfer, - Header: header, + BlockHash: header.Hash, + BlockNumber: header.Number, Transaction: tx, Receipt: types.NewReceipt(nil, false, 100), }, } - require.NoError(t, db.ProcessTranfers(transfers, []*types.Header{header}, nil)) + require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0)) } -func TestReorgTransfers(t *testing.T) { +func TestDBReorgTransfers(t *testing.T) { db, stop := setupTestDB(t) defer stop() rcpt := types.NewReceipt(nil, false, 100) rcpt.Logs = []*types.Log{} - original := &types.Header{ - Number: big.NewInt(1), 
- Difficulty: big.NewInt(1), - Time: big.NewInt(1), + original := &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{1}, } - replaced := &types.Header{ - Number: big.NewInt(1), - Difficulty: big.NewInt(2), - Time: big.NewInt(2), + replaced := &DBHeader{ + Number: big.NewInt(1), + Hash: common.Hash{2}, } originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil) require.NoError(t, db.ProcessTranfers([]Transfer{ - {ethTransfer, original, originalTX, rcpt}, - }, []*types.Header{original}, nil)) + {ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt}, + }, nil, []*DBHeader{original}, nil, 0)) require.NoError(t, db.ProcessTranfers([]Transfer{ - {ethTransfer, replaced, replacedTX, rcpt}, - }, []*types.Header{replaced}, []*types.Header{original})) + {ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt}, + }, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0)) all, err := db.GetTransfers(big.NewInt(0), nil) require.NoError(t, err) @@ -145,30 +116,31 @@ func TestReorgTransfers(t *testing.T) { require.Equal(t, replacedTX.Hash(), all[0].Transaction.Hash()) } -func TestGetTransfersFromBlock(t *testing.T) { +func TestDBGetTransfersFromBlock(t *testing.T) { db, stop := setupTestDB(t) defer stop() - headers := []*types.Header{} + headers := []*DBHeader{} transfers := []Transfer{} for i := 1; i < 10; i++ { - header := &types.Header{ - Number: big.NewInt(int64(i)), - Difficulty: big.NewInt(1), - Time: big.NewInt(1), + header := &DBHeader{ + Number: big.NewInt(int64(i)), + Hash: common.Hash{byte(i)}, } headers = append(headers, header) tx := types.NewTransaction(uint64(i), common.Address{1}, nil, 10, big.NewInt(10), nil) receipt := types.NewReceipt(nil, false, 100) receipt.Logs = []*types.Log{} transfer := Transfer{ + ID: tx.Hash(), Type: ethTransfer, - Header: header, + 
BlockNumber: header.Number, + BlockHash: header.Hash, Transaction: tx, Receipt: receipt, } transfers = append(transfers, transfer) } - require.NoError(t, db.ProcessTranfers(transfers, headers, nil)) + require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0)) rst, err := db.GetTransfers(big.NewInt(7), nil) require.NoError(t, err) require.Len(t, rst, 3) @@ -178,3 +150,86 @@ func TestGetTransfersFromBlock(t *testing.T) { require.Len(t, rst, 4) } + +func TestDBLatestSynced(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + address := common.Address{1} + h1 := &types.Header{ + Number: big.NewInt(10), + Difficulty: big.NewInt(1), + Time: big.NewInt(1), + } + h2 := &types.Header{ + Number: big.NewInt(9), + Difficulty: big.NewInt(1), + Time: big.NewInt(1), + } + require.NoError(t, db.SaveHeader(h1)) + require.NoError(t, db.SaveHeader(h2)) + require.NoError(t, db.SaveSyncedHeader(address, h1, ethSync)) + require.NoError(t, db.SaveSyncedHeader(address, h2, ethSync)) + + latest, err := db.GetLatestSynced(address, ethSync) + require.NoError(t, err) + require.NotNil(t, latest) + require.Equal(t, h1.Number, latest.Number) + require.Equal(t, h1.Hash(), latest.Hash) +} + +func TestDBLatestSyncedDoesntExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + latest, err := db.GetLatestSynced(common.Address{1}, ethSync) + require.NoError(t, err) + require.Nil(t, latest) +} + +func TestDBProcessTransfersUpdate(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + address := common.Address{1} + header := &DBHeader{ + Number: big.NewInt(10), + Hash: common.Hash{1}, + } + transfer := Transfer{ + ID: common.Hash{1}, + BlockNumber: header.Number, + BlockHash: header.Hash, + Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil), + Address: address, + } + require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync)) + require.NoError(t, 
db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync)) + + latest, err := db.GetLatestSynced(address, ethSync|erc20Sync) + require.NoError(t, err) + require.Equal(t, header.Hash, latest.Hash) +} + +func TestDBLastHeadExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + + headers := []*DBHeader{ + {Number: big.NewInt(1), Hash: common.Hash{1}, Head: true}, + {Number: big.NewInt(2), Hash: common.Hash{2}, Head: true}, + {Number: big.NewInt(3), Hash: common.Hash{3}, Head: true}, + } + require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, 0)) + last, err := db.GetLastHead() + require.NoError(t, err) + require.Equal(t, headers[2].Hash, last.Hash) +} + +func TestDBLastHeadDoesntExist(t *testing.T) { + db, stop := setupTestDB(t) + defer stop() + last, err := db.GetLastHead() + require.NoError(t, err) + require.Nil(t, last) +} diff --git a/services/wallet/downloader.go b/services/wallet/downloader.go index c55fd64a92..fc6a20a2fa 100644 --- a/services/wallet/downloader.go +++ b/services/wallet/downloader.go @@ -2,13 +2,17 @@ package wallet import ( "context" + "encoding/binary" + "errors" "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" ) // TransferType type of the asset that was transferred. @@ -22,65 +26,103 @@ const ( ) var ( - one = big.NewInt(1) + zero = big.NewInt(0) + one = big.NewInt(1) + two = big.NewInt(2) ) // Transfer stores information about transfer. 
type Transfer struct { - Type TransferType - Header *types.Header - Transaction *types.Transaction - Receipt *types.Receipt + Type TransferType `json:"type"` + ID common.Hash `json:"-"` + Address common.Address `json:"address"` + BlockNumber *big.Int `json:"blockNumber"` + BlockHash common.Hash `json:"blockhash"` + Transaction *types.Transaction `json:"transaction"` + Receipt *types.Receipt `json:"receipt"` } // ETHTransferDownloader downloads regular eth transfers. type ETHTransferDownloader struct { - client *ethclient.Client - address common.Address - signer types.Signer + client *ethclient.Client + accounts []common.Address + signer types.Signer } // GetTransfers checks if the balance was changed between two blocks. // If so it downloads transaction that transfer ethereum from that block. -func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *types.Header) (rst []Transfer, err error) { +func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *DBHeader) (rst []Transfer, err error) { // TODO(dshulyak) consider caching balance and reset it on reorg num := new(big.Int).Sub(header.Number, one) - balance, err := d.client.BalanceAt(ctx, d.address, num) + changed := []common.Address{} + for _, address := range d.accounts { + balance, err := d.client.BalanceAt(ctx, address, num) + if err != nil { + return nil, err + } + current, err := d.client.BalanceAt(ctx, address, header.Number) + if err != nil { + return nil, err + } + if current.Cmp(balance) != 0 { + changed = append(changed, address) + } + } + if len(changed) == 0 { + return nil, nil + } + blk, err := d.client.BlockByHash(ctx, header.Hash) if err != nil { return nil, err } - current, err := d.client.BalanceAt(ctx, d.address, header.Number) + rst, err = d.getTransfersInBlock(ctx, blk, changed) if err != nil { return nil, err } - if current.Cmp(balance) == 0 { - return nil, nil + return rst, nil +} + +func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, 
number *big.Int) ([]Transfer, error) { + blk, err := d.client.BlockByNumber(ctx, number) + if err != nil { + return nil, err } - blk, err := d.client.BlockByHash(ctx, header.Hash()) + rst, err := d.getTransfersInBlock(ctx, blk, d.accounts) if err != nil { return nil, err } + return rst, err +} + +func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) { for _, tx := range blk.Transactions() { - if *tx.To() == d.address { - receipt, err := d.client.TransactionReceipt(ctx, tx.Hash()) - if err != nil { - return nil, err - } - rst = append(rst, Transfer{Type: ethTransfer, Header: header, Transaction: tx, Receipt: receipt}) - continue - } + var address *common.Address from, err := types.Sender(d.signer, tx) if err != nil { return nil, err } - // payload is empty for eth transfers - if from == d.address && len(tx.Data()) == 0 { + if any(from, accounts) { + address = &from + } else if tx.To() != nil && any(*tx.To(), accounts) { + address = tx.To() + } + if address != nil { receipt, err := d.client.TransactionReceipt(ctx, tx.Hash()) if err != nil { return nil, err } - rst = append(rst, Transfer{Type: ethTransfer, Header: header, Transaction: tx, Receipt: receipt}) - continue + if isTokenTransfer(receipt.Logs) { + log.Debug("eth downloader found token transfer", "hash", tx.Hash()) + continue + } + rst = append(rst, Transfer{ + Type: ethTransfer, + ID: tx.Hash(), + Address: *address, + BlockNumber: blk.Number(), + BlockHash: blk.Hash(), + Transaction: tx, Receipt: receipt}) + } } // TODO(dshulyak) test that balance difference was covered by transactions @@ -88,69 +130,175 @@ func (d *ETHTransferDownloader) GetTransfers(ctx context.Context, header *types. } // NewERC20TransfersDownloader returns new instance. 
-func NewERC20TransfersDownloader(client *ethclient.Client, address common.Address) *ERC20TransfersDownloader { +func NewERC20TransfersDownloader(client *ethclient.Client, accounts []common.Address) *ERC20TransfersDownloader { signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) - target := common.Hash{} - copy(target[12:], address[:]) return &ERC20TransfersDownloader{ client: client, - address: address, + accounts: accounts, signature: signature, - target: target, } } // ERC20TransfersDownloader is a downloader for erc20 tokens transfers. type ERC20TransfersDownloader struct { - client *ethclient.Client - address common.Address + client *ethclient.Client + accounts []common.Address // hash of the Transfer event signature signature common.Hash - // padded address - target common.Hash } -// GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount. -func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *types.Header) ([]Transfer, error) { - hash := header.Hash() - outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ - BlockHash: &hash, - Topics: [][]common.Hash{{d.signature}, {d.target}, {}}, - }) +func (d *ERC20TransfersDownloader) paddedAddress(address common.Address) common.Hash { + rst := common.Hash{} + copy(rst[12:], address[:]) + return rst +} + +func (d *ERC20TransfersDownloader) inboundTopics(address common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature}, {}, {d.paddedAddress(address)}} +} + +func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]common.Hash { + return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}} +} + +func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, log types.Log, address common.Address) (Transfer, error) { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + tx, _, err := d.client.TransactionByHash(ctx, log.TxHash) + cancel() if err != nil { - return nil, 
err + return Transfer{}, err } - inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ - BlockHash: &hash, - Topics: [][]common.Hash{{d.signature}, {}, {d.target}}, - }) + ctx, cancel = context.WithTimeout(parent, 3*time.Second) + receipt, err := d.client.TransactionReceipt(ctx, log.TxHash) + cancel() if err != nil { - return nil, err + return Transfer{}, err } - lth := len(outbound) + len(inbound) - if lth == 0 { - return nil, nil + // TODO(dshulyak) what is the max number of logs? + index := [4]byte{} + binary.BigEndian.PutUint32(index[:], uint32(log.Index)) + id := crypto.Keccak256Hash(log.TxHash.Bytes(), index[:]) + return Transfer{ + Address: address, + ID: id, + Type: erc20Transfer, + BlockNumber: new(big.Int).SetUint64(log.BlockNumber), + BlockHash: log.BlockHash, + Transaction: tx, + Receipt: receipt, + }, nil +} + +func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]Transfer, error) { + concurrent := NewConcurrentDownloader(parent) + for i := range logs { + l := logs[i] + concurrent.Add(func(ctx context.Context) error { + transfer, err := d.transferFromLog(ctx, l, address) + if err != nil { + return err + } + concurrent.Push(transfer) + return nil + }) } - all := make([]types.Log, lth) - copy(all, outbound) - copy(all[len(outbound):], inbound) - rst := make([]Transfer, lth) - for i, l := range all { - tx, err := d.client.TransactionInBlock(ctx, hash, l.TxIndex) + select { + case <-concurrent.WaitAsync(): + case <-parent.Done(): + return nil, errors.New("logs downloader stuck") + } + return concurrent.Get(), nil +} + +// GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount. 
+func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) { + hash := header.Hash + transfers := []Transfer{} + for _, address := range d.accounts { + outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + BlockHash: &hash, + Topics: d.outboundTopics(address), + }) if err != nil { return nil, err } - receipt, err := d.client.TransactionReceipt(ctx, l.TxHash) + inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + BlockHash: &hash, + Topics: d.inboundTopics(address), + }) if err != nil { return nil, err } - rst[i] = Transfer{ - Type: erc20Transfer, - Header: header, - Transaction: tx, - Receipt: receipt, + logs := append(outbound, inbound...) + if len(logs) == 0 { + continue } + rst, err := d.transfersFromLogs(ctx, logs, address) + if err != nil { + return nil, err + } + transfers = append(transfers, rst...) } - return rst, nil + return transfers, nil +} + +// GetTransfersInRange returns transfers between two blocks. +// time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set. +func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, from, to *big.Int) ([]Transfer, error) { + start := time.Now() + log.Debug("get erc20 transfers in range", "from", from, "to", to) + transfers := []Transfer{} + for _, address := range d.accounts { + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.outboundTopics(address), + }) + cancel() + if err != nil { + return nil, err + } + ctx, cancel = context.WithTimeout(parent, 5*time.Second) + inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.inboundTopics(address), + }) + cancel() + if err != nil { + return nil, err + } + logs := append(outbound, inbound...) 
+ if len(logs) == 0 { + continue + } + rst, err := d.transfersFromLogs(parent, logs, address) + if err != nil { + return nil, err + } + transfers = append(transfers, rst...) + } + log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start)) + return transfers, nil +} + +func any(address common.Address, compare []common.Address) bool { + for _, c := range compare { + if c == address { + return true + } + } + return false +} + +func isTokenTransfer(logs []*types.Log) bool { + signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature)) + for _, l := range logs { + if len(l.Topics) > 0 && l.Topics[0] == signature { + return true + } + } + return false } diff --git a/services/wallet/downloader_test.go b/services/wallet/downloader_test.go index 0ac5f1df02..084297a188 100644 --- a/services/wallet/downloader_test.go +++ b/services/wallet/downloader_test.go @@ -48,9 +48,9 @@ func (s *ETHTransferSuite) SetupTest() { s.ethclient = ethclient.NewClient(client) s.signer = types.NewEIP155Signer(big.NewInt(1337)) s.downloader = ÐTransferDownloader{ - signer: s.signer, - client: s.ethclient, - address: crypto.PubkeyToAddress(s.identity.PublicKey), + signer: s.signer, + client: s.ethclient, + accounts: []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)}, } } @@ -67,7 +67,7 @@ func (s *ETHTransferSuite) TestNoBalance() { header, err := s.ethclient.HeaderByNumber(ctx, nil) s.Require().NoError(err) - transfers, err := s.downloader.GetTransfers(ctx, header) + transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header)) s.Require().NoError(err) s.Require().Empty(transfers) } @@ -86,7 +86,7 @@ func (s *ETHTransferSuite) TestBalanceUpdatedOnInbound() { header, err := s.ethclient.HeaderByNumber(ctx, nil) s.Require().NoError(err) s.Require().Equal(big.NewInt(1), header.Number) - transfers, err := s.downloader.GetTransfers(ctx, header) + transfers, err := s.downloader.GetTransfers(ctx, 
toDBHeader(header)) s.Require().NoError(err) s.Require().Len(transfers, 1) } @@ -114,7 +114,7 @@ func (s *ETHTransferSuite) TestBalanceUpdatedOnOutbound() { header, err := s.ethclient.HeaderByNumber(ctx, nil) s.Require().NoError(err) s.Require().Equal(big.NewInt(2), header.Number) - transfers, err := s.downloader.GetTransfers(ctx, header) + transfers, err := s.downloader.GetTransfers(ctx, toDBHeader(header)) s.Require().NoError(err) s.Require().Len(transfers, 1) } @@ -150,7 +150,7 @@ func (s *ERC20TransferSuite) SetupTest() { client, err := node.Attach() s.Require().NoError(err) s.ethclient = ethclient.NewClient(client) - s.downloader = NewERC20TransfersDownloader(s.ethclient, crypto.PubkeyToAddress(s.identity.PublicKey)) + s.downloader = NewERC20TransfersDownloader(s.ethclient, []common.Address{crypto.PubkeyToAddress(s.identity.PublicKey)}) _, tx, contract, err := erc20.DeployERC20Transfer(bind.NewKeyedTransactor(s.faucet), s.ethclient) s.Require().NoError(err) @@ -166,7 +166,7 @@ func (s *ERC20TransferSuite) TestNoEvents() { header, err := s.ethclient.HeaderByNumber(context.TODO(), nil) s.Require().NoError(err) - transfers, err := s.downloader.GetTransfers(context.TODO(), header) + transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header)) s.Require().NoError(err) s.Require().Empty(transfers) } @@ -183,7 +183,7 @@ func (s *ERC20TransferSuite) TestInboundEvent() { header, err := s.ethclient.HeaderByNumber(context.TODO(), nil) s.Require().NoError(err) - transfers, err := s.downloader.GetTransfers(context.TODO(), header) + transfers, err := s.downloader.GetTransfers(context.TODO(), toDBHeader(header)) s.Require().NoError(err) s.Require().Len(transfers, 1) } @@ -212,7 +212,22 @@ func (s *ERC20TransferSuite) TestOutboundEvent() { header, err := s.ethclient.HeaderByNumber(context.TODO(), nil) s.Require().NoError(err) - transfers, err := s.downloader.GetTransfers(context.TODO(), header) + transfers, err := s.downloader.GetTransfers(context.TODO(), 
toDBHeader(header)) s.Require().NoError(err) s.Require().Len(transfers, 1) } + +func (s *ERC20TransferSuite) TestInRange() { + for i := 0; i < 5; i++ { + tx, err := s.contract.Transfer(bind.NewKeyedTransactor(s.faucet), crypto.PubkeyToAddress(s.identity.PublicKey), + big.NewInt(100)) + s.Require().NoError(err) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = bind.WaitMined(timeout, s.ethclient, tx) + s.Require().NoError(err) + } + transfers, err := s.downloader.GetTransfersInRange(context.TODO(), big.NewInt(1), nil) + s.Require().NoError(err) + s.Require().Len(transfers, 5) +} diff --git a/services/wallet/events.go b/services/wallet/events.go index d857d1f8c9..f126daab76 100644 --- a/services/wallet/events.go +++ b/services/wallet/events.go @@ -1,19 +1,26 @@ package wallet -import "math/big" +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" +) // EventType type for event types. type EventType string const ( // EventNewBlock emitted when new block was added to the same canonical chan. - EventNewBlock = "newblock" + EventNewBlock EventType = "newblock" // EventReorg emitted when canonical chain was changed. In this case, BlockNumber will be an earliest added block. - EventReorg = "reorg" + EventReorg EventType = "reorg" + // EventNewHistory emitted if transfer from older block was added. + EventNewHistory EventType = "history" ) // Event is a type for wallet events. 
type Event struct { - Type EventType - BlockNumber *big.Int + Type EventType `json:"type"` + BlockNumber *big.Int `json:"blockNumber"` + Accounts []common.Address `json:"accounts"` } diff --git a/services/wallet/iterative.go b/services/wallet/iterative.go new file mode 100644 index 0000000000..0cccabdc29 --- /dev/null +++ b/services/wallet/iterative.go @@ -0,0 +1,92 @@ +package wallet + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +// SetupIterativeDownloader configures IterativeDownloader with last known synced block. +func SetupIterativeDownloader( + db *Database, client HeaderReader, address common.Address, option SyncOption, + downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) { + from, err := db.GetLatestSynced(address, option) + if err != nil { + log.Error("failed to get latest synced block", "error", err) + return nil, err + } + if from == nil { + from = &DBHeader{Number: zero} + } + log.Debug("iterative downloader", "address", address, "from", from.Number, "to", to.Number) + d := &IterativeDownloader{ + client: client, + batchSize: size, + downloader: downloader, + from: from, + to: to, + } + return d, nil +} + +// BatchDownloader interface for loading transfers in batches in speificed range of blocks. +type BatchDownloader interface { + GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) +} + +// IterativeDownloader downloads batches of transfers in a specified size. +type IterativeDownloader struct { + client HeaderReader + + batchSize *big.Int + + downloader BatchDownloader + + from, to *DBHeader + previous *DBHeader +} + +// Finished true when earliest block with given sync option is zero. +func (d *IterativeDownloader) Finished() bool { + return d.from.Number.Cmp(d.to.Number) == 0 +} + +// Header return last synced header. 
+func (d *IterativeDownloader) Header() *DBHeader { + return d.previous +} + +// Next moves closer to the end on every new iteration. +func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) { + to := new(big.Int).Add(d.from.Number, d.batchSize) + // if start < 0; start = 0 + if to.Cmp(d.to.Number) == 1 { + to = d.to.Number + } + transfers, err := d.downloader.GetTransfersInRange(parent, d.from.Number, to) + if err != nil { + log.Error("failed to get transfer inbetween two bloks", "from", d.from.Number, "to", to, "error", err) + return nil, err + } + // use integers instead of DBHeader + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + header, err := d.client.HeaderByNumber(ctx, to) + cancel() + if err != nil { + log.Error("failed to get header by number", "from", d.from.Number, "to", to, "error", err) + return nil, err + } + d.previous, d.from = d.from, toDBHeader(header) + return transfers, nil +} + +// Revert reverts last step progress. Should be used if application failed to process transfers. +// For example failed to persist them. 
+func (d *IterativeDownloader) Revert() { + if d.previous != nil { + d.from = d.previous + } +} diff --git a/services/wallet/iterative_test.go b/services/wallet/iterative_test.go new file mode 100644 index 0000000000..6739e046d0 --- /dev/null +++ b/services/wallet/iterative_test.go @@ -0,0 +1,117 @@ +package wallet + +import ( + "context" + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" +) + +type transfersFixture []Transfer + +func (f transfersFixture) GetTransfersInRange(ctx context.Context, from, to *big.Int) ([]Transfer, error) { + rst := []Transfer{} + for _, t := range f { + if t.BlockNumber.Cmp(from) >= 0 && t.BlockNumber.Cmp(to) <= 0 { + rst = append(rst, t) + } + } + return rst, nil +} + +func TestIterFinished(t *testing.T) { + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(10)}, + to: &DBHeader{Number: big.NewInt(10)}, + } + require.True(t, iterator.Finished()) +} + +func TestIterNotFinished(t *testing.T) { + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(2)}, + to: &DBHeader{Number: big.NewInt(5)}, + } + require.False(t, iterator.Finished()) +} + +func TestIterRevert(t *testing.T) { + iterator := IterativeDownloader{ + from: &DBHeader{Number: big.NewInt(12)}, + to: &DBHeader{Number: big.NewInt(12)}, + previous: &DBHeader{Number: big.NewInt(9)}, + } + require.True(t, iterator.Finished()) + iterator.Revert() + require.False(t, iterator.Finished()) +} + +func TestIterProgress(t *testing.T) { + var ( + chain headers = genHeadersChain(10, 1) + transfers = make(transfersFixture, 10) + ) + for i := range transfers { + transfers[i] = Transfer{ + BlockNumber: chain[i].Number, + BlockHash: chain[i].Hash(), + } + } + iter := &IterativeDownloader{ + client: chain, + downloader: transfers, + batchSize: big.NewInt(5), + from: &DBHeader{Number: big.NewInt(0)}, + to: &DBHeader{Number: big.NewInt(9)}, + } + 
batch, err := iter.Next(context.TODO()) + require.NoError(t, err) + require.Len(t, batch, 6) + batch, err = iter.Next(context.TODO()) + require.NoError(t, err) + require.Len(t, batch, 5) + require.True(t, iter.Finished()) +} + +type headers []*types.Header + +func (h headers) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + for _, item := range h { + if item.Hash() == hash { + return item, nil + } + } + return nil, errors.New("not found") +} + +func (h headers) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + for _, item := range h { + if item.Number.Cmp(number) == 0 { + return item, nil + } + } + return nil, errors.New("not found") +} + +func (h headers) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + return nil, errors.New("not implemented") +} + +func genHeadersChain(size, difficulty int) []*types.Header { + rst := make([]*types.Header, size) + for i := 0; i < size; i++ { + rst[i] = &types.Header{ + Number: big.NewInt(int64(i)), + Difficulty: big.NewInt(int64(difficulty)), + Time: big.NewInt(1), + } + if i != 0 { + rst[i].ParentHash = rst[i-1].Hash() + } + } + return rst +} diff --git a/services/wallet/migrations/bindata.go b/services/wallet/migrations/bindata.go index f0847ab3ac..88abbccca0 100644 --- a/services/wallet/migrations/bindata.go +++ b/services/wallet/migrations/bindata.go @@ -34,7 +34,7 @@ func _0001_transfers_down_db_sql() ([]byte, error) { ) } -var __0001_transfers_up_db_sql = 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x7c\x8f\x4d\x4e\xc3\x30\x10\x85\xf7\x3e\xc5\x5b\x26\x52\x6e\xd0\x95\xed\x4c\xda\x11\xc6\x86\x89\x43\xe9\x0a\x25\xc5\x28\xa8\xa5\x54\x49\x90\xe0\xf6\xa8\x04\x84\xf8\x51\x97\x33\xef\x9b\x79\xfa\xac\x90\x8e\x84\xa8\x8d\x23\x70\x05\x1f\x22\xe8\x96\xeb\x58\x63\x1a\xda\xc3\xf8\x90\x86\x11\x99\xea\xdb\xb1\xc7\x8d\x16\xbb\xd2\x82\xc6\xf3\x75\x43\x1f\xa8\x6f\x9c\x2b\x54\xb7\xdf\xdd\xfd\x20\xbe\xa3\xe9\x15\xc6\x05\x53\xa8\x21\x6d\xd3\xe3\x71\xfa\x9c\xa6\xb7\x63\xfa\x87\xae\x82\x10\x2f\x3d\x2e\x68\x93\x7d\x3d\xcd\x21\x54\x91\x90\xb7\x54\xa3\xdb\x3f\x6f\x77\x63\x36\xef\x83\x47\x49\x8e\x22\xc1\xea\xda\xea\x92\x54\xbe\x50\xea\x8c\xd1\x7c\xfd\x5b\xe7\x4a\xf8\x52\xcb\xe6\x54\x5a\xa8\xc3\xcb\x53\x97\x06\x18\x5e\xb2\x8f\x7f\x4d\xfb\xd4\xde\x9f\x62\x17\x8c\xca\xb1\xe6\xb8\x0a\x4d\x84\x84\x35\x97\x0b\xf5\x1e\x00\x00\xff\xff\x59\x89\x24\x2f\x4c\x01\x00\x00") +var __0001_transfers_up_db_sql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x91\xd1\x72\xaa\x30\x10\x86\xef\xf3\x14\x7b\x29\x33\x79\x83\x73\x15\x60\xd1\xcc\xc9\x49\x4e\x43\xa8\xf5\x8a\x41\x4c\xab\xa3\x06\x4a\x60\xa6\xbe\x7d\x07\x01\xad\xad\xe3\xf4\x32\xbb\xd9\xdd\xef\xff\xff\x48\x23\x33\x08\x86\x85\x02\x81\x27\x20\x95\x01\x7c\xe1\xa9\x49\xa1\x6d\x0a\xe7\x5f\x6d\xe3\x61\x46\xb6\x85\xdf\xc2\x33\xd3\xd1\x82\x69\xc8\x24\x7f\xca\x90\x92\x62\xb3\x69\xac\xf7\x97\x7a\x3f\x2b\x33\x21\x28\x59\x1f\xf6\xf9\xcd\xc8\xb5\xd5\x7e\x40\x28\x54\x48\x49\x63\x4b\xbb\xab\xdb\xf1\xd5\x9e\x6a\x7b\xe7\x77\xa2\x34\xf2\xb9\x84\xbf\xb8\x9a\x4d\x4b\x03\xd0\x98\xa0\x46\x19\x61\x0a\xeb\x43\x55\xee\xfd\x6c\xa8\x2b\x09\x31\x0a\x34\x08\x11\x4b\x23\x16\x23\x25\x91\x92\xa9\xd1\x8c\x4b\x03\x9d\xdb\xbd\x77\x36\x9f\x64\xe5\x95\x3b\xaf\xcb\x27\x19\x83\x2c\x38\xef\xa2\x63\x31\x20\xc1\x1f\x42\x1e\x98\x34\xdc\xff\xee\xd0\x7f\xcd\xff\x31\xbd\xea\xb1\x29\x71\xdd\x71\x6d\x1b\x08\xf9\xbc\xa7\x18\xaf\x5c\x25\x6e\x6d\xb1\x81\x50\x29\x01\x31\x26\x2c\x13\x06\x12\x26\x52\x24\x01\x2c\xb9\x59\xa8\xcc\x80\x56\x4b\x1e\x3f\xc6\x2
8\xca\xb2\xea\x5c\xeb\xf3\xb6\xca\x2f\x48\x8f\xf3\xb9\xc5\xba\xf6\xfc\xc9\x95\xc0\xa5\xf9\x69\xfe\x30\x71\xcf\xfe\xa9\xf3\xab\x00\x8e\x45\x5d\xef\xdc\x5b\xef\xff\x48\x38\x20\x4f\x44\x53\x0e\x63\x93\x7e\x39\xdd\xa7\xf1\x19\x00\x00\xff\xff\xfc\x91\xad\x60\xb2\x02\x00\x00") func _0001_transfers_up_db_sql() ([]byte, error) { return bindata_read( diff --git a/services/wallet/migrations/sql/0001_transfers.down.db.sql b/services/wallet/migrations/sql/0001_transfers.down.db.sql index 28d1120889..d82814a90f 100644 --- a/services/wallet/migrations/sql/0001_transfers.down.db.sql +++ b/services/wallet/migrations/sql/0001_transfers.down.db.sql @@ -1,2 +1,3 @@ DROP TABLE transfers; DROP TABLE blocks; +DROP TABLE accounts_to_blocks; diff --git a/services/wallet/migrations/sql/0001_transfers.up.db.sql b/services/wallet/migrations/sql/0001_transfers.up.db.sql index 213fc631b9..9b5a380aca 100644 --- a/services/wallet/migrations/sql/0001_transfers.up.db.sql +++ b/services/wallet/migrations/sql/0001_transfers.up.db.sql @@ -1,14 +1,24 @@ CREATE TABLE IF NOT EXISTS transfers ( -hash VARCHAR UNIQUE NOT NULL, +hash VARCHAR UNIQUE, +address VARCHAR NOT NULL, blk_hash VARCHAR NOT NULL, tx BLOB, receipt BLOB, type VARCHAR NOT NULL, -FOREIGN KEY(blk_hash) REFERENCES blocks(hash) ON DELETE CASCADE +FOREIGN KEY(blk_hash) REFERENCES blocks(hash) ON DELETE CASCADE, +CONSTRAINT unique_transfer_on_hash_address UNIQUE (hash,address) ); CREATE TABLE IF NOT EXISTS blocks ( hash VARCHAR PRIMARY KEY, number BIGINT UNIQUE NOT NULL, -header BLOB +head BOOL DEFAULT FALSE ) WITHOUT ROWID; + +CREATE TABLE IF NOT EXISTS accounts_to_blocks ( +address VARCHAR NOT NULL, +blk_number BIGINT NOT NULL, +sync INT, +FOREIGN KEY(blk_number) REFERENCES blocks(number) ON DELETE CASCADE, +CONSTRAINT unique_mapping_on_address_block_number UNIQUE (address,blk_number) +); diff --git a/services/wallet/query.go b/services/wallet/query.go new file mode 100644 index 0000000000..73d94659eb --- /dev/null +++ 
b/services/wallet/query.go @@ -0,0 +1,86 @@ +package wallet + +import ( + "bytes" + "database/sql" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash" + +func newTransfersQuery() *transfersQuery { + buf := bytes.NewBuffer(nil) + buf.WriteString(baseTransfersQuery) + return &transfersQuery{buf: buf} +} + +type transfersQuery struct { + buf *bytes.Buffer + args []interface{} + added bool +} + +func (q *transfersQuery) andOrWhere() { + if q.added { + q.buf.WriteString(" AND") + } else { + q.buf.WriteString(" WHERE") + } +} + +func (q *transfersQuery) FilterStart(start *big.Int) *transfersQuery { + if start != nil { + q.andOrWhere() + q.added = true + q.buf.WriteString(" blocks.number >= ?") + q.args = append(q.args, (*SQLBigInt)(start)) + } + return q +} + +func (q *transfersQuery) FilterEnd(end *big.Int) *transfersQuery { + if end != nil { + q.andOrWhere() + q.added = true + q.buf.WriteString(" blocks.number <= ?") + q.args = append(q.args, (*SQLBigInt)(end)) + } + return q +} + +func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery { + q.andOrWhere() + q.added = true + q.buf.WriteString(" address = ?") + q.args = append(q.args, address) + return q +} + +func (q *transfersQuery) String() string { + return q.buf.String() +} + +func (q *transfersQuery) Args() []interface{} { + return q.args +} + +func (q *transfersQuery) Scan(rows *sql.Rows) (rst []Transfer, err error) { + for rows.Next() { + transfer := Transfer{ + BlockNumber: &big.Int{}, + Transaction: &types.Transaction{}, + Receipt: &types.Receipt{}, + } + err = rows.Scan( + &transfer.ID, &transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address, + &JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt}) + if err != nil { + return nil, 
err + } + rst = append(rst, transfer) + } + return rst, nil +} diff --git a/services/wallet/reactor.go b/services/wallet/reactor.go index f1d0d6e70e..abb412d21a 100644 --- a/services/wallet/reactor.go +++ b/services/wallet/reactor.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/params" ) @@ -22,198 +21,94 @@ func pollingPeriodByChain(chain *big.Int) time.Duration { switch chain.Int64() { case int64(params.MainNetworkID): return 10 * time.Second + case int64(params.RopstenNetworkID): + return 2 * time.Second default: return 500 * time.Millisecond } } +var ( + reorgSafetyDepth = big.NewInt(15) + erc20BatchSize = big.NewInt(100000) +) + // HeaderReader interface for reading headers using block number or hash. type HeaderReader interface { HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) } +// BalanceReader interface for reading balance at a specifeid address. +type BalanceReader interface { + BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) +} + +type reactorClient interface { + HeaderReader + BalanceReader +} + // NewReactor creates instance of the Reactor. 
-func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, address common.Address, chain *big.Int) *Reactor { - reactor := &Reactor{ - db: db, - client: client, - feed: feed, - address: address, - chain: chain, +func NewReactor(db *Database, feed *event.Feed, client *ethclient.Client, accounts []common.Address, chain *big.Int) *Reactor { + return &Reactor{ + db: db, + client: client, + feed: feed, + accounts: accounts, + chain: chain, } - reactor.erc20 = NewERC20TransfersDownloader(client, address) - reactor.eth = ÐTransferDownloader{ - client: client, - address: address, - signer: types.NewEIP155Signer(chain), - } - return reactor } // Reactor listens to new blocks and stores transfers into the database. type Reactor struct { - client HeaderReader - db *Database - feed *event.Feed - address common.Address - chain *big.Int - - eth *ETHTransferDownloader - erc20 *ERC20TransfersDownloader + client *ethclient.Client + db *Database + feed *event.Feed + accounts []common.Address + chain *big.Int - wg sync.WaitGroup - quit chan struct{} + mu sync.Mutex + group *Group } // Start runs reactor loop in background. 
func (r *Reactor) Start() error { - if r.quit != nil { + r.mu.Lock() + defer r.mu.Unlock() + if r.group != nil { return errors.New("already running") } - r.quit = make(chan struct{}) - r.wg.Add(1) - go func() { - log.Info("wallet reactor started", "address", r.address.String()) - r.loop() - r.wg.Done() - }() + r.group = NewGroup(context.Background()) + // TODO(dshulyak) to support adding accounts in runtime implement keyed group + // and export private api to start downloaders from accounts + // private api should have access only to reactor + ctl := &controlCommand{ + db: r.db, + chain: r.chain, + client: r.client, + accounts: r.accounts, + eth: ÐTransferDownloader{ + client: r.client, + accounts: r.accounts, + signer: types.NewEIP155Signer(r.chain), + }, + erc20: NewERC20TransfersDownloader(r.client, r.accounts), + feed: r.feed, + safetyDepth: reorgSafetyDepth, + } + r.group.Add(ctl.Command()) return nil } // Stop stops reactor loop and waits till it exits. func (r *Reactor) Stop() { - if r.quit == nil { + r.mu.Lock() + defer r.mu.Unlock() + if r.group == nil { return } - close(r.quit) - r.wg.Wait() - r.quit = nil -} - -func (r *Reactor) loop() { - var ( - ticker = time.NewTicker(pollingPeriodByChain(r.chain)) - latest *types.Header - err error - ) - defer ticker.Stop() - for { - select { - case <-r.quit: - return - case <-ticker.C: - var num *big.Int - if latest == nil { - latest, err = r.db.LastHeader() - if err != nil { - log.Error("failed to read last header from database", "error", err) - continue - } - } - if latest != nil { - num = new(big.Int).Add(latest.Number, one) - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - header, err := r.client.HeaderByNumber(ctx, num) - cancel() - if err != nil { - log.Error("failed to get latest block", "number", latest, "error", err) - continue - } - log.Debug("reactor received new block", "header", header.Hash()) - ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) - 
added, removed, err := r.onNewBlock(ctx, latest, header) - cancel() - if err != nil { - log.Error("failed to process new header", "header", header, "error", err) - continue - } - // for each added block get tranfers from downloaders - all := []Transfer{} - for i := range added { - log.Debug("reactor get transfers", "block", added[i].Hash(), "number", added[i].Number) - transfers, err := r.getTransfers(added[i]) - if err != nil { - log.Error("failed to get transfers", "header", header, "error", err) - continue - } - log.Debug("reactor adding transfers", "block", added[i].Hash(), "number", added[i].Number, "len", len(transfers)) - all = append(all, transfers...) - } - err = r.db.ProcessTranfers(all, added, removed) - if err != nil { - log.Error("failed to persist transfers", "error", err) - continue - } - latest = header - - if len(added) == 1 && len(removed) == 0 { - r.feed.Send(Event{ - Type: EventNewBlock, - BlockNumber: added[0].Number, - }) - } - if len(removed) != 0 { - lth := len(removed) - r.feed.Send(Event{ - Type: EventReorg, - BlockNumber: removed[lth-1].Number, - }) - } - } - } -} - -// getTransfers fetches erc20 and eth transfers and returns single slice with them. -func (r *Reactor) getTransfers(header *types.Header) ([]Transfer, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - ethT, err := r.eth.GetTransfers(ctx, header) - cancel() - if err != nil { - return nil, err - } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - erc20T, err := r.erc20.GetTransfers(ctx, header) - cancel() - if err != nil { - return nil, err - } - return append(ethT, erc20T...), nil -} - -// onNewBlock verifies if latest block extends current canonical chain view. In case if it doesn't it will find common -// parrent and replace all blocks after that parent. 
-func (r *Reactor) onNewBlock(ctx context.Context, previous, latest *types.Header) (added, removed []*types.Header, err error) { - if previous == nil { - // first node in the cache - return []*types.Header{latest}, nil, nil - } - if previous.Hash() == latest.ParentHash { - // parent matching previous node in the cache. on the same chain. - return []*types.Header{latest}, nil, nil - } - exists, err := r.db.HeaderExists(latest.Hash()) - if err != nil { - return nil, nil, err - } - if exists { - return nil, nil, nil - } - // reorg - log.Debug("wallet reactor spotted reorg", "last header in db", previous.Hash(), "new parent", latest.ParentHash) - for previous != nil && previous.Hash() != latest.ParentHash { - removed = append(removed, previous) - added = append(added, latest) - latest, err = r.client.HeaderByHash(ctx, latest.ParentHash) - if err != nil { - return nil, nil, err - } - previous, err = r.db.GetHeaderByNumber(new(big.Int).Sub(latest.Number, one)) - if err != nil { - return nil, nil, err - } - } - added = append(added, latest) - return added, removed, nil + r.group.Stop() + r.group.Wait() + r.group = nil } diff --git a/services/wallet/reactor_test.go b/services/wallet/reactor_test.go deleted file mode 100644 index bdf547695d..0000000000 --- a/services/wallet/reactor_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package wallet - -import ( - "context" - "errors" - "math/big" - "sort" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/require" -) - -type headers []*types.Header - -func (h headers) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - for _, item := range h { - if item.Hash() == hash { - return item, nil - } - } - return nil, errors.New("not found") -} - -func (h headers) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - for _, item := range h { - if item.Number.Cmp(number) == 0 { - return item, nil - } - } - 
return nil, errors.New("not found") -} - -func TestReactorReorgOnNewBlock(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - original := genHeadersChain(5, 1) - require.NoError(t, db.SaveHeaders(original)) - // rewrite parents ater 2nd block - reorg := make(headers, 5) - copy(reorg, original[:2]) - for i := 2; i < len(reorg); i++ { - reorg[i] = &types.Header{ - Number: big.NewInt(int64(i)), - Difficulty: big.NewInt(2), // actual difficulty is not important. using it to change a hash - Time: big.NewInt(1), - ParentHash: reorg[i-1].Hash(), - } - } - reactor := Reactor{ - client: reorg, - db: db, - } - latest := &types.Header{ - Number: big.NewInt(5), - Difficulty: big.NewInt(2), - Time: big.NewInt(1), - ParentHash: reorg[len(reorg)-1].Hash(), - } - previous := original[len(original)-1] - added, removed, err := reactor.onNewBlock(context.TODO(), previous, latest) - require.NoError(t, err) - require.Len(t, added, 4) - require.Len(t, removed, 3) - - sort.Slice(removed, func(i, j int) bool { - return removed[i].Number.Cmp(removed[j].Number) < 1 - }) - for i, h := range original[2:] { - require.Equal(t, h.Hash(), removed[i].Hash()) - } - - expected := make([]*types.Header, 4) - copy(expected, reorg[2:]) - expected[3] = latest - sort.Slice(added, func(i, j int) bool { - return added[i].Number.Cmp(added[j].Number) < 1 - }) - for i, h := range expected { - require.Equal(t, h.Hash(), added[i].Hash()) - } -} - -func TestReactorReorgAllKnownHeaders(t *testing.T) { - db, stop := setupTestDB(t) - defer stop() - original := genHeadersChain(2, 1) - reorg := make(headers, 2) - copy(reorg, genHeadersChain(2, 2)) - latest := &types.Header{ - Number: big.NewInt(2), - Difficulty: big.NewInt(2), - Time: big.NewInt(1), - ParentHash: reorg[len(reorg)-1].Hash(), - } - require.NoError(t, db.SaveHeaders(original)) - reactor := Reactor{ - client: reorg, - db: db, - } - added, removed, err := reactor.onNewBlock(context.TODO(), original[len(original)-1], latest) - require.NoError(t, 
err) - require.Len(t, added, 3) - require.Len(t, removed, 2) -} - -func genHeadersChain(size, difficulty int) []*types.Header { - rst := make([]*types.Header, size) - for i := 0; i < size; i++ { - rst[i] = &types.Header{ - Number: big.NewInt(int64(i)), - Difficulty: big.NewInt(int64(difficulty)), - Time: big.NewInt(1), - } - if i != 0 { - rst[i].ParentHash = rst[i-1].Hash() - } - } - return rst -} diff --git a/services/wallet/service.go b/services/wallet/service.go index 043632ab90..17670884f7 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" ) @@ -33,12 +34,12 @@ func (s *Service) Start(*p2p.Server) error { } // StartReactor separately because it requires known ethereum address, which will become available only after login. -func (s *Service) StartReactor(dbpath string, client *ethclient.Client, address common.Address, chain *big.Int) error { - db, err := InitializeDB(dbpath) +func (s *Service) StartReactor(dbpath, password string, client *ethclient.Client, accounts []common.Address, chain *big.Int) error { + db, err := InitializeDB(dbpath, password) if err != nil { return err } - reactor := NewReactor(db, s.feed, client, address, chain) + reactor := NewReactor(db, s.feed, client, accounts, chain) err = reactor.Start() if err != nil { return err @@ -62,8 +63,11 @@ func (s *Service) StopReactor() error { // Stop reactor, signals transmitter and close db. func (s *Service) Stop() error { + log.Info("wallet will be stopped") + err := s.StopReactor() s.signals.Stop() - return s.StopReactor() + log.Info("wallet stopped") + return err } // APIs returns list of available RPC APIs. 
@@ -72,7 +76,7 @@ func (s *Service) APIs() []rpc.API { { Namespace: "wallet", Version: "0.1.0", - Service: &API{s}, + Service: NewAPI(s), Public: true, }, } diff --git a/sqlite/sqlite.go b/sqlite/sqlite.go index fc9ba6ea12..afe173c5d2 100644 --- a/sqlite/sqlite.go +++ b/sqlite/sqlite.go @@ -2,12 +2,13 @@ package sqlite import ( "database/sql" + "errors" "fmt" _ "github.com/mutecomm/go-sqlcipher" // We require go sqlcipher that overrides default implementation ) -func openDB(path string) (*sql.DB, error) { +func openDB(path, key string) (*sql.DB, error) { db, err := sql.Open("sqlite3", path) if err != nil { return nil, err @@ -19,6 +20,11 @@ func openDB(path string) (*sql.DB, error) { if _, err = db.Exec("PRAGMA foreign_keys=ON"); err != nil { return nil, err } + keyString := fmt.Sprintf("PRAGMA key = '%s'", key) + if _, err = db.Exec(keyString); err != nil { + return nil, errors.New("failed to set key pragma") + } + // readers do not block writers and faster i/o operations // https://www.sqlite.org/draft/wal.html // must be set after db is encrypted @@ -35,6 +41,6 @@ func openDB(path string) (*sql.DB, error) { } // OpenDB opens not-encrypted database. 
-func OpenDB(path string) (*sql.DB, error) { - return openDB(path) +func OpenDB(path, key string) (*sql.DB, error) { + return openDB(path, key) } diff --git a/t/devtests/testchain/node.go b/t/devtests/testchain/node.go new file mode 100644 index 0000000000..a26578db21 --- /dev/null +++ b/t/devtests/testchain/node.go @@ -0,0 +1,80 @@ +package testchain + +import ( + "crypto/ecdsa" + "math/big" + + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" +) + +type Backend struct { + Node *node.Node + Client *ethclient.Client + genesis *core.Genesis + Ethereum *eth.Ethereum + Faucet *ecdsa.PrivateKey + Signer types.Signer +} + +func NewBackend() (*Backend, error) { + faucet, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + config := params.AllEthashProtocolChanges + genesis := &core.Genesis{ + Config: config, + Alloc: core.GenesisAlloc{crypto.PubkeyToAddress(faucet.PublicKey): {Balance: big.NewInt(1e18)}}, + ExtraData: []byte("test genesis"), + Timestamp: 9000, + } + var ethservice *eth.Ethereum + n, err := node.New(&node.Config{}) + if err != nil { + return nil, err + } + err = n.Register(func(ctx *node.ServiceContext) (node.Service, error) { + config := ð.Config{Genesis: genesis} + config.Ethash.PowMode = ethash.ModeFake + ethservice, err = eth.New(ctx, config) + return ethservice, err + }) + if err != nil { + return nil, err + } + + if err := n.Start(); err != nil { + return nil, err + } + client, err := n.Attach() + if err != nil { + return nil, err + } + return &Backend{ + Node: n, + Client: ethclient.NewClient(client), + Ethereum: ethservice, + Faucet: faucet, + Signer: types.NewEIP155Signer(config.ChainID), + genesis: genesis, + }, nil +} + +// 
GenerateBlocks generate n blocks starting from genesis. +func (b *Backend) GenerateBlocks(n int, start uint64, gen func(int, *core.BlockGen)) []*types.Block { + block := b.Ethereum.BlockChain().GetBlockByNumber(start) + engine := ethash.NewFaker() + blocks, _ := core.GenerateChain(b.genesis.Config, block, engine, b.Ethereum.ChainDb(), n, gen) + return blocks +} + +func (b *Backend) Stop() error { + return b.Node.Stop() +} diff --git a/t/devtests/tranfers_test.go b/t/devtests/tranfers_test.go index 246759d1b9..097d5271f0 100644 --- a/t/devtests/tranfers_test.go +++ b/t/devtests/tranfers_test.go @@ -9,7 +9,9 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/status-im/status-go/account" "github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -22,22 +24,33 @@ func TestTransfersSuite(t *testing.T) { type TransfersSuite struct { DevNodeSuite - Address common.Address + Password string + Info account.Info + Address common.Address +} + +func (s *TransfersSuite) SelectAccount() { + s.Require().NoError(s.backend.SelectAccount(s.Info.WalletAddress, s.Info.ChatAddress, s.Password)) + _, err := s.backend.AccountManager().SelectedWalletAccount() + s.Require().NoError(err) } func (s *TransfersSuite) SetupTest() { s.DevNodeSuite.SetupTest() - password := "test" - info, _, err := s.backend.AccountManager().CreateAccount(password) - s.Require().NoError(err) - s.Require().NoError(s.backend.SelectAccount(info.WalletAddress, info.ChatAddress, password)) - account, err := s.backend.AccountManager().SelectedWalletAccount() + s.Password = "test" + info, _, err := s.backend.AccountManager().CreateAccount(s.Password) s.Require().NoError(err) - s.Address = account.Address + s.Info = info + s.Address = common.HexToAddress(info.WalletAddress) +} + +func (s 
*TransfersSuite) TearDownTest() { + s.Require().NoError(s.backend.Logout()) + s.DevNodeSuite.TearDownTest() } func (s *TransfersSuite) getAllTranfers() (rst []wallet.Transfer, err error) { - return rst, s.Local.Call(&rst, "wallet_getTransfers", big.NewInt(0)) + return rst, s.Local.Call(&rst, "wallet_getTransfersByAddress", s.Address, (*hexutil.Big)(big.NewInt(0))) } func (s *TransfersSuite) sendTx(nonce uint64, to common.Address) { @@ -52,7 +65,8 @@ func (s *TransfersSuite) sendTx(nonce uint64, to common.Address) { s.Require().NoError(err) } -func (s *TransfersSuite) TestEventuallySynced() { +func (s *TransfersSuite) TestNewTransfers() { + s.SelectAccount() s.sendTx(0, s.Address) s.Require().NoError(utils.Eventually(func() error { all, err := s.getAllTranfers() @@ -63,7 +77,7 @@ func (s *TransfersSuite) TestEventuallySynced() { return fmt.Errorf("waiting for one transfer") } return nil - }, 10*time.Second, 1*time.Second)) + }, 20*time.Second, 1*time.Second)) go func() { for i := 1; i < 10; i++ { @@ -76,7 +90,24 @@ func (s *TransfersSuite) TestEventuallySynced() { return err } if len(all) != 10 { - return fmt.Errorf("waiting for ten transfers") + return fmt.Errorf("waiting for 10 transfers") + } + return nil + }, 30*time.Second, 1*time.Second)) +} + +func (s *TransfersSuite) TestHistoricalTransfers() { + for i := 0; i < 30; i++ { + s.sendTx(uint64(i), s.Address) + } + s.SelectAccount() + s.Require().NoError(utils.Eventually(func() error { + all, err := s.getAllTranfers() + if err != nil { + return err + } + if len(all) >= 30 { + return fmt.Errorf("waiting for atleast 30 transfers") } return nil }, 30*time.Second, 1*time.Second))