-
Notifications
You must be signed in to change notification settings - Fork 3
/
validator.go
125 lines (96 loc) · 3.9 KB
/
validator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"context"
"crypto/sha256"
"fmt"
"log"
"math/rand"
"sync"
"time"
dht "github.com/libp2p/go-libp2p-kad-dht"
)
func StartValidatorSampling(blockID int, blockDimension int, parcelSize int, s *Service, ctx context.Context, stats *Stats, dht *dht.IpfsDHT) {
startTime := time.Now()
samplesPerRow := blockDimension
rowColParcelsNeededCount := samplesPerRow / parcelSize
if samplesPerRow%parcelSize != 0 {
rowColParcelsNeededCount++
}
// randomParcelsNeededCount := 75
// allParcels := SplitSamplesIntoParcels(blockDimension, parcelSize, "all")
rowParcels := SplitSamplesIntoParcels(blockDimension, parcelSize, "row")
colParcels := SplitSamplesIntoParcels(blockDimension, parcelSize, "col")
randomRowParcels := pickRandomParcels(rowParcels, rowColParcelsNeededCount)
randomColParcels := pickRandomParcels(colParcels, rowColParcelsNeededCount)
// randomParcels := pickRandomParcels(allParcels, randomParcelsNeededCount)
allRandomParcels := append(randomRowParcels, randomColParcels...)
// allRandomParcels = append(allRandomParcels, randomParcels...)
// Randomize allRandomParcels
rand.Shuffle(len(allRandomParcels), func(i, j int) {
allRandomParcels[i], allRandomParcels[j] = allRandomParcels[j], allRandomParcels[i]
})
log.Printf(
"[V - %s] Sampling %d parcels (%d/%d Rows, %d/%d Cols) for Block %d...\n",
s.host.ID().String()[0:5],
len(allRandomParcels),
len(randomRowParcels),
rowColParcelsNeededCount,
len(randomColParcels),
rowColParcelsNeededCount,
blockID,
)
sampledParcelIDs := make([]int, 0)
var parcelWg sync.WaitGroup
for _, parcel := range allRandomParcels {
parcelWg.Add(1)
go func(p Parcel) {
defer parcelWg.Done()
parcelType := "col"
if p.IsRow {
parcelType = "row"
}
startTime := time.Now()
for !contains(sampledParcelIDs, p.StartingIndex) {
returnedPayload, err := dht.GetValue(
ctx,
"/das/sample/"+fmt.Sprint(blockID)+"/"+parcelType+"/"+fmt.Sprint(p.StartingIndex),
)
getLatency := time.Since(startTime)
getTimestamp := time.Now()
keyHash := sha256.Sum256([]byte("/das/sample/" + fmt.Sprint(blockID) + "/" + parcelType + "/" + fmt.Sprint(p.StartingIndex)))
keyHashString := fmt.Sprintf("%x", keyHash)
if err != nil {
// log.Printf("[V - %s] Failed to get parcel %d: %s\n", s.host.ID()[0:5].Pretty(), p.StartingIndex, err.Error())
parcelStatus := "fail"
if err.Error() == "context deadline exceeded" {
parcelStatus = "timeout"
}
stats.GetLatencies = append(stats.GetLatencies, getLatency)
stats.GetHops = append(stats.GetHops, 0)
stats.GetTimestamps = append(stats.GetTimestamps, getTimestamp)
stats.BlockIDs = append(stats.BlockIDs, fmt.Sprint(blockID))
stats.ParcelKeyHashes = append(stats.ParcelKeyHashes, keyHashString)
stats.ParcelStatuses = append(stats.ParcelStatuses, parcelStatus)
stats.ParcelDataLengths = append(stats.ParcelDataLengths, len(returnedPayload))
stats.TotalFailedGets += 1
stats.TotalGetMessages += 1
time.Sleep(1000 * time.Millisecond)
} else {
stats.GetLatencies = append(stats.GetLatencies, getLatency)
stats.GetHops = append(stats.GetHops, 0)
stats.GetTimestamps = append(stats.GetTimestamps, getTimestamp)
stats.BlockIDs = append(stats.BlockIDs, fmt.Sprint(blockID))
stats.ParcelKeyHashes = append(stats.ParcelKeyHashes, keyHashString)
stats.ParcelStatuses = append(stats.ParcelStatuses, "success")
stats.ParcelDataLengths = append(stats.ParcelDataLengths, len(returnedPayload))
stats.TotalGetMessages += 1
stats.TotalSuccessGets += 1
sampledParcelIDs = append(sampledParcelIDs, p.StartingIndex)
}
}
}(parcel)
}
parcelWg.Wait()
stats.TotalSamplingLatencies = append(stats.TotalSamplingLatencies, time.Since(startTime))
log.Printf("[V - %s] Block %d sampling took %.2f seconds.\n", s.host.ID().String()[0:5], blockID, time.Since(startTime).Seconds())
}