Skip to content

Commit

Permalink
Update accounts with all blocks in the set
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Jun 7, 2019
1 parent a059cf8 commit 72ca011
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 37 deletions.
17 changes: 10 additions & 7 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
return err
}
}
log.Info("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.previous.Number)
log.Debug("initialized downloader for eth historical transfers", "address", c.address, "starting at", c.previous.Number)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
Expand All @@ -62,7 +62,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
transfers := concurrent.Get()
log.Info("eth historical downloader finished succesfully", "total transfers", len(transfers), "time", time.Since(start))
// TODO(dshulyak) insert 0 block number with transfers
err = c.db.ProcessTranfers(transfers, headersFromTransfers(transfers), nil, ethSync)
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
if err != nil {
log.Error("failed to save downloaded erc20 transfers", "error", err)
return err
Expand Down Expand Up @@ -106,23 +106,26 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {
log.Error("failed to setup historical downloader for erc20")
return err
}
log.Info("initialized downloader for erc20 historical transfers", "address", c.address, "starting at", c.iterator.Header().Number)
log.Debug("initialized downloader for erc20 historical transfers", "address", c.address, "starting at", c.iterator.Header().Number)
}
for !c.iterator.Finished() {
start := time.Now()
transfers, err := c.iterator.Next(ctx)
if err != nil {
log.Error("failed to get next batch", "error", err)
break
}
headers := headersFromTransfers(transfers)
log.Info("storing header of the iterator", "header", c.iterator.Header().Number)
headers = append(headers, c.iterator.Header())
err = c.db.ProcessTranfers(transfers, headers, nil, erc20Sync)
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headers, nil, erc20Sync)
if err != nil {
c.iterator.Revert()
log.Error("failed to save downloaded erc20 transfers", "error", err)
return err
}
if len(transfers) > 0 {
log.Debug("erc20 downloader imported transfers", "len", len(transfers), "time", time.Since(start))
c.feed.Send(Event{
Type: EventNewHistory,
BlockNumber: c.iterator.Header().Number,
Expand All @@ -136,6 +139,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) {

type newBlocksTransfersCommand struct {
db *Database
accounts []common.Address
chain *big.Int
erc20 *ERC20TransfersDownloader
eth *ETHTransferDownloader
Expand All @@ -160,7 +164,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
log.Error("failed to get last known header", "error", err)
return err
}
log.Info("initialized downloader for new blocks transfers", "starting at", c.previous.Number)
log.Debug("initialized downloader for new blocks transfers", "starting at", c.previous.Number)
}
num := new(big.Int).Add(c.previous.Number, one)
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
Expand Down Expand Up @@ -190,7 +194,7 @@ func (c *newBlocksTransfersCommand) Run(parent context.Context) (err error) {
log.Debug("reactor adding transfers", "block", added[i].Hash, "number", added[i].Number, "len", len(transfers))
all = append(all, transfers...)
}
err = c.db.ProcessTranfers(all, added, removed, erc20Sync|ethSync)
err = c.db.ProcessTranfers(all, c.accounts, added, removed, erc20Sync|ethSync)
if err != nil {
log.Error("failed to persist transfers", "error", err)
return err
Expand Down Expand Up @@ -299,7 +303,6 @@ func lastKnownHeader(parent context.Context, db *Database, client HeaderReader,
if err != nil {
return nil, err
}
log.Info("head of the chain", "number", header.Number)
latest := toDBHeader(header)
diff := new(big.Int).Sub(latest.Number, safetyLimit)
if diff.Cmp(zero) <= 0 {
Expand Down
5 changes: 3 additions & 2 deletions services/wallet/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func (s *NewBlocksSuite) SetupTest() {
s.address = crypto.PubkeyToAddress(account.PublicKey)
s.feed = &event.Feed{}
s.cmd = &newBlocksTransfersCommand{
db: s.db,
erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}),
db: s.db,
accounts: []common.Address{s.address},
erc20: NewERC20TransfersDownloader(s.backend.Client, []common.Address{s.address}),
eth: &ETHTransferDownloader{
client: s.backend.Client,
signer: s.backend.Signer,
Expand Down
38 changes: 20 additions & 18 deletions services/wallet/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (db Database) Close() error {
}

// ProcessTranfers atomically adds/removes blocks and adds new tranfers.
func (db Database) ProcessTranfers(transfers []Transfer, added, removed []*DBHeader, option SyncOption) (err error) {
func (db Database) ProcessTranfers(transfers []Transfer, accounts []common.Address, added, removed []*DBHeader, option SyncOption) (err error) {
var (
tx *sql.Tx
)
Expand All @@ -132,7 +132,7 @@ func (db Database) ProcessTranfers(transfers []Transfer, added, removed []*DBHea
if err != nil {
return
}
err = updateAccounts(tx, transfers, option)
err = updateAccounts(tx, accounts, added, option)
return
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func insertTransfers(creator statementCreator, transfers []Transfer) error {
return nil
}

func updateAccounts(creator statementCreator, transfers []Transfer, option SyncOption) error {
func updateAccounts(creator statementCreator, accounts []common.Address, headers []*DBHeader, option SyncOption) error {
update, err := creator.Prepare("UPDATE accounts_to_blocks SET sync=sync|? WHERE address=? AND blk_number=?")
if err != nil {
return err
Expand All @@ -360,21 +360,23 @@ func updateAccounts(creator statementCreator, transfers []Transfer, option SyncO
if err != nil {
return err
}
for _, t := range transfers {
rst, err := update.Exec(option, t.Address, (*SQLBigInt)(t.BlockNumber))
if err != nil {
return err
}
affected, err := rst.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insert.Exec(t.Address, (*SQLBigInt)(t.BlockNumber), option)
if err != nil {
return err
for _, acc := range accounts {
for _, h := range headers {
rst, err := update.Exec(option, acc, (*SQLBigInt)(h.Number))
if err != nil {
return err
}
affected, err := rst.RowsAffected()
if err != nil {
return err
}
if affected > 0 {
continue
}
_, err = insert.Exec(acc, (*SQLBigInt)(h.Number), option)
if err != nil {
return err
}
}
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions services/wallet/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestDBProcessTransfer(t *testing.T) {
Receipt: types.NewReceipt(nil, false, 100),
},
}
require.NoError(t, db.ProcessTranfers(transfers, []*DBHeader{header}, nil, 0))
require.NoError(t, db.ProcessTranfers(transfers, nil, []*DBHeader{header}, nil, 0))
}

func TestDBReorgTransfers(t *testing.T) {
Expand All @@ -134,10 +134,10 @@ func TestDBReorgTransfers(t *testing.T) {
replacedTX := types.NewTransaction(2, common.Address{1}, nil, 10, big.NewInt(10), nil)
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, originalTX, rcpt},
}, []*DBHeader{original}, nil, 0))
}, nil, []*DBHeader{original}, nil, 0))
require.NoError(t, db.ProcessTranfers([]Transfer{
{ethTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, replacedTX, rcpt},
}, []*DBHeader{replaced}, []*DBHeader{original}, 0))
}, nil, []*DBHeader{replaced}, []*DBHeader{original}, 0))

all, err := db.GetTransfers(big.NewInt(0), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestDBGetTransfersFromBlock(t *testing.T) {
}
transfers = append(transfers, transfer)
}
require.NoError(t, db.ProcessTranfers(transfers, headers, nil, 0))
require.NoError(t, db.ProcessTranfers(transfers, nil, headers, nil, 0))
rst, err := db.GetTransfers(big.NewInt(7), nil)
require.NoError(t, err)
require.Len(t, rst, 3)
Expand Down Expand Up @@ -232,8 +232,8 @@ func TestDBProcessTransfersUpdate(t *testing.T) {
Transaction: types.NewTransaction(0, common.Address{}, nil, 0, nil, nil),
Address: address,
}
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []*DBHeader{header}, nil, ethSync))
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []*DBHeader{header}, nil, erc20Sync))
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, ethSync))
require.NoError(t, db.ProcessTranfers([]Transfer{transfer}, []common.Address{address}, []*DBHeader{header}, nil, erc20Sync))

earliest, err := db.GetEarliestSynced(address, ethSync|erc20Sync)
require.NoError(t, err)
Expand All @@ -248,7 +248,7 @@ func TestDBLastHeadersReverseSorted(t *testing.T) {
for i := range headers {
headers[i] = &DBHeader{Hash: common.Hash{byte(i)}, Number: big.NewInt(int64(i))}
}
require.NoError(t, db.ProcessTranfers(nil, headers, nil, ethSync))
require.NoError(t, db.ProcessTranfers(nil, nil, headers, nil, ethSync))

headers, err := db.LastHeaders(big.NewInt(5))
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions services/wallet/iterative.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func SetupIterativeDownloader(
downloader: downloader,
}
earliest, err := db.GetEarliestSynced(address, option)
log.Info("earleist synced erc20 block", "address", address, "block", earliest)
if err != nil {
log.Error("failed to get earliest synced block", "error", err)
return nil, err
Expand Down
7 changes: 4 additions & 3 deletions services/wallet/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ func (r *Reactor) Start() error {
r.group.Add(eth.Command())
}
newBlocks := &newBlocksTransfersCommand{
db: r.db,
chain: r.chain,
client: r.client,
db: r.db,
chain: r.chain,
client: r.client,
accounts: r.accounts,
eth: &ETHTransferDownloader{
client: r.client,
accounts: r.accounts,
Expand Down

0 comments on commit 72ca011

Please sign in to comment.