-
Notifications
You must be signed in to change notification settings - Fork 0
/
tx_analyzer.go
506 lines (421 loc) · 13.2 KB
/
tx_analyzer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
package dashboard
import (
"flag"
"fmt"
"github.com/btcsuite/btcd/rpcclient"
influxClient "github.com/influxdata/influxdb/client/v2"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
)
// N_WORKERS is the number of concurrent RPC workers; set once in main
// from the -workers flag before any workers are started.
var N_WORKERS int

// N_WORKERS_DEFAULT is the default value for the -workers flag.
const N_WORKERS_DEFAULT = 2

// DB_WAIT_TIME is the minimum time, in seconds, between influxdb batch
// writes — used to throttle writes so the database is not overwhelmed.
const DB_WAIT_TIME = 30
// A Dashboard contains all the components necessary to make RPC calls to bitcoind, and
// to place data into influxdb.
type Dashboard struct {
	client  *rpcclient.Client    // RPC connection to bitcoind (HTTP POST mode).
	iClient influxClient.Client  // HTTP client for the influxdb server.
	bp      influxClient.BatchPoints // Accumulates points until the next batched write.
	DB      string               // Name of the influxdb database written to.
}
// setupDashboard constructs a Dashboard connected to the local bitcoind
// RPC server and the local influxdb instance, with an empty batch of
// points ready for writing. Any connection failure is fatal.
//
// Assumes enviroment variables: DB, DB_USERNAME, DB_PASSWORD, BITCOIND_HOST, BITCOIND_USERNAME, BITCOIND_PASSWORD, are all set.
// influxd and bitcoind should already be started.
func setupDashboard() Dashboard {
	dbName := os.Getenv("DB")

	// Connect to local bitcoin core RPC server using HTTP POST mode.
	// Bitcoin core only supports HTTP POST mode and does not provide TLS
	// by default. The notification handler is nil since notifications are
	// not supported in HTTP POST mode.
	rpcClient, err := rpcclient.New(&rpcclient.ConnConfig{
		Host:         os.Getenv("BITCOIND_HOST"),
		User:         os.Getenv("BITCOIND_USERNAME"),
		Pass:         os.Getenv("BITCOIND_PASSWORD"),
		HTTPPostMode: true,
		DisableTLS:   true,
	}, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Setup influxdb client.
	influx, err := influxClient.NewHTTPClient(influxClient.HTTPConfig{
		Addr:     "http://localhost:8086",
		Username: os.Getenv("DB_USERNAME"),
		Password: os.Getenv("DB_PASSWORD"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Setup influx batchpoints.
	batch, err := influxClient.NewBatchPoints(influxClient.BatchPointsConfig{
		Database: dbName,
	})
	if err != nil {
		log.Fatal(err)
	}

	return Dashboard{
		client:  rpcClient,
		iClient: influx,
		bp:      batch,
		DB:      dbName,
	}
}
// shutdown tears down the Dashboard's connections: it stops the bitcoind
// RPC client and closes the influxdb client.
// NOTE(review): the Close error is ignored — presumably best-effort at exit; confirm.
func (dash *Dashboard) shutdown() {
	dash.client.Shutdown()
	dash.iClient.Close()
}
// main parses command-line flags and dispatches to one of three modes:
// recovery of previously failed workers, a fixed-range analysis, or a
// live analysis that follows the chain tip.
func main() {
	recovery := flag.Bool("recovery", false, "Set to true to start workers on files in ./worker-progress")
	start := flag.Int("start", 0, "Starting blockheight.")
	end := flag.Int("end", 0, "Last blockheight to analyze.")
	workers := flag.Int("workers", N_WORKERS_DEFAULT, "Number of concurrent RPC workers.")
	flag.Parse()

	N_WORKERS = *workers

	if *recovery {
		recoverFromFailure()
	}

	// A positive start and end selects a fixed-range analysis.
	if *start > 0 && *end > 0 {
		analyze(*start, *end)
		return
	}

	// Given no range, follow the chain tip live.
	doLiveAnalysis(*start)
}
// analyze splits up work across N_WORKERS workers, each with their own
// RPC/db clients, covering the block range [start, end).
//
// BUG FIX: workSplit is an integer division, so (end-start) % N_WORKERS
// blocks at the top of the range were previously never assigned to any
// worker. The last worker now runs through `end` to absorb the remainder.
func analyze(start, end int) {
	var wg sync.WaitGroup
	workSplit := (end - start) / N_WORKERS
	log.Println("work split: ", workSplit, end-start, start, end)
	currentDir, err := os.Getwd()
	if err != nil {
		log.Fatal(err)
	}
	// Create the progress directory if it doesn't already exist.
	workerProgressDir := currentDir + "/worker-progress"
	if _, err := os.Stat(workerProgressDir); os.IsNotExist(err) {
		log.Printf("Creating worker progress directory at: %v\n", workerProgressDir)
		err := os.Mkdir(workerProgressDir, 0777)
		if err != nil {
			log.Fatal(err)
		}
	}
	formattedTime := time.Now().Format("01-02:15:04")
	for i := 0; i < N_WORKERS; i++ {
		wg.Add(1)
		go func(i int) {
			// Get name for this worker's progress file.
			workFile := fmt.Sprintf("%v/worker-%v_%v", workerProgressDir, i, formattedTime)
			// The last worker's range extends to `end` so the remainder
			// of the integer division is not silently skipped.
			rangeEnd := start + (workSplit * (i + 1))
			if i == N_WORKERS-1 {
				rangeEnd = end
			}
			analyzeBlockRange(i, start+(workSplit*i), rangeEnd, workFile)
			wg.Done()
		}(i)
	}
	wg.Wait()
}
// analyzeBlockRange analyzes all blocks in the interval [start, end),
// recording progress in workFile so an interrupted run can be resumed
// by recoverFromFailure. Batched points are flushed to influxdb at most
// once every DB_WAIT_TIME seconds, and always before finishing.
//
// BUG FIX: file.Close was previously deferred before the os.Create error
// check, deferring a Close on a possibly-nil file; the error is now
// checked first.
func analyzeBlockRange(workerID, start, end int, workFile string) {
	dash := setupDashboard()
	defer dash.shutdown()
	log.Println(start, end)

	// Keep track of time since last write.
	// If it was less than DB_WAIT_TIME seconds ago, don't write yet.
	// This prevents us from overwhelming influxdb.
	lastWriteTime := time.Now()
	lastWriteTime = lastWriteTime.Add(DB_WAIT_TIME * time.Second)
	startTime := time.Now()

	// Create file to record progress in.
	// If the file already exists, this truncates it which is ok.
	file, err := os.Create(workFile)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Record progress in file.
	progress := fmt.Sprintf("Start=%v\nLast=%v\nEnd=%v", start, start, end)
	_, err = file.WriteAt([]byte(progress), 0)
	if err != nil {
		log.Fatal(err)
	}

	for i := start; i < end; i++ {
		startBlock := time.Now()
		dash.analyzeBlock(int64(i))
		log.Printf("Worker %v: Done with %v blocks total (height=%v) after %v (%v) \n", workerID, i-start+1, i, time.Since(startTime), time.Since(startBlock))

		// Only perform the write to influxDB if there hasn't been a write recently.
		// And make sure to do the write before finishing the range.
		if !time.Now().After(lastWriteTime) && (i != end-1) {
			continue
		}

		// Retry the batched write up to MAX_ATTEMPTS times before giving up.
		writeSuccessful := false
		for attempts := 0; attempts <= MAX_ATTEMPTS; attempts++ {
			err := dash.iClient.Write(dash.bp)
			if err != nil {
				log.Println("DB WRITE ERR: ", err)
				log.Println("Trying DB write again...")
				time.Sleep(1 * time.Second) // Sleep to give DB a break.
				continue
			}
			writeSuccessful = true
			break
		}
		if !writeSuccessful {
			log.Printf("DB write failed!")
			return
		}
		log.Printf("\n\n STORED INTO INFLUXDB \n\n")

		// Setup fresh influx batchpoints for the next batch.
		bp, err := influxClient.NewBatchPoints(influxClient.BatchPointsConfig{
			Database: dash.DB,
		})
		if err != nil {
			log.Fatal("Error creating new batchpoints", err)
		}
		dash.bp = bp
		lastWriteTime = time.Now().Add(DB_WAIT_TIME * time.Second)

		// Record progress in file, overwriting previous record.
		progress = fmt.Sprintf("Start=%v\nLast=%v\nEnd=%v", start, i, end)
		_, err = file.WriteAt([]byte(progress), 0)
		if err != nil {
			log.Fatal("Error writing progress: ", err, progress)
		}
	}

	// Worker finished successfully so its progress record is unneeded.
	err = os.Remove(workFile)
	if err != nil {
		log.Printf("Error removing %v: %v\n", workFile, err)
	}
	log.Printf("Worker %v done analyzing %v blocks (height=%v) after %v\n", workerID, end-start, end, time.Since(startTime))
}
// analyzeBlock uses the getblockstats RPC to compute metrics of a single block.
// It then stores the results in a batchpoint in the Dashboard's influx client.
// Any RPC or point-construction failure is fatal.
func (dash *Dashboard) analyzeBlock(blockHeight int64) {
	// Fetch the stats for this block over RPC.
	statsRes, err := dash.client.GetBlockStats(blockHeight, nil)
	if err != nil {
		log.Fatal(err)
	}
	blockStats := BlockStats{statsRes}

	// Derive influxdb tags and fields from the block stats.
	tags := make(map[string]string)
	fields := make(map[string]interface{})
	blockStats.setInfluxTags(tags, blockHeight)
	blockStats.setInfluxFields(fields)

	// Queue a new point for this block, timestamped with the block's time.
	point, err := influxClient.NewPoint(
		"block_metrics",
		tags,
		fields,
		time.Unix(blockStats.Time, 0),
	)
	if err != nil {
		log.Fatal("Error creating new point", err)
	}
	dash.bp.AddPoint(point)
}
// recoverFromFailure checks the worker-progress directory for any unfinished work from a previous job.
// If there is any, it starts a new worker to continue the work for each previously failed worker.
// At most N_WORKERS recovery workers run concurrently; completion is signaled
// on doneCh and the whole function blocks until every file has been handled.
func recoverFromFailure() {
	log.Println("Starting Recovery Process.")
	currentDir, err := os.Getwd()
	if err != nil {
		log.Fatal(err)
	}
	// If there is no worker-progress directory, then there aren't any failures :)
	workerProgressDir := currentDir + "/worker-progress"
	if _, err := os.Stat(workerProgressDir); os.IsNotExist(err) {
		return
	}
	files, err := ioutil.ReadDir(workerProgressDir)
	if err != nil {
		log.Fatal(err)
	}
	// One wg slot per progress file; each spawned worker calls wg.Done.
	var wg sync.WaitGroup
	wg.Add(len(files))
	// nWorkersBusy tracks in-flight workers; doneCh is buffered so a
	// finishing worker never blocks on its completion signal.
	nWorkersBusy := 0
	doneCh := make(chan struct{}, N_WORKERS)
	i := 0 // index into files, incremented at bottom of loop.
	for i < len(files) {
		// Check if any workers are free (non-blocking receive).
		select {
		case <-doneCh:
			nWorkersBusy--
		default:
		}
		// If all workers are busy, wait and continue.
		// NOTE(review): this is a poll-and-sleep loop rather than a blocking
		// receive — it re-checks doneCh every 250ms.
		if nWorkersBusy >= N_WORKERS {
			time.Sleep(250 * time.Millisecond)
			continue
		}
		// Assign work to a free worker.
		nWorkersBusy++
		file := files[i]
		contentsBytes, err := ioutil.ReadFile(workerProgressDir + "/" + file.Name())
		if err != nil {
			log.Fatal(err)
		}
		contents := string(contentsBytes)
		progress := parseProgress(contents)
		if len(progress) == 3 {
			// Three values (Start, Last, End) mean an interrupted range worker:
			// resume from the last completed height, reusing the same progress file.
			log.Printf("Starting recovery worker %v on range [%v, %v) at height %v\n", i, progress[0], progress[2], progress[1])
			go func(i int, file os.FileInfo) {
				analyzeBlockRange(i, progress[1], progress[2], workerProgressDir+"/"+file.Name())
				doneCh <- struct{}{}
				wg.Done()
			}(i, file)
		} else if len(progress) == 1 {
			// Finish work done during a live analysis.
			log.Printf("Starting recovery worker %v on block %v\n", i, progress[0])
			go func(i int, file os.FileInfo) {
				analyzeBlockLive(int64(progress[0]), workerProgressDir+"/"+file.Name())
				doneCh <- struct{}{}
				wg.Done()
			}(i, file)
		} else {
			// Any other shape means a corrupted progress file; abort.
			log.Fatal("Bad progress given: ", progress)
		}
		i++
	}
	wg.Wait()
	log.Println("Finished with Recovery.")
}
// parseProgress takes in the contents of a worker-progress file and
// returns the integers found on its "key=value" lines, in file order:
// the starting height, the last height completed, and the end height
// for a range worker, or a single height for a live worker. Lines
// without an "=" are skipped; a non-numeric value is fatal.
func parseProgress(contents string) []int {
	var heights []int
	for _, line := range strings.Split(contents, "\n") {
		parts := strings.Split(line, "=")
		if len(parts) < 2 {
			continue
		}
		value, err := strconv.Atoi(parts[1])
		if err != nil {
			log.Fatal(err)
		}
		heights = append(heights, value)
	}
	return heights
}
// doLiveAnalysis does an analysis of blocks as they come in live.
// In order to avoid dealing with re-org in this code-base, it should
// stay at least 6 blocks behind.
// A height of 0 means "start 6 blocks behind the current tip"; any other
// value starts analysis at that height. This function loops forever.
func doLiveAnalysis(height int) {
	log.Println("Starting a live analysis of the blockchain.")
	formattedTime := time.Now().Format("01-02:15:04")
	dash := setupDashboard()
	defer dash.shutdown()
	currentDir, err := os.Getwd()
	if err != nil {
		log.Fatal(err)
	}
	// Create the progress directory if it doesn't already exist.
	workerProgressDir := currentDir + "/worker-progress"
	if _, err := os.Stat(workerProgressDir); os.IsNotExist(err) {
		log.Printf("Creating worker progress directory at: %v\n", workerProgressDir)
		err := os.Mkdir(workerProgressDir, 0777)
		if err != nil {
			log.Fatal(err)
		}
	}
	blockCount, err := dash.client.GetBlockCount()
	if err != nil {
		log.Fatal(err)
	}
	// All live progress for this run is recorded in a single file,
	// overwritten per block by analyzeBlockLive.
	workFile := fmt.Sprintf("%v/live-worker_%v", workerProgressDir, formattedTime)
	var lastAnalysisStarted int64
	if height == 0 {
		lastAnalysisStarted = blockCount - 6
	} else {
		lastAnalysisStarted = int64(height)
	}
	// Within 6 blocks of the tip: poll for new blocks instead of analyzing,
	// to stay clear of potential re-orgs. Otherwise analyze the next height.
	heightInRangeOfTip := (blockCount - lastAnalysisStarted) <= 6
	for {
		if heightInRangeOfTip {
			// Sleep briefly, then refresh the tip height. Note blockCount is
			// only refreshed in this branch — while catching up, the stale
			// count is reused until the 6-block margin is reached again.
			time.Sleep(500 * time.Millisecond)
			blockCount, err = dash.client.GetBlockCount()
			if err != nil {
				log.Fatal(err)
			}
		} else {
			analyzeBlockLive(lastAnalysisStarted, workFile)
			lastAnalysisStarted += 1
		}
		heightInRangeOfTip = (blockCount - lastAnalysisStarted) <= 6
	}
}
// analyzeBlockLive uses the getblockstats RPC to compute metrics of a single
// block and writes the results to influxdb immediately (unlike the batched
// analyzeBlock method). Progress is recorded in workFile beforehand so a
// crash can be recovered by recoverFromFailure, and the file is removed on
// success.
//
// FIXES: the doc comment previously named the wrong function
// ("analyzeBlock"), and file.Close was deferred before the os.Create error
// check — the error is now checked first.
func analyzeBlockLive(blockHeight int64, workFile string) {
	dash := setupDashboard()
	defer dash.shutdown()

	tags := make(map[string]string)        // for influxdb
	fields := make(map[string]interface{}) // for influxdb
	start := time.Now()

	// Create file to record progress in.
	file, err := os.Create(workFile)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Record progress in file.
	progress := fmt.Sprintf("Height=%v", blockHeight)
	_, err = file.WriteAt([]byte(progress), 0)
	if err != nil {
		log.Fatal(err)
	}

	// Use getblockstats RPC and merge results into the metrics struct.
	blockStatsRes, err := dash.client.GetBlockStats(blockHeight, nil)
	if err != nil {
		log.Fatal(err)
	}
	blockStats := BlockStats{blockStatsRes}

	// Set influx tags and fields based off of the block stats computed.
	blockStats.setInfluxTags(tags, blockHeight)
	blockStats.setInfluxFields(fields)

	// Create and add new influxdb point for this block.
	blockTime := time.Unix(blockStats.Time, 0)
	pt, err := influxClient.NewPoint(
		"block_metrics",
		tags,
		fields,
		blockTime,
	)
	if err != nil {
		log.Fatal("Error creating new point", err)
	}
	dash.bp.AddPoint(pt)

	// Try writing the point to influxdb, retrying up to MAX_ATTEMPTS times.
	writeSuccessful := false
	for attempts := 0; attempts <= MAX_ATTEMPTS; attempts++ {
		err := dash.iClient.Write(dash.bp)
		if err != nil {
			log.Println("DB WRITE ERR: ", err)
			log.Println("Trying DB write again...")
			time.Sleep(1 * time.Second) // Sleep to give DB a break.
			continue
		}
		log.Printf("\n\n STORED INTO INFLUXDB \n\n")
		writeSuccessful = true
		break
	}
	if !writeSuccessful {
		log.Printf("DB write failed!")
		return
	}

	// Worker finished successfully so its progress record is unneeded.
	err = os.Remove(workFile)
	if err != nil {
		log.Printf("Error removing %v: %v\n", workFile, err)
	}
	log.Printf("Done with block %v after %v\n", blockHeight, time.Since(start))
}