storage.go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// Metric holds one metric data point
type Metric struct {
    Job       string
    URL       string
    Timing    string
    Value     float64
    Timestamp time.Time
    Tags      map[string]string
}

// Event holds one monitoring event
type Event struct {
    Name         string
    ServerStatus int
    Timestamp    time.Time
    Tags         map[string]string
}

// Storage holds our active storage backends
type Storage struct {
    Engines           []StorageEngine
    MetricDistributor chan Metric
    EventDistributor  chan Event
}

// StorageEngine holds a backend storage engine's interface as well as
// channels for passing metrics and events to the engine
type StorageEngine struct {
    I              StorageEngineInterface
    M              chan<- Metric
    E              chan<- Event
    AcceptsMetrics bool
    AcceptsEvents  bool
}

// StorageEngineInterface is an interface that provides a few standardized
// methods for various storage backends
type StorageEngineInterface interface {
    sendMetric(Metric) error
    sendEvent(Event) error
    StartStorageEngine(context.Context, *sync.WaitGroup) (chan<- Metric, chan<- Event)
}
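
// The sketch below is illustrative only and is not one of the project's real
// backends: it shows the minimal shape a StorageEngineInterface implementation
// takes. The name nullStorage and the channel buffer sizes are assumptions
// made for the example; the real backends (Graphite, Dogstatsd, Prometheus,
// etc.) live in their own files.
type nullStorage struct{}

// sendMetric would deliver a single metric to the backend; here it discards it.
func (n nullStorage) sendMetric(m Metric) error { return nil }

// sendEvent would deliver a single event to the backend; here it discards it.
func (n nullStorage) sendEvent(e Event) error { return nil }

// StartStorageEngine returns the channels that Storage will write to and
// starts a goroutine that drains them until the context is cancelled, which
// is the contract the storage distributor relies on.
func (n nullStorage) StartStorageEngine(ctx context.Context, wg *sync.WaitGroup) (chan<- Metric, chan<- Event) {
    metricChan := make(chan Metric, 10)
    eventChan := make(chan Event, 10)

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case m := <-metricChan:
                _ = n.sendMetric(m)
            case e := <-eventChan:
                _ = n.sendEvent(e)
            case <-ctx.Done():
                return
            }
        }
    }()

    return metricChan, eventChan
}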

// NewStorage creates a Storage object, populated with all configured
// StorageEngines
func NewStorage(ctx context.Context, wg *sync.WaitGroup, c ServiceConfig) (*Storage, error) {
    var err error

    s := Storage{}

    // Initialize our channel for passing metrics to the storage distributor
    s.MetricDistributor = make(chan Metric, 20)

    // Initialize our channel for passing events to the storage distributor
    s.EventDistributor = make(chan Event, 20)

    // Check the configuration file for various supported storage backends
    // and enable them if found
    if c.Storage.Graphite.Host != "" {
        err = s.AddEngine(ctx, wg, "graphite", c)
        if err != nil {
            return &s, fmt.Errorf("could not add Graphite storage backend: %v", err)
        }
    }

    if c.Storage.Dogstatsd.Host != "" {
        err = s.AddEngine(ctx, wg, "dogstatsd", c)
        if err != nil {
            return &s, fmt.Errorf("could not add dogstatsd storage backend: %v", err)
        }
    }

    if c.Storage.Prometheus.ListenAddr != "" {
        err = s.AddEngine(ctx, wg, "prometheus", c)
        if err != nil {
            return &s, fmt.Errorf("could not start Prometheus storage backend: %v", err)
        }
    }

    if c.Storage.Riemann.Host != "" {
        err = s.AddEngine(ctx, wg, "riemann", c)
        if err != nil {
            return &s, fmt.Errorf("could not start Riemann storage backend: %v", err)
        }
    }

    if c.Storage.InfluxDB.Host != "" {
        err = s.AddEngine(ctx, wg, "influxdb", c)
        if err != nil {
            return &s, fmt.Errorf("could not start InfluxDB storage backend: %v", err)
        }
    }

    if c.Storage.Log.File != "" {
        err = s.AddEngine(ctx, wg, "log", c)
        if err != nil {
            return &s, fmt.Errorf("could not start Log storage backend: %v", err)
        }
    }

    if c.Storage.PagerDuty.RoutingKey != "" {
        err = s.AddEngine(ctx, wg, "pagerduty", c)
        if err != nil {
            return &s, fmt.Errorf("could not start PagerDuty storage backend: %v", err)
        }
    }

    if c.Storage.SplunkHec.HecURL != "" {
        err = s.AddEngine(ctx, wg, "splunk-hec", c)
        if err != nil {
            return &s, fmt.Errorf("could not start Splunk HEC storage backend: %v", err)
        }
    }

    // Start our storage distributor to distribute received metrics and events
    // to storage backends. We register with the WaitGroup here, before the
    // goroutine is launched, so a caller's wg.Wait() cannot race past it.
    wg.Add(1)
    go s.storageDistributor(ctx, wg)

    return &s, nil
}

// AddEngine adds a new StorageEngine of name engineName to our Storage object
func (s *Storage) AddEngine(ctx context.Context, wg *sync.WaitGroup, engineName string, c ServiceConfig) error {
    var err error

    switch engineName {
    case "graphite":
        se := StorageEngine{}
        se.I = NewGraphiteStorage(c)
        se.AcceptsEvents = false
        se.AcceptsMetrics = true
        se.M, se.E = se.I.StartStorageEngine(ctx, wg)
        s.Engines = append(s.Engines, se)
    case "dogstatsd":
        se := StorageEngine{}
        se.I = NewDogstatsdStorage(c)
        se.AcceptsEvents = true
        se.AcceptsMetrics = true
        se.M, se.E = se.I.StartStorageEngine(ctx, wg)
        s.Engines = append(s.Engines, se)
    case "prometheus":
        se := StorageEngine{}
        se.I = NewPrometheusStorage(c)
        se.AcceptsEvents = false
        se.AcceptsMetrics = true
        se.M, se.E = se.I.StartStorageEngine(ctx, wg)
        s.Engines = append(s.Engines, se)
    case "riemann":
        se := StorageEngine{}
        se.I, err = NewRiemannStorage(c)
        if err != nil {
            log.Fatalln("Could not connect to Riemann storage backend:", err)
        }
        se.AcceptsEvents = true
        se.AcceptsMetrics = true
        se.M, se.E = se.I.StartStorageEngine(ctx, wg)
        s.Engines = append(s.Engines, se)
    case "influxdb":
        se := StorageEngine{}
        se.I = NewInfluxDBStorage(c)
        se.AcceptsEvents = false
        se.AcceptsMetrics = true
        se.M, se.E = se.I.StartStorageEngine(ctx, wg)
        s.Engines = append(s.Engines, se)
    case "log":
        se := StorageEngine{}
        se.I, err = NewLogStorage(c)
        if err != nil {
            return err
        }
        se.AcceptsEvents = true
        se.AcceptsMetrics = true
        se.M, se.E = se.I.StartStorageEngine(ctx, wg)
        s.Engines = append(s.Engines, se)
case "pagerduty":
se := StorageEngine{}
se.I, err = NewPagerDutyStorage(c)
if err != nil {
log.Fatalln("Could not start PagerDuty storage backend")
}
se.AcceptsEvents = true
se.AcceptsMetrics = false
se.M, se.E = se.I.StartStorageEngine(ctx, wg)
s.Engines = append(s.Engines, se)
case "splunk-hec":
se := StorageEngine{}
se.I, err = NewSplunkHecStorage(c)
if err != nil {
log.Fatalln("Could not start Splunk HEC storage backend")
}
se.AcceptsEvents = true
se.AcceptsMetrics = true
se.M, se.E = se.I.StartStorageEngine(ctx, wg)
s.Engines = append(s.Engines, se)
}
return nil
}

// storageDistributor receives metrics and events from the gatherers and fans
// them out to the various storage backends
func (s *Storage) storageDistributor(ctx context.Context, wg *sync.WaitGroup) error {
    defer wg.Done()

    for {
        select {
        case e := <-s.EventDistributor:
            for _, en := range s.Engines {
                // We only forward events onward if the engine supports events
                if en.AcceptsEvents {
                    en.E <- e
                }
            }
        case m := <-s.MetricDistributor:
            for _, en := range s.Engines {
                // We only forward metrics onward if the engine supports metrics
                if en.AcceptsMetrics {
                    en.M <- m
                }
            }
        case <-ctx.Done():
            log.Println("Cancellation request received. Cancelling storage distributor.")
            return nil
        }
    }
}
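
// Example usage (an illustrative sketch only, not taken from this repository's
// main package): "cfg" stands in for a ServiceConfig loaded elsewhere, and the
// metric field values are placeholders.
//
//    ctx, cancel := context.WithCancel(context.Background())
//    defer cancel()
//
//    var wg sync.WaitGroup
//    storage, err := NewStorage(ctx, &wg, cfg)
//    if err != nil {
//        log.Fatalln("could not initialize storage:", err)
//    }
//
//    // Gatherers hand results to the distributor, which fans them out to
//    // every backend that accepts that data type.
//    storage.MetricDistributor <- Metric{
//        Job:       "http-check",
//        URL:       "https://example.com/",
//        Timing:    "response_time",
//        Value:     123.4,
//        Timestamp: time.Now(),
//    }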