Adds 'main' and 'metrics' packages. (#3)
* Adds 'main' and 'metrics' packages.

* Changes in response to comments and requests in PR #3.

* Changes from comments and suggestions in PR #3.

* Fixes a few tests for metrics package.

* Renames struct element intervalSeries to just interval. Renames Prom metric label 'node' to 'machine'.

* Adds two new Prometheus metrics: disco_collect_duration_seconds (a histogram) and disco_collect_errors_total.

* Outputs JSONL instead of indented JSON. Removes the node/machine label from Prom metrics (k8s auto-discovery should add it automatically).

* Tests archive.Write by writing a set of models as JSONL to a file, then reading that file back into new set of models, and then checking for equality between the two.

* Moves collectDuration and collectErrors out of the Metrics struct and makes them package-level variables (sketched after this list). Handles an unhandled error case.
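
For context, the sketch below shows one way the two collection metrics named above could be declared as package-level variables. The metric and variable names come from the commit message; the use of promauto, the help text, and the buckets are assumptions rather than the actual disco/metrics code.

package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Package-level collectors, registered once rather than carried on the
// Metrics struct. Help text and buckets here are illustrative assumptions.
var (
	// collectDuration observes how long each collection run takes.
	collectDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "disco_collect_duration_seconds",
		Help:    "Duration of each metrics collection run.",
		Buckets: prometheus.DefBuckets,
	})
	// collectErrors counts collection runs that ended in an error.
	collectErrors = promauto.NewCounter(prometheus.CounterOpts{
		Name: "disco_collect_errors_total",
		Help: "Total number of errors encountered while collecting metrics.",
	})
)

With that layout, Collect() can time itself into collectDuration and increment collectErrors on failure without callers threading the collectors through the Metrics struct.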
nkinkade authored Jun 18, 2020
1 parent ae57847 commit ca7aacf
Showing 7 changed files with 854 additions and 63 deletions.
4 changes: 2 additions & 2 deletions archive/archive.go
@@ -30,7 +30,7 @@ type Model struct {

// MustMarshalJSON accepts a Model object and returns marshalled JSON.
func MustMarshalJSON(m Model) []byte {
data, err := json.MarshalIndent(m, "", " ")
data, err := json.Marshal(m)
rtx.Must(err, "ERROR: failed to marshal archive.Model to JSON. This should never happen")
return data
}
@@ -42,7 +42,7 @@ func GetPath(start time.Time, end time.Time, hostname string) string {

startTimeStr := start.Format("2006-01-02T15:04:05")
endTimeStr := end.Format("2006-01-02T15:04:05")
archiveName := fmt.Sprintf("%v-to-%v-switch.json", startTimeStr, endTimeStr)
archiveName := fmt.Sprintf("%v-to-%v-switch.jsonl", startTimeStr, endTimeStr)
archivePath := fmt.Sprintf("%v/%v", dirs, archiveName)

return archivePath
108 changes: 59 additions & 49 deletions archive/archive_test.go
@@ -1,63 +1,59 @@
package archive

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"reflect"
"testing"
"time"

"github.com/m-lab/go/rtx"
)

var testModel = Model{
Experiment: "s1-abc0t.measurement-lab.org",
Hostname: "mlab2-abc0t.mlab-sandbox.measurement-lab.org",
Metric: "switch.unicast.uplink.tx",
Samples: []Sample{
Sample{
Timestamp: 1591845348,
CollectStart: 1592344911240000000,
CollectEnd: 1592344912360000000,
Value: 158,
var testModels = []Model{
Model{
Experiment: "s1-abc0t.measurement-lab.org",
Hostname: "mlab2-abc0t.mlab-sandbox.measurement-lab.org",
Metric: "switch.unicast.uplink.tx",
Samples: []Sample{
Sample{
Timestamp: 1591845348,
CollectStart: 1592344911240000000,
CollectEnd: 1592344912360000000,
Value: 158,
},
Sample{
Timestamp: 1591845358,
CollectStart: 1592344911240000000,
CollectEnd: 1592344912360000000,
Value: 132,
},
},
Sample{
Timestamp: 1591845358,
CollectStart: 1592344911240000000,
CollectEnd: 1592344912360000000,
Value: 132,
},
Model{
Experiment: "s1-abc0t.measurement-lab.org",
Hostname: "mlab2-abc0t.mlab-sandbox.measurement-lab.org",
Metric: "switch.octets.local.rx",
Samples: []Sample{
Sample{
Timestamp: 1591845348,
CollectStart: 1592344911240000000,
CollectEnd: 1592344912360000000,
Value: 3256,
},
Sample{
Timestamp: 1591845358,
CollectStart: 1592344911240000000,
CollectEnd: 1592344912360000000,
Value: 2789,
},
},
},
}

var expectedJSON = `{
"experiment": "s1-abc0t.measurement-lab.org",
"hostname": "mlab2-abc0t.mlab-sandbox.measurement-lab.org",
"metric": "switch.unicast.uplink.tx",
"sample": [
{
"timestamp": 1591845348,
"collectstart": 1592344911240000000,
"collectend": 1592344912360000000,
"value": 158
},
{
"timestamp": 1591845358,
"collectstart": 1592344911240000000,
"collectend": 1592344912360000000,
"value": 132
}
]
}`

func Test_MustMarshalJSON(t *testing.T) {
jsonData := MustMarshalJSON(testModel)

if string(jsonData) != expectedJSON {
t.Errorf("The collected JSON data does not match what was expected. Got: %v. Expected: %v", string(jsonData), expectedJSON)
}
}

func Test_GetPath(t *testing.T) {
tests := []struct {
end time.Time
@@ -69,19 +65,19 @@ func Test_GetPath(t *testing.T) {
end: time.Date(2010, 04, 18, 20, 34, 50, 0, time.UTC),
interval: 60,
hostname: "mlab2-abc0t.mlab-sandbox.measurement-lab.org",
expect: "2010/04/18/mlab2-abc0t.mlab-sandbox.measurement-lab.org/2010-04-18T20:33:50-to-2010-04-18T20:34:50-switch.json",
expect: "2010/04/18/mlab2-abc0t.mlab-sandbox.measurement-lab.org/2010-04-18T20:33:50-to-2010-04-18T20:34:50-switch.jsonl",
},
{
end: time.Date(1972, 07, 03, 11, 14, 10, 0, time.UTC),
interval: 600,
hostname: "mlab4-xyz03.mlab-staging.measurement-lab.org",
expect: "1972/07/03/mlab4-xyz03.mlab-staging.measurement-lab.org/1972-07-03T11:04:10-to-1972-07-03T11:14:10-switch.json",
expect: "1972/07/03/mlab4-xyz03.mlab-staging.measurement-lab.org/1972-07-03T11:04:10-to-1972-07-03T11:14:10-switch.jsonl",
},
{
end: time.Date(2020, 06, 11, 18, 18, 30, 0, time.UTC),
interval: 300,
hostname: "mlab1-qrs0t.mlab-sandbox.measurement-lab.org",
expect: "2020/06/11/mlab1-qrs0t.mlab-sandbox.measurement-lab.org/2020-06-11T18:13:30-to-2020-06-11T18:18:30-switch.json",
expect: "2020/06/11/mlab1-qrs0t.mlab-sandbox.measurement-lab.org/2020-06-11T18:13:30-to-2020-06-11T18:18:30-switch.jsonl",
},
}

@@ -113,20 +109,34 @@ func Test_Write(t *testing.T) {
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

jsonData := MustMarshalJSON(testModel)
jsonData := []byte{}
for _, model := range testModels {
jsonData = append(jsonData, MustMarshalJSON(model)...)
jsonData = append(jsonData, '\n')
}

endTime := time.Now()
startTime := endTime.Add(time.Duration(10) * -time.Second)
archivePath := GetPath(startTime, endTime, "mlab2-abc0t.mlab-sandbox.measurement-lab.org")
testArchivePath := fmt.Sprintf("%v/%v", dir, archivePath)

err = Write(testArchivePath, jsonData)
rtx.Must(err, "Failed to Write test jsonData to testARchivePath")

contents, err := ioutil.ReadFile(testArchivePath)
rtx.Must(err, "Could not read test archive file")

if string(contents) != expectedJSON {
t.Error("Contents of written archive file do match expected contents")
readModels := []Model{}
data := bytes.NewReader(contents)
dec := json.NewDecoder(data)
for dec.More() {
model := Model{}
err := dec.Decode(&model)
rtx.Must(err, "Failed to Decode JSON")
readModels = append(readModels, model)
}

if !reflect.DeepEqual(testModels, readModels) {
t.Errorf("Expected testModels:\n%v\nGot readModels:\n%v", testModels, readModels)
}
}
96 changes: 96 additions & 0 deletions disco.go
@@ -0,0 +1,96 @@
package main

import (
"context"
"flag"
"log"
"time"

"github.com/m-lab/disco/config"
"github.com/m-lab/disco/metrics"
"github.com/m-lab/disco/snmp"
"github.com/m-lab/go/flagx"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/rtx"
"github.com/soniah/gosnmp"
)

var (
fCommunity = flag.String("community", "", "The SNMP community string for the switch.")
fHostname = flag.String("hostname", "", "The FQDN of the node.")
fListenAddress = flag.String("listen-address", ":8888", "Address to listen on for telemetry.")
fMetricsFile = flag.String("metrics", "", "Path to YAML file defining metrics to scrape.")
fWriteInterval = flag.Duration("write-interval", 300*time.Second, "Interval to write out JSON files, e.g., 300s, 10m.")
fTarget = flag.String("target", "", "Switch FQDN to scrape metrics from.")
logFatal = log.Fatal
mainCtx, mainCancel = context.WithCancel(context.Background())
)

func main() {
flag.Parse()
rtx.Must(flagx.ArgsFromEnv(flag.CommandLine), "Could not parse env args")

if len(*fCommunity) <= 0 {
log.Fatal("SNMP community string must be passed as arg or env variable.")
}

if len(*fHostname) <= 0 {
log.Fatal("Node's FQDN must be passed as an arg or env variable.")
}

goSNMP := &gosnmp.GoSNMP{
Target: *fTarget,
Port: uint16(161),
Community: *fCommunity,
Version: gosnmp.Version2c,
Timeout: time.Duration(5) * time.Second,
Retries: 1,
}
err := goSNMP.Connect()
rtx.Must(err, "Failed to connect to the SNMP server")

config, err := config.New(*fMetricsFile)
rtx.Must(err, "Could not create new metrics configuration")
client := snmp.New(goSNMP)
metrics := metrics.New(client, config, *fTarget, *fHostname)

promSrv := prometheusx.MustServeMetrics()

go func() {
<-mainCtx.Done()
goSNMP.Conn.Close()
promSrv.Close()
}()

// Start scraping on a clean 10s boundary within a minute. Run in a very
// tight loop to be sure we start things as early in the 10s boundary as
// possible.
for time.Now().Second()%10 != 0 {
time.Sleep(1 * time.Millisecond)
}

writeTicker := time.NewTicker(*fWriteInterval)
metrics.IntervalStart = time.Now()

collectTicker := time.NewTicker(10 * time.Second)
// Tickers wait for the configured duration before their first tick. We want
// Collect() to run immediately, so manually kick off Collect() once
// immediately after the ticker is created.
metrics.Collect(client, config)

for {
select {
case <-mainCtx.Done():
collectTicker.Stop()
writeTicker.Stop()
return
case <-writeTicker.C:
start := metrics.IntervalStart
metrics.IntervalStart = time.Now()
metrics.Write(start, time.Now())
case <-collectTicker.C:
metrics.CollectStart = time.Now()
metrics.Collect(client, config)
}
}
}