-
Notifications
You must be signed in to change notification settings - Fork 247
/
iterative.go
92 lines (80 loc) · 2.61 KB
/
iterative.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package wallet
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// SetupIterativeDownloader builds an IterativeDownloader whose starting point
// is the latest block already synced for the given address and sync option,
// as reported by db. When db has no record (GetLatestSynced returns nil), the
// downloader starts from block zero. The downloader advances towards to in
// steps of size blocks.
//
// An error from db.GetLatestSynced is logged and returned unchanged.
func SetupIterativeDownloader(
	db *Database, client HeaderReader, address common.Address, option SyncOption,
	downloader BatchDownloader, size *big.Int, to *DBHeader) (*IterativeDownloader, error) {
	start, err := db.GetLatestSynced(address, option)
	switch {
	case err != nil:
		log.Error("failed to get latest synced block", "error", err)
		return nil, err
	case start == nil:
		// Nothing synced yet for this address/option: begin at block zero.
		start = &DBHeader{Number: zero}
	}

	log.Debug("iterative downloader", "address", address, "from", start.Number, "to", to.Number)

	return &IterativeDownloader{
		client:     client,
		batchSize:  size,
		downloader: downloader,
		from:       start,
		to:         to,
	}, nil
}
// BatchDownloader is implemented by types that can load transfers in batches
// for a specified range of blocks.
type BatchDownloader interface {
	// GetTransfersInRange returns the transfers found for the block range
	// delimited by from and to.
	GetTransfersInRange(ctx context.Context, from *big.Int, to *big.Int) ([]Transfer, error)
}
// IterativeDownloader walks a block range step by step, fetching the
// transfers of each step through a BatchDownloader. Each step spans at most
// batchSize blocks.
type IterativeDownloader struct {
	// client resolves a block number to its header.
	client HeaderReader

	// batchSize is the number of blocks added to from on every step.
	batchSize *big.Int

	// downloader fetches the transfers for a single step.
	downloader BatchDownloader

	// from is the current position of the downloader.
	from *DBHeader

	// to is the block at which the downloader stops.
	to *DBHeader

	// previous holds the value of from before the last successful step, so
	// that the step can be reverted.
	previous *DBHeader
}
// Finished reports whether the downloader has reached its target, i.e. the
// current position (from) is at or beyond the final block (to).
func (d *IterativeDownloader) Finished() bool {
	// Compare with >= rather than ==: if the starting point already lies past
	// the target, there is nothing left to download, and treating that state
	// as unfinished would make Next request an inverted block range.
	return d.from.Number.Cmp(d.to.Number) >= 0
}
// Header returns the header the downloader was positioned at before the most
// recent successful call to Next. It is nil when no such position has been
// recorded yet.
func (d *IterativeDownloader) Header() *DBHeader {
	prev := d.previous
	return prev
}
// Next downloads the transfers of the next batch and moves the downloader
// closer to its final block.
//
// The batch starts at the current position and spans batchSize blocks, capped
// at the final block. After the transfers are fetched, the header of the
// batch's upper bound is looked up and becomes the new position; the old
// position is remembered so the step can be undone with Revert.
//
// If either the transfer download or the header lookup fails, the error is
// logged and returned, and the downloader's position is left unchanged.
func (d *IterativeDownloader) Next(parent context.Context) ([]Transfer, error) {
	upper := new(big.Int).Add(d.from.Number, d.batchSize)
	// Never step past the final block.
	if limit := d.to.Number; upper.Cmp(limit) > 0 {
		upper = limit
	}

	batch, err := d.downloader.GetTransfersInRange(parent, d.from.Number, upper)
	if err != nil {
		log.Error("failed to get transfer in between two bloks", "from", d.from.Number, "to", upper, "error", err)
		return nil, err
	}

	// use integers instead of DBHeader
	// The header lookup gets its own 5 second deadline derived from parent.
	ctx, cancel := context.WithTimeout(parent, 5*time.Second)
	header, err := d.client.HeaderByNumber(ctx, upper)
	cancel()
	if err != nil {
		log.Error("failed to get header by number", "from", d.from.Number, "to", upper, "error", err)
		return nil, err
	}

	// Advance: remember the old position for Revert, then move to the new one.
	next := toDBHeader(header)
	d.previous = d.from
	d.from = next
	return batch, nil
}
// Revert undoes the progress of the last step by restoring the position the
// downloader had before it. Intended for callers that failed to process the
// transfers returned by Next, for example because persisting them failed.
// It is a no-op when no earlier position has been recorded.
func (d *IterativeDownloader) Revert() {
	if d.previous == nil {
		// No earlier position recorded; nothing to restore.
		return
	}
	d.from = d.previous
}