-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbenchmark.go
134 lines (122 loc) · 2.74 KB
/
benchmark.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Benchmark wraps a Collection and evaluates how
// quickly the collection can iterate over its data
// set while writes are being applied.
type Benchmark struct {
id string
c Collection
mu *sync.RWMutex
wg *sync.WaitGroup
done chan bool
}
// NewBenchmark returns a initialized Benchmark
// with an underlying Collection based on the specified
// id and database path.
func NewBenchmark(id string, path string) (b *Benchmark, err error) {
b = &Benchmark{
id: id,
wg: &sync.WaitGroup{},
done: make(chan bool),
}
switch id {
case "bolt":
b.c, err = NewBoltCollection(path)
case "kv":
b.c, err = NewKVCollection(path)
case "kv-mu":
b.c, err = NewKVCollection(path)
b.mu = &sync.RWMutex{}
case "leveldb":
b.c, err = NewLevelDBCollection(path)
case "noop":
b.c, err = NewNoopCollection()
default:
err = fmt.Errorf("unknown benchmark id: %s", id)
}
return
}
// Wait blocks until the Run method has completed.
func (b *Benchmark) Wait() {
b.wg.Wait()
}
// Run launches two goroutines, one to read row sets
// from ch, writing those rows to the collection, and another
// to wake up every dur interval and poll the collection for
// number of records and the time it took to iterate over
// those records.
func (b *Benchmark) Run(ch chan []*Row, dur time.Duration) {
b.wg.Add(1)
go b.Writer(ch)
go b.Poll(dur)
}
// Writer reads rows from ch and writes them to
// the underlying Collection
func (b *Benchmark) Writer(ch chan []*Row) {
n := int64(0) // row set counter
ns := int64(0) // elapsed time in nanoseconds
var t0, t1 time.Time // time between arrivals
for rows := range ch {
n, t1 = n+1, time.Now()
if n > 1 {
ns += t1.Sub(t0).Nanoseconds()
}
t0 = t1
if b.mu != nil {
b.mu.Lock()
}
err := b.c.Set(rows)
if b.mu != nil {
b.mu.Unlock()
}
if err != nil {
log.Println(err)
return
}
}
b.done <- true
if n > 0 {
log.Printf("%d row sets arrived at an average inter-arrival rate of %s",
n, time.Duration(ns/n))
}
}
// Poll wakes up every dur duration and polls the
// underlying Collection Timer.
func (b *Benchmark) Poll(dur time.Duration) {
defer b.wg.Done()
for {
select {
case _ = <-b.done:
if b.mu != nil {
b.mu.RLock()
}
n, t := b.c.Timing()
if b.mu != nil {
b.mu.RUnlock()
}
ms := t.Nanoseconds() / 1e6
opsms := int64(n) / ms
log.Printf("%s: %d ops in %d ms: %d ops/ms\n",
b.id, n, ms, opsms)
return
default:
time.Sleep(dur)
if b.mu != nil {
b.mu.RLock()
}
n, t := b.c.Timing()
if b.mu != nil {
b.mu.RUnlock()
}
ms := t.Nanoseconds() / 1e6
opsms := int64(n) / ms
log.Printf("%s: %d ops in %d ms: %d ops/ms\n",
b.id, n, ms, opsms)
}
}
}