Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique ID for every transfer (multiple token transfers per tx) and concurrent erc20 downloads #1482

Merged
merged 12 commits into from
Jun 13, 2019
Merged
8 changes: 4 additions & 4 deletions api/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (b *StatusBackend) reSelectAccount() error {
default:
return err
}
return b.startWallet()
return nil
}

// SelectAccount selects current wallet and chat accounts, by verifying that each address has corresponding account which can be decrypted
Expand Down Expand Up @@ -551,10 +551,10 @@ func (b *StatusBackend) SelectAccount(walletAddress, chatAddress, password strin
return err
}
}
return b.startWallet()
return b.startWallet(password)
}

func (b *StatusBackend) startWallet() error {
func (b *StatusBackend) startWallet(password string) error {
if !b.statusNode.Config().WalletConfig.Enabled {
return nil
}
Expand All @@ -567,7 +567,7 @@ func (b *StatusBackend) startWallet() error {
return err
}
path := path.Join(b.statusNode.Config().DataDir, fmt.Sprintf("wallet-%x.sql", account.Address))
return wallet.StartReactor(path,
return wallet.StartReactor(path, password,
b.statusNode.RPCClient().Ethclient(),
[]common.Address{account.Address},
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
Expand Down
20 changes: 1 addition & 19 deletions services/wallet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Objects in the same format.
Signals
-------

Three signals will are emitted:
Two signals can be emitted:

1. `newblock` signal

Expand Down Expand Up @@ -197,21 +197,3 @@ Client expected to request new transfers from received block and replace transfe
}
}
```

3. `history` signal.

Emmited if new transfer in old block was found.
Client expected to request transfers starting from this new block till the earliest known block.

```json
{
"type": "wallet",
"event": {
"type": "history",
"blockNumber": 0,
"accounts": [
"0x42c8f505b4006d417dd4e0ba0e880692986adbd8"
]
}
}
```
4 changes: 4 additions & 0 deletions services/wallet/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)

func NewAPI(s *Service) *API {
return &API{s}
}

// API is class with methods available over RPC.
type API struct {
s *Service
Expand Down
152 changes: 152 additions & 0 deletions services/wallet/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package wallet

import (
"context"
"sync"
"time"
)

type Command func(context.Context) error

// FiniteCommand terminates when error is nil.
type FiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}

func (c FiniteCommand) Run(ctx context.Context) error {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err := c.Runable(ctx)
if err == nil {
return nil
}
}
}
}

// InfiniteCommand runs until context is closed.
type InfiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
}

func (c InfiniteCommand) Run(ctx context.Context) error {
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
_ = c.Runable(ctx)
}
}
}

func NewGroup(parent context.Context) *Group {
ctx, cancel := context.WithCancel(parent)
return &Group{
ctx: ctx,
cancel: cancel,
}
}

type Group struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
}

func (g *Group) Add(cmd Command) {
g.wg.Add(1)
go func() {
_ = cmd(g.ctx)
g.wg.Done()
}()
}

func (g *Group) Stop() {
g.cancel()
}

func (g *Group) Wait() {
g.wg.Wait()
}

func (g *Group) WaitAsync() <-chan struct{} {
ch := make(chan struct{})
go func() {
g.Wait()
close(ch)
}()
return ch
}

func NewAtomicGroup(parent context.Context) *AtomicGroup {
ctx, cancel := context.WithCancel(parent)
return &AtomicGroup{ctx: ctx, cancel: cancel}
}

// AtomicGroup terminates as soon as first goroutine terminates..
type AtomicGroup struct {
ctx context.Context
cancel func()
wg sync.WaitGroup

mu sync.Mutex
error error
}

// Go spawns function in a goroutine and stores results or errors.
func (d *AtomicGroup) Add(cmd Command) {
d.wg.Add(1)
go func() {
defer d.wg.Done()
err := cmd(d.ctx)
d.mu.Lock()
defer d.mu.Unlock()
if err != nil {
// do not overwrite original error by context errors
if d.error != nil {
return
}
d.error = err
d.cancel()
return
}
}()
}

// Wait for all downloaders to finish.
func (d *AtomicGroup) Wait() {
d.wg.Wait()
if d.Error() == nil {
d.mu.Lock()
defer d.mu.Unlock()
d.cancel()
}
}

func (d *AtomicGroup) WaitAsync() <-chan struct{} {
ch := make(chan struct{})
go func() {
d.Wait()
close(ch)
}()
return ch
}

// Error stores an error that was reported by any of the downloader. Should be called after Wait.
func (d *AtomicGroup) Error() error {
d.mu.Lock()
defer d.mu.Unlock()
return d.error
}

func (d *AtomicGroup) Stop() {
d.cancel()
}
Loading