Skip to content

Commit

Permalink
feat(serializers): Add CloudEvents serializer (influxdata#13224)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed May 5, 2023
1 parent fc4e896 commit 1bcc279
Show file tree
Hide file tree
Showing 28 changed files with 2,082 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins.

1. [InfluxDB Line Protocol](/plugins/serializers/influx)
1. [Carbon2](/plugins/serializers/carbon2)
1. [CloudEvents](/plugins/serializers/cloudevents)
1. [CSV](/plugins/serializers/csv)
1. [Graphite](/plugins/serializers/graphite)
1. [JSON](/plugins/serializers/json)
Expand Down
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ following works:
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
- github.com/cespare/xxhash [MIT License](https://github.com/cespare/xxhash/blob/master/LICENSE.txt)
- github.com/cisco-ie/nx-telemetry-proto [Apache License 2.0](https://github.com/cisco-ie/nx-telemetry-proto/blob/master/LICENSE)
- github.com/cloudevents/sdk-go [Apache License 2.0](https://github.com/cloudevents/sdk-go/blob/main/LICENSE)
- github.com/containerd/containerd [Apache License 2.0](https://github.com/containerd/containerd/blob/master/LICENSE)
- github.com/coocood/freecache [MIT License](https://github.com/coocood/freecache/blob/master/LICENSE)
- github.com/coreos/go-semver [Apache License 2.0](https://github.com/coreos/go-semver/blob/main/LICENSE)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ require (
github.com/riemann/riemann-go-client v0.5.1-0.20211206220514-f58f10cdce16
github.com/robbiet480/go.nut v0.0.0-20220219091450-bd8f121e1fa1
github.com/safchain/ethtool v0.3.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0
github.com/sensu/sensu-go/api/core/v2 v2.16.0
github.com/shirou/gopsutil/v3 v3.23.3
github.com/showwin/speedtest-go v1.5.2
Expand Down Expand Up @@ -261,6 +262,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/containerd/containerd v1.6.18 // indirect
github.com/couchbase/gomemcached v0.1.3 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df h1:GmrltUp5Qf5XhT+LmqMDizsgm/6VHTSxPWRdrq21yRo=
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down Expand Up @@ -1399,6 +1401,8 @@ github.com/safchain/ethtool v0.3.0 h1:gimQJpsI6sc1yIqP/y8GYgiXn/NjgvpM0RNoWLVVmP
github.com/safchain/ethtool v0.3.0/go.mod h1:SA9BwrgyAqNo7M+uaL6IYbxpm5wk3L7Mm6ocLW+CJUs=
github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e h1:CGjiMQ0wMH4wtNWrlj6kiTbkPt2F3rbYnhGX6TWLfco=
github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 h1:uIkTLo0AGRc8l7h5l9r+GcYi9qfVPt6lD4/bhmzfiKo=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
Expand Down Expand Up @@ -1522,6 +1526,7 @@ github.com/ulikunitz/xz v0.5.8/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oW
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.23.5 h1:xbrU7tAYviSpqeR3X4nEFWUdB/uDZ6DE+HxmRU7Xtyw=
github.com/urfave/cli/v2 v2.23.5/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/vapourismo/knx-go v0.0.0-20220829185957-fb5458a5389d h1:BJMc7MNW/p80cCkC46JimNuowOWCnSSW5IHjtUrXzNk=
github.com/vapourismo/knx-go v0.0.0-20220829185957-fb5458a5389d/go.mod h1:43Jz/tjx4Ehy/CmohTtTIM33hIwYm/6ccdpddnK8KVY=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
Expand Down
7 changes: 7 additions & 0 deletions plugins/serializers/all/cloudevents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//go:build !custom || serializers || serializers.cloudevents

package all

import (
_ "github.com/influxdata/telegraf/plugins/serializers/cloudevents" // register plugin
)
64 changes: 64 additions & 0 deletions plugins/serializers/cloudevents/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# CloudEvents Serializer

The `cloudevents` data format outputs metrics as [CloudEvents][CloudEvents] in
[JSON format][JSON Spec]. Currently, versions v1.0 and v0.3 of the specification
are supported with the former being the default.

[CloudEvents]: https://cloudevents.io
[JSON Spec]: https://github.com/cloudevents/spec/blob/v1.0/json-format.md

## Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file
files = ["stdout", "/tmp/metrics.out"]

## Data format to output
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "cloudevents"

## Specification version to use for events
## Currently versions "0.3" and "1.0" are supported.
# cloudevents_version = "1.0"

## Event source specifier
## This allows to overwrite the source header-field with the given value.
# cloudevents_source = "telegraf"

## Tag to use as event source specifier
## This allows to overwrite the source header-field with the value of the
## specified tag. If both 'cloudevents_source' and 'cloudevents_source_tag'
## are set, the this setting will take precedence. In case the specified tag
## value does not exist for a metric, the serializer will fallback to
## 'cloudevents_source'.
# cloudevents_source_tag = ""

## Event-type specifier to overwrite the default value
## By default, events (and event batches) containing a single metric will
## set the event-type to 'com.influxdata.telegraf.metric' while events
## containing a batch of metrics will set the event-type to
## 'com.influxdata.telegraf.metric' (plural).
# cloudevents_event_type = ""

## Set time header of the event
## Supported values are:
## none -- do not set event time
## earliest -- use timestamp of the earliest metric
## latest -- use timestamp of the latest metric
## creation -- use timestamp of event creation
## For events containing only a single metric, earliest and latest are
## equivalent.
# cloudevents_event_time = "latest"

## Batch format of the output when running in batch mode
## If set to 'events' the resulting output will contain a list of events,
## each with a single metric according to the JSON Batch Format of the
## specification. Use 'application/cloudevents-batch+json' for this format.
##
## When set to 'metrics', a single event will be generated containing a list
## of metrics as payload. Use 'application/cloudevents+json' for this format.
# cloudevents_batch_format = "events"
```
199 changes: 199 additions & 0 deletions plugins/serializers/cloudevents/cloudevents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package cloudevents

import (
"encoding/json"
"errors"
"fmt"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/gofrs/uuid/v5"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
)

const (
EventTypeSingle = "com.influxdata.telegraf.metric"
EventTypeBatch = "com.influxdata.telegraf.metrics"
)

type Serializer struct {
Version string `toml:"cloudevents_version"`
Source string `toml:"cloudevents_source"`
SourceTag string `toml:"cloudevents_source_tag"`
EventType string `toml:"cloudevents_event_type"`
EventTime string `toml:"cloudevents_event_time"`
BatchFormat string `toml:"cloudevents_batch_format"`
Log telegraf.Logger `toml:"-"`

idgen uuid.Generator
}

func (s *Serializer) Init() error {
switch s.Version {
case "":
s.Version = event.CloudEventsVersionV1
case event.CloudEventsVersionV03, event.CloudEventsVersionV1:
default:
return errors.New("invalid 'cloudevents_version'")
}

switch s.EventTime {
case "":
s.EventTime = "latest"
case "none", "earliest", "latest", "creation":
default:
return errors.New("invalid 'cloudevents_event_time'")
}

switch s.BatchFormat {
case "":
s.BatchFormat = "events"
case "metrics", "events":
default:
return errors.New("invalid 'cloudevents_batch_format'")
}

if s.Source == "" {
s.Source = "telegraf"
}

s.idgen = uuid.NewGen()

return nil
}

func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
// Create the event that forms the envelop around the metric
evt, err := s.createEvent(m)
if err != nil {
return nil, err
}
return evt.MarshalJSON()
}

func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
switch s.BatchFormat {
case "metrics":
return s.batchMetrics(metrics)
case "events":
return s.batchEvents(metrics)
}
return nil, fmt.Errorf("unexpected batch-format %q", s.BatchFormat)
}

func (s *Serializer) batchMetrics(metrics []telegraf.Metric) ([]byte, error) {
// Determine the necessary information
eventType := EventTypeBatch
if s.EventType != "" {
eventType = s.EventType
}
id, err := s.idgen.NewV1()
if err != nil {
return nil, fmt.Errorf("generating ID failed: %w", err)
}

// Serialize the metrics
var earliest, latest time.Time
data := make([]map[string]interface{}, 0, len(metrics))
for _, m := range metrics {
ts := m.Time()
data = append(data, map[string]interface{}{
"name": m.Name(),
"tags": m.Tags(),
"fields": m.Fields(),
"timestamp": ts.UnixNano(),
})
if ts.Before(earliest) {
earliest = ts
}
if ts.After(latest) {
latest = ts
}
}

// Create the event that forms the envelop around the metric
evt := cloudevents.NewEvent(s.Version)
evt.SetSource(s.Source)
evt.SetID(id.String())
evt.SetType(eventType)
if err := evt.SetData(cloudevents.ApplicationJSON, data); err != nil {
return nil, fmt.Errorf("setting data failed: %w", err)
}
switch s.EventTime {
case "creation":
evt.SetTime(time.Now())
case "earliest":
evt.SetTime(earliest)
case "latest":
evt.SetTime(latest)
}

return json.Marshal(evt)
}

func (s *Serializer) batchEvents(metrics []telegraf.Metric) ([]byte, error) {
events := make([]*cloudevents.Event, 0, len(metrics))
for _, m := range metrics {
e, err := s.createEvent(m)
if err != nil {
s.Log.Errorf("creating event for %v failed: %w", m, err)
continue
}
events = append(events, e)
}
return json.Marshal(events)
}

func (s *Serializer) createEvent(m telegraf.Metric) (*cloudevents.Event, error) {
// Determine the necessary information
source := s.Source
if s.SourceTag != "" {
if v, ok := m.GetTag(s.SourceTag); ok {
source = v
}
}
eventType := EventTypeSingle
if s.EventType != "" {
eventType = s.EventType
}
id, err := s.idgen.NewV1()
if err != nil {
return nil, fmt.Errorf("generating ID failed: %w", err)
}

// Serialize the metric
data := map[string]interface{}{
"name": m.Name(),
"tags": m.Tags(),
"fields": m.Fields(),
"timestamp": m.Time().UnixNano(),
}

// Create the event that forms the envelop around the metric
evt := cloudevents.NewEvent(s.Version)
evt.SetSource(source)
evt.SetID(id.String())
evt.SetType(eventType)
if err := evt.SetData(cloudevents.ApplicationJSON, data); err != nil {
return nil, fmt.Errorf("setting data failed: %w", err)
}
switch s.EventTime {
case "creation":
evt.SetTime(time.Now())
case "earliest", "latest":
evt.SetTime(m.Time())
}

return &evt, nil
}

func init() {
serializers.Add("cloudevents",
func() serializers.Serializer {
return &Serializer{}
},
)
}
Loading

0 comments on commit 1bcc279

Please sign in to comment.