-
Notifications
You must be signed in to change notification settings - Fork 0
/
exporter.go
151 lines (140 loc) · 6.31 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package main
import (
"context"
"sync"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/prometheus/client_golang/prometheus"
)
const (
// namespace is the first component handed to prometheus.BuildFQName for every
// metric descriptor built in this file, so all exported metric names start
// with "ebs_snapshots".
namespace = "ebs_snapshots"
)
// NewExporter builds an Exporter for the given job.
//
// It first creates an EC2 client from the job's AWS credentials; if that
// fails, the failure is logged and the error is returned with a nil Exporter.
// Otherwise it returns an Exporter wired to the hub, the job and the client,
// with one descriptor per metric: "up" and "version" (named after the job),
// plus the per-snapshot "count", "volume_size" and "start_time" descriptors
// labelled with the job's exported tags.
func (hub *Hub) NewExporter(job *Job) (*Exporter, error) {
	// Named `client` rather than `ec2` so the imported ec2 package is not shadowed.
	client, err := hub.NewEC2Client(&job.AWSCreds)
	if err != nil {
		hub.logger.Errorf("Error initializing EC2 Client")
		return nil, err
	}

	// Health gauge: carries no variable or constant labels.
	upDesc := prometheus.NewDesc(
		prometheus.BuildFQName(namespace, job.Name, "up"),
		"Could the AWS EC2 API be reached.",
		nil,
		nil,
	)

	// Build-info gauge: the single variable label holds the build string.
	versionDesc := prometheus.NewDesc(
		prometheus.BuildFQName(namespace, job.Name, "version"),
		"Version of ebs-snapshot-exporter",
		[]string{"build"},
		nil,
	)

	// Per-snapshot descriptors share the default labels plus the job's exported tags.
	countDesc := constructPromMetric("count", job.Name, "The total number of snapshots", job.ExportedTags)
	volumeSizeDesc := constructPromMetric("volume_size", job.Name, "Size of volume assosicated with the EBS snapshot", job.ExportedTags)
	startTimeDesc := constructPromMetric("start_time", job.Name, "Start Timestamp of EBS Snapshot", job.ExportedTags)

	exporter := &Exporter{
		Mutex:              sync.Mutex{},
		client:             client,
		job:                job,
		hub:                hub,
		up:                 upDesc,
		version:            versionDesc,
		snapshotsCount:     countDesc,
		snapshotVolumeSize: volumeSizeDesc,
		snapshotStartTime:  startTimeDesc,
	}
	return exporter, nil
}
// sendSafeMetric delivers metric on ch unless the collection context ctx has
// finished.
//
// Metrics are gathered from the AWS API, so a collection may outlive the
// context that governs it. This helper never inspects the channel itself; it
// relies solely on ctx to decide whether sending is still appropriate:
//
//   - If ctx is already done, nothing is sent.
//   - Otherwise the send and ctx.Done() are waited on together, so a
//     cancellation that arrives while the send is blocked (for example because
//     nobody is receiving any more) releases the caller instead of leaving it
//     blocked indefinitely.
//
// It returns nil when the metric was handed to ch. When the context finished
// first it logs the dropped metric's descriptor and returns ctx.Err().
func (hub *Hub) sendSafeMetric(ctx context.Context, ch chan<- prometheus.Metric, metric prometheus.Metric) error {
	// Checking ctx.Err() first keeps cancellation strictly preferred over the
	// send: a select with both cases ready would pick one at random.
	if ctx.Err() == nil {
		select {
		case ch <- metric:
			return nil
		case <-ctx.Done():
			// Fall through to the shared "context finished" handling below.
		}
	}

	// Log the descriptor rather than the Metric interface value: *prometheus.Desc
	// implements fmt.Stringer, so the message is readable.
	hub.logger.Errorf("Attempted to send metrics to a closed channel after collection context had finished: %s", metric.Desc())
	return ctx.Err()
}
// Describe implements prometheus.Collector. It sends the descriptor of every
// metric held by the exporter on ch, in a fixed order: version, up, snapshot
// count, snapshot start time, snapshot volume size.
func (p *Exporter) Describe(ch chan<- *prometheus.Desc) {
	descriptors := [...]*prometheus.Desc{
		p.version,
		p.up,
		p.snapshotsCount,
		p.snapshotStartTime,
		p.snapshotVolumeSize,
	}
	for _, desc := range descriptors {
		ch <- desc
	}
}
// Collect implements prometheus.Collector. The registry may invoke it
// concurrently, so each run holds the exporter's mutex from start to finish.
//
// One run fetches snapshots from the EC2 API using the job's filters. On
// failure it logs the error, emits up=0 and stops. On success it emits the
// per-snapshot metrics, then version=1 (labelled with the hub's version) and
// up=1.
func (p *Exporter) Collect(ch chan<- prometheus.Metric) {
	// Context scoped to this collection; it is only cancelled by the deferred
	// call when Collect returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Serialize collections: only one iteration runs at a time.
	p.Lock()
	defer p.Unlock()

	snapshots, fetchErr := p.client.GetSnapshots(p.job.Filters)
	if fetchErr != nil {
		p.hub.logger.Errorf("Error collecting metrics from EC2 API: %v", fetchErr)
		apiDown := prometheus.MustNewConstMetric(p.up, prometheus.GaugeValue, 0)
		// sendSafeMetric logs its own failure; there is nothing further to do here.
		_ = p.hub.sendSafeMetric(ctx, ch, apiDown)
		return
	}

	p.collectSnapshotMetrics(ctx, ch, snapshots)

	// Default metrics, emitted after the snapshot data. Send errors are
	// already logged by sendSafeMetric, so they are deliberately discarded.
	buildInfo := prometheus.MustNewConstMetric(p.version, prometheus.GaugeValue, 1, p.hub.version)
	_ = p.hub.sendSafeMetric(ctx, ch, buildInfo)

	apiUp := prometheus.MustNewConstMetric(p.up, prometheus.GaugeValue, 1)
	_ = p.hub.sendSafeMetric(ctx, ch, apiUp)
}
// collectSnapshotMetrics emits the "volume_size" and "start_time" gauges for
// every snapshot in snaps.
//
// Each metric carries the five default labels (snapshot id, volume id, region,
// progress, state) followed by one label per entry in the job's ExportedTags.
// A snapshot that lacks one of those tags gets an empty string for it, so
// every series has the same label set.
//
// Emission stops as soon as sendSafeMetric reports an error, since that means
// the collection context has finished and later sends would fail the same way.
// A nil snaps, nil snapshot entries and nil tag entries are skipped; nil string
// fields are exported as "", and a metric whose numeric source field is nil is
// not emitted.
func (p *Exporter) collectSnapshotMetrics(ctx context.Context, ch chan<- prometheus.Metric, snaps *ec2.DescribeSnapshotsOutput) {
	if snaps == nil {
		return
	}

	// The descriptors depend only on job configuration, so build them once
	// instead of twice per snapshot. The help strings are kept byte-identical
	// to the ones used in NewExporter so collected metrics stay consistent
	// with the descriptors announced by Describe.
	tagNames := p.job.ExportedTags
	volumeSizeDesc := constructPromMetric("volume_size", p.job.Name, "Size of volume assosicated with the EBS snapshot", tagNames)
	startTimeDesc := constructPromMetric("start_time", p.job.Name, "Start Timestamp of EBS Snapshot", tagNames)

	// The AWS SDK models string fields as pointers; treat nil as empty.
	deref := func(s *string) string {
		if s == nil {
			return ""
		}
		return *s
	}

	for _, snap := range snaps.Snapshots {
		if snap == nil {
			continue
		}

		// Index this snapshot's tags by key for direct lookup below.
		tagValues := make(map[string]string, len(snap.Tags))
		for _, tag := range snap.Tags {
			if tag == nil || tag.Key == nil {
				continue
			}
			tagValues[*tag.Key] = deref(tag.Value)
		}

		// Default label values first, then one value per exported tag in the
		// same order as the label names given to the descriptors above.
		labelValues := make([]string, 0, 5+len(tagNames))
		labelValues = append(labelValues,
			deref(snap.SnapshotId),
			deref(snap.VolumeId),
			p.job.AWSCreds.Region,
			deref(snap.Progress),
			deref(snap.State),
		)
		for _, name := range tagNames {
			// A missing key yields "", which keeps label cardinality constant.
			labelValues = append(labelValues, tagValues[name])
		}

		if snap.VolumeSize != nil {
			sizeMetric := prometheus.MustNewConstMetric(volumeSizeDesc, prometheus.GaugeValue, float64(*snap.VolumeSize), labelValues...)
			if err := p.hub.sendSafeMetric(ctx, ch, sizeMetric); err != nil {
				// Already logged by sendSafeMetric; the context is finished.
				return
			}
		}

		if snap.StartTime != nil {
			startMetric := prometheus.MustNewConstMetric(startTimeDesc, prometheus.GaugeValue, float64(snap.StartTime.Unix()), labelValues...)
			if err := p.hub.sendSafeMetric(ctx, ch, startMetric); err != nil {
				return
			}
		}
	}
}
// constructPromMetric returns an initialized *prometheus.Desc for a
// per-snapshot metric.
//
// The fully-qualified name is built from the package namespace and metricName
// (no subsystem). The variable labels are the five defaults (snapshot_id,
// vol_id, region, progress, state) followed by each entry of additionalLabels
// after it has been passed through replaceWithUnderscores. jobName is attached
// as the constant label "job".
func constructPromMetric(metricName string, jobName string, helpText string, additionalLabels []string) *prometheus.Desc {
	// Labels present on every metric built by this function.
	defaults := [...]string{"snapshot_id", "vol_id", "region", "progress", "state"}

	variableLabels := make([]string, 0, len(defaults)+len(additionalLabels))
	variableLabels = append(variableLabels, defaults[:]...)
	for i := range additionalLabels {
		// replaceWithUnderscores is defined elsewhere; presumably it rewrites
		// characters that are not valid in a Prometheus label name — confirm
		// against its definition.
		variableLabels = append(variableLabels, replaceWithUnderscores(additionalLabels[i]))
	}

	fqName := prometheus.BuildFQName(namespace, "", metricName)
	constLabels := prometheus.Labels{"job": jobName}
	return prometheus.NewDesc(fqName, helpText, variableLabels, constLabels)
}