Skip to content

Commit

Permalink
Generate unique ID for every transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Jun 6, 2019
1 parent 55744d3 commit dbb7499
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 57 deletions.
1 change: 1 addition & 0 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (r *Result) Get() []Transfer {
}

func NewConcurrentRunner(ctx context.Context) *ConcurrentRunner {
// TODO(dshulyak) rename to atomic group and keep interface consistent with regular Group.
ctx, cancel := context.WithCancel(ctx)
return &ConcurrentRunner{
ctx: ctx,
Expand Down
27 changes: 3 additions & 24 deletions services/wallet/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (db *Database) GetTransfersByAddress(address common.Address, start, end *bi
return
}
defer rows.Close()
return scanTransfers(rows)
return query.Scan(rows)
}

// GetTransfers load transfers transfer betweeen two blocks.
Expand All @@ -154,11 +154,8 @@ func (db *Database) GetTransfers(start, end *big.Int) (rst []Transfer, err error
if err != nil {
return
}
if err != nil {
return
}
defer rows.Close()
return scanTransfers(rows)
return query.Scan(rows)
}

// SaveHeader stores a single header.
Expand Down Expand Up @@ -289,24 +286,6 @@ SELECT blocks.hash, blk_number FROM accounts_to_blocks JOIN blocks ON blk_number
return nil, nil
}

func scanTransfers(rows *sql.Rows) (rst []Transfer, err error) {
for rows.Next() {
transfer := Transfer{
BlockNumber: &big.Int{},
Transaction: &types.Transaction{},
Receipt: &types.Receipt{},
}
err = rows.Scan(
&transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address,
&JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt})
if err != nil {
return nil, err
}
rst = append(rst, transfer)
}
return rst, nil
}

// statementCreator allows to pass transaction or database to use in consumer.
type statementCreator interface {
Prepare(query string) (*sql.Stmt, error)
Expand Down Expand Up @@ -346,7 +325,7 @@ func insertTransfers(creator statementCreator, transfers []Transfer) error {
return err
}
for _, t := range transfers {
_, err = insert.Exec(t.Transaction.Hash(), t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type)
_, err = insert.Exec(t.ID, t.BlockHash, t.Address, &JSONBlob{t.Transaction}, &JSONBlob{t.Receipt}, t.Type)
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions services/wallet/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestDBProcessTransfer(t *testing.T) {
tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
transfers := []Transfer{
{
ID: common.Hash{1},
Type: ethTransfer,
BlockHash: header.Hash,
BlockNumber: header.Number,
Expand All @@ -131,10 +132,10 @@ func TestDBReorgTransfers(t *testing.T) {
originalTX := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil)
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt},
{ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt},
}, []*DBHeader{original}, nil, 0))
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt},
{ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt},
}, []*DBHeader{replaced}, []*DBHeader{original}, 0))

all, err := db.GetTransfers(big.NewInt(0), nil)
Expand All @@ -158,6 +159,7 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
receipt := types.NewReceipt(nil, false, 100)
receipt.Logs = []*types.Log{}
transfer := Transfer{
ID: tx.Hash(),
Type: ethTransfer,
BlockNumber: header.Number,
BlockHash: header.Hash,
Expand Down Expand Up @@ -223,6 +225,7 @@ func TestDBProcessTransfersUpdate(t *testing.T) {
Hash: common.Hash{1},
}
transfer := Transfer{
ID: common.Hash{1},
BlockNumber: header.Number,
BlockHash: header.Hash,
Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil),
Expand Down
73 changes: 43 additions & 30 deletions services/wallet/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wallet

import (
"context"
"encoding/binary"
"errors"
"math/big"
"time"
Expand All @@ -25,14 +26,15 @@ const (
)

var (
one = big.NewInt(1)
zero = big.NewInt(0)
one = big.NewInt(1)
two = big.NewInt(2)
)

// Transfer stores information about transfer.
type Transfer struct {
Type TransferType `json:"type"`
ID common.Hash `json:"-"`
Address common.Address `json:"address"`
BlockNumber *big.Int `json:"blockNumber"`
BlockHash common.Hash `json:"blockhash"`
Expand Down Expand Up @@ -94,37 +96,33 @@ func (d *ETHTransferDownloader) GetTransfersByNumber(ctx context.Context, number

func (d *ETHTransferDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) {
for _, tx := range blk.Transactions() {
var address *common.Address
from, err := types.Sender(d.signer, tx)
if err != nil {
return nil, err
}
// payload is empty for eth transfers
if any(from, accounts) {
receipt, err := d.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
return nil, err
}
rst = append(rst, Transfer{Type: ethTransfer,
Address: from,
BlockNumber: blk.Number(),
BlockHash: blk.Hash(),
Transaction: tx, Receipt: receipt})
continue
address = &from
} else if tx.To() != nil && any(*tx.To(), accounts) {
address = tx.To()
}
if tx.To() == nil {
continue
}
if any(*tx.To(), accounts) {
if address != nil {
receipt, err := d.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
return nil, err
}
rst = append(rst, Transfer{Type: ethTransfer,
Address: *tx.To(),
if isTokenTransfer(receipt.Logs) {
log.Debug("eth downloader found token transfer", "hash", tx.Hash())
continue
}
rst = append(rst, Transfer{
Type: ethTransfer,
ID: tx.Hash(),
Address: *address,
BlockNumber: blk.Number(),
BlockHash: blk.Hash(),
Transaction: tx, Receipt: receipt})
continue

}
}
// TODO(dshulyak) test that balance difference was covered by transactions
Expand Down Expand Up @@ -164,7 +162,7 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co
return [][]common.Hash{{d.signature}, {d.paddedAddress(address)}, {}}
}

func (d *ERC20TransfersDownloader) tranasferFromLogs(parent context.Context, log types.Log, address common.Address) (Transfer, error) {
func (d *ERC20TransfersDownloader) transferFromLog(parent context.Context, log types.Log, address common.Address) (Transfer, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
tx, _, err := d.client.TransactionByHash(ctx, log.TxHash)
cancel()
Expand All @@ -177,8 +175,13 @@ func (d *ERC20TransfersDownloader) tranasferFromLogs(parent context.Context, log
if err != nil {
return Transfer{}, err
}
// TODO(dshulyak) what is the max number of logs?
index := [4]byte{}
binary.BigEndian.PutUint32(index[:], uint32(log.Index))
id := crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
return Transfer{
Address: address,
ID: id,
Type: erc20Transfer,
BlockNumber: new(big.Int).SetUint64(log.BlockNumber),
BlockHash: log.BlockHash,
Expand All @@ -192,7 +195,7 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log
for i := range logs {
l := logs[i]
concurrent.Go(func(ctx context.Context) error {
transfer, err := d.tranasferFromLogs(ctx, l, address)
transfer, err := d.transferFromLog(ctx, l, address)
if err != nil {
return err
}
Expand All @@ -208,15 +211,6 @@ func (d *ERC20TransfersDownloader) transfersFromLogs(parent context.Context, log
return concurrent.Get(), nil
}

func any(address common.Address, compare []common.Address) bool {
for _, c := range compare {
if c == address {
return true
}
}
return false
}

// GetTransfers for erc20 uses eth_getLogs rpc with Transfer event signature and our address acount.
func (d *ERC20TransfersDownloader) GetTransfers(ctx context.Context, header *DBHeader) ([]Transfer, error) {
hash := header.Hash
Expand Down Expand Up @@ -289,3 +283,22 @@ func (d *ERC20TransfersDownloader) GetTransfersInRange(parent context.Context, f
log.Debug("found erc20 transfers between two blocks", "from", from, "to", to, "lth", len(transfers), "took", time.Since(start))
return transfers, nil
}

func any(address common.Address, compare []common.Address) bool {
for _, c := range compare {
if c == address {
return true
}
}
return false
}

func isTokenTransfer(logs []*types.Log) bool {
signature := crypto.Keccak256Hash([]byte(erc20TransferEventSignature))
for _, l := range logs {
if len(l.Topics) > 0 && l.Topics[0] == signature {
return true
}
}
return false
}
22 changes: 21 additions & 1 deletion services/wallet/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package wallet

import (
"bytes"
"database/sql"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

const baseTransfersQuery = "SELECT type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash"
const baseTransfersQuery = "SELECT transfers.hash, type, blocks.hash, blocks.number, address, tx, receipt FROM transfers JOIN blocks ON blk_hash = blocks.hash"

func newTransfersQuery() *transfersQuery {
buf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -64,3 +66,21 @@ func (q *transfersQuery) String() string {
func (q *transfersQuery) Args() []interface{} {
return q.args
}

func (q *transfersQuery) Scan(rows *sql.Rows) (rst []Transfer, err error) {
for rows.Next() {
transfer := Transfer{
BlockNumber: &big.Int{},
Transaction: &types.Transaction{},
Receipt: &types.Receipt{},
}
err = rows.Scan(
&transfer.ID, &transfer.Type, &transfer.BlockHash, (*SQLBigInt)(transfer.BlockNumber), &transfer.Address,
&JSONBlob{transfer.Transaction}, &JSONBlob{transfer.Receipt})
if err != nil {
return nil, err
}
rst = append(rst, transfer)
}
return rst, nil
}

0 comments on commit dbb7499

Please sign in to comment.