forked from lightninglabs/neutrino
-
Notifications
You must be signed in to change notification settings - Fork 5
/
utxoscanner.go
441 lines (361 loc) · 11.7 KB
/
utxoscanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
package neutrino
import (
"container/heap"
"sync"
"sync/atomic"
"time"
"github.com/ltcsuite/ltcd/chaincfg/chainhash"
"github.com/ltcsuite/ltcd/ltcutil"
"github.com/ltcsuite/neutrino/headerfs"
)
// getUtxoResult is a simple pair type holding a spend report and error.
type getUtxoResult struct {
report *SpendReport
err error
}
// GetUtxoRequest is a request to scan for InputWithScript from the height
// BirthHeight.
type GetUtxoRequest struct {
// Input is the target outpoint with script to watch for spentness.
Input *InputWithScript
// BirthHeight is the height at which we expect to find the original
// unspent outpoint. This is also the height used when starting the
// search for spends.
BirthHeight uint32
// resultChan either the spend report or error for this request.
resultChan chan *getUtxoResult
// result caches the first spend report or error returned for this
// request.
result *getUtxoResult
// onProgress is the method to be used by the scanner to report
// its progress. It can be nil if not specified by the caller.
onProgress ScanProgressHandler
// mu ensures the first response delivered via resultChan is in fact
// what gets cached in result.
mu sync.Mutex
quit chan struct{}
}
// deliver tries to deliver the report or error to any subscribers. If
// resultChan cannot accept a new update, this method will not block.
func (r *GetUtxoRequest) deliver(report *SpendReport, err error) {
select {
case r.resultChan <- &getUtxoResult{report, err}:
default:
log.Warnf("duplicate getutxo result delivered for "+
"outpoint=%v, spend=%v, err=%v",
r.Input.OutPoint, report, err)
}
}
// Result is callback returning either a spend report or an error.
func (r *GetUtxoRequest) Result(cancel <-chan struct{}) (*SpendReport, error) {
r.mu.Lock()
defer r.mu.Unlock()
select {
case result := <-r.resultChan:
// Cache the first result returned, in case we have multiple
// readers calling Result.
if r.result == nil {
r.result = result
}
return r.result.report, r.result.err
case <-cancel:
return nil, ErrGetUtxoCancelled
case <-r.quit:
return nil, ErrShuttingDown
}
}
// UtxoScannerConfig exposes configurable methods for interacting with the blockchain.
type UtxoScannerConfig struct {
// BestSnapshot returns the block stamp of the current chain tip.
BestSnapshot func() (*headerfs.BlockStamp, error)
// GetBlockHash returns the block hash at given height in main chain.
GetBlockHash func(height int64) (*chainhash.Hash, error)
// BlockFilterMatches checks the cfilter for the block hash for matches
// against the rescan options.
BlockFilterMatches func(ro *rescanOptions, blockHash *chainhash.Hash) (bool, error)
// GetBlock fetches a block from the p2p network.
GetBlock func(chainhash.Hash, ...QueryOption) (*ltcutil.Block, error)
}
// UtxoScanner batches calls to GetUtxo so that a single scan can search for
// multiple outpoints. If a scan is in progress when a new element is added, we
// check whether it can safely be added to the current batch, if not it will be
// included in the next batch.
type UtxoScanner struct {
started uint32
stopped uint32
cfg *UtxoScannerConfig
pq GetUtxoRequestPQ
nextBatch []*GetUtxoRequest
mu sync.Mutex
cv *sync.Cond
wg sync.WaitGroup
quit chan struct{}
shutdown chan struct{}
}
// NewUtxoScanner creates a new instance of UtxoScanner using the given chain
// interface.
func NewUtxoScanner(cfg *UtxoScannerConfig) *UtxoScanner {
scanner := &UtxoScanner{
cfg: cfg,
quit: make(chan struct{}),
shutdown: make(chan struct{}),
}
scanner.cv = sync.NewCond(&scanner.mu)
return scanner
}
// Start begins running scan batches.
func (s *UtxoScanner) Start() error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return nil
}
s.wg.Add(1)
go s.batchManager()
return nil
}
// Stop any in-progress scan.
func (s *UtxoScanner) Stop() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return nil
}
close(s.quit)
batchShutdown:
for {
select {
case <-s.shutdown:
break batchShutdown
case <-time.After(50 * time.Millisecond):
s.cv.Signal()
}
}
// Cancel all pending get utxo requests that were not pulled into the
// batchManager's main goroutine.
for !s.pq.IsEmpty() {
pendingReq := heap.Pop(&s.pq).(*GetUtxoRequest)
pendingReq.deliver(nil, ErrShuttingDown)
}
return nil
}
// Enqueue takes a GetUtxoRequest and adds it to the next applicable batch.
func (s *UtxoScanner) Enqueue(input *InputWithScript,
birthHeight uint32,
progressHandler ScanProgressHandler) (*GetUtxoRequest, error) {
log.Debugf("Enqueuing request for %s with birth height %d",
input.OutPoint.String(), birthHeight)
req := &GetUtxoRequest{
Input: input,
BirthHeight: birthHeight,
resultChan: make(chan *getUtxoResult, 1),
onProgress: progressHandler,
quit: s.quit,
}
s.cv.L.Lock()
select {
case <-s.quit:
s.cv.L.Unlock()
return nil, ErrShuttingDown
default:
}
// Insert the request into the queue and signal any threads that might be
// waiting for new elements.
heap.Push(&s.pq, req)
s.cv.L.Unlock()
s.cv.Signal()
return req, nil
}
// batchManager is responsible for scheduling batches of UTXOs to scan. Any
// incoming requests whose start height has already been passed will be added to
// the next batch, which gets scheduled after the current batch finishes.
//
// NOTE: This method MUST be spawned as a goroutine.
func (s *UtxoScanner) batchManager() {
defer close(s.shutdown)
for {
s.cv.L.Lock()
// Re-queue previously skipped requests for next batch.
for _, request := range s.nextBatch {
heap.Push(&s.pq, request)
}
s.nextBatch = nil
// Wait for the queue to be non-empty.
for s.pq.IsEmpty() {
s.cv.Wait()
select {
case <-s.quit:
s.cv.L.Unlock()
return
default:
}
}
req := s.pq.Peek()
s.cv.L.Unlock()
// Break out now before starting a scan if a shutdown was
// requested.
select {
case <-s.quit:
return
default:
}
// Initiate a scan, starting from the birth height of the
// least-height request currently in the queue.
err := s.scanFromHeight(req.BirthHeight)
if err != nil {
log.Errorf("utxo scan failed: %v", err)
}
}
}
// dequeueAtHeight returns all GetUtxoRequests that have starting height of the
// given height.
func (s *UtxoScanner) dequeueAtHeight(height uint32) []*GetUtxoRequest {
s.cv.L.Lock()
defer s.cv.L.Unlock()
// Take any requests that are too old to go in this batch and keep them for
// the next batch.
for !s.pq.IsEmpty() && s.pq.Peek().BirthHeight < height {
item := heap.Pop(&s.pq).(*GetUtxoRequest)
s.nextBatch = append(s.nextBatch, item)
}
var requests []*GetUtxoRequest
for !s.pq.IsEmpty() && s.pq.Peek().BirthHeight == height {
item := heap.Pop(&s.pq).(*GetUtxoRequest)
requests = append(requests, item)
}
return requests
}
// scanFromHeight runs a single batch, pulling in any requests that get added
// above the batch's last processed height. If there was an error, then return
// the outstanding requests.
func (s *UtxoScanner) scanFromHeight(initHeight uint32) error {
// Before beginning the scan, grab the best block stamp we know of,
// which will serve as an initial estimate for the end height of the
// scan.
bestStamp, err := s.cfg.BestSnapshot()
if err != nil {
return err
}
var (
// startHeight and endHeight bound the range of the current
// scan. If more blocks are found while a scan is running,
// these values will be updated afterwards to scan for the new
// blocks.
startHeight = initHeight
endHeight = uint32(bestStamp.Height)
)
reporter := newBatchSpendReporter()
options := defaultRescanOptions()
scanToEnd:
// Scan forward through the blockchain and look for any transactions that
// might spend the given UTXOs.
for height := startHeight; height <= endHeight; height++ {
// Before beginning to scan this height, check to see if the
// utxoscanner has been signaled to exit.
select {
case <-s.quit:
return reporter.FailRemaining(ErrShuttingDown)
default:
}
hash, err := s.cfg.GetBlockHash(int64(height))
if err != nil {
return reporter.FailRemaining(err)
}
// If there are any new requests that can safely be added to this batch,
// then try and fetch them.
newReqs := s.dequeueAtHeight(height)
// If an outpoint is created in this block, then fetch it regardless.
// Otherwise check to see if the filter matches any of our watched
// outpoints.
fetch := len(newReqs) > 0
if !fetch {
options.watchList = reporter.filterEntries
match, err := s.cfg.BlockFilterMatches(options, hash)
if err != nil {
return reporter.FailRemaining(err)
}
// If still no match is found, we have no reason to
// fetch this block, and can continue to next height.
if !match {
reporter.NotifyProgress(height)
continue
}
}
// At this point, we've determined that we either (1) have new
// requests which we need the block to scan for originating
// UTXOs, or (2) the watchlist triggered a match against the
// neutrino filter. Before fetching the block, check to see if
// the utxoscanner has been signaled to exit so that we can exit
// the rescan before performing an expensive operation.
select {
case <-s.quit:
return reporter.FailRemaining(ErrShuttingDown)
default:
}
log.Debugf("Fetching block height=%d hash=%s", height, hash)
block, err := s.cfg.GetBlock(*hash)
if err != nil {
return reporter.FailRemaining(err)
}
// Check again to see if the utxoscanner has been signaled to exit.
select {
case <-s.quit:
return reporter.FailRemaining(ErrShuttingDown)
default:
}
log.Debugf("Processing block height=%d hash=%s", height, hash)
reporter.ProcessBlock(block.MsgBlock(), newReqs, height)
reporter.NotifyProgress(height)
}
// We've scanned up to the end height, now perform a check to see if we
// still have any new blocks to process. If this is the first time
// through, we might have a few blocks that were added since the
// scan started.
currStamp, err := s.cfg.BestSnapshot()
if err != nil {
return reporter.FailRemaining(err)
}
// If the returned height is higher, we still have more blocks to go.
// Shift the start and end heights and continue scanning.
if uint32(currStamp.Height) > endHeight {
startHeight = endHeight + 1
endHeight = uint32(currStamp.Height)
goto scanToEnd
}
reporter.NotifyUnspentAndUnfound()
return nil
}
// A GetUtxoRequestPQ implements heap.Interface and holds GetUtxoRequests. The
// queue maintains that heap.Pop() will always return the GetUtxo request with
// the least starting height. This allows us to add new GetUtxo requests to
// an already running batch.
type GetUtxoRequestPQ []*GetUtxoRequest
func (pq GetUtxoRequestPQ) Len() int { return len(pq) }
func (pq GetUtxoRequestPQ) Less(i, j int) bool {
// We want Pop to give us the least BirthHeight.
return pq[i].BirthHeight < pq[j].BirthHeight
}
func (pq GetUtxoRequestPQ) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
// Push is called by the heap.Interface implementation to add an element to the
// end of the backing store. The heap library will then maintain the heap
// invariant.
func (pq *GetUtxoRequestPQ) Push(x interface{}) {
item := x.(*GetUtxoRequest)
*pq = append(*pq, item)
}
// Peek returns the least height element in the queue without removing it.
func (pq *GetUtxoRequestPQ) Peek() *GetUtxoRequest {
return (*pq)[0]
}
// Pop is called by the heap.Interface implementation to remove an element from
// the end of the backing store. The heap library will then maintain the heap
// invariant.
func (pq *GetUtxoRequestPQ) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
// IsEmpty returns true if the queue has no elements.
func (pq *GetUtxoRequestPQ) IsEmpty() bool {
return pq.Len() == 0
}