Skip to content

Commit

Permalink
feat(parsers.json): Allow JSONata based transformations in JSON seria…
Browse files Browse the repository at this point in the history
…lizer (#11251)
  • Loading branch information
srebhan authored Jul 21, 2022
1 parent ffb06c2 commit 9f3a741
Show file tree
Hide file tree
Showing 17 changed files with 536 additions and 37 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)

c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
c.getFieldString(tbl, "json_transformation", &sc.Transformation)

c.getFieldBool(tbl, "splunkmetric_hec_routing", &sc.HecRouting)
c.getFieldBool(tbl, "splunkmetric_multimetric", &sc.SplunkmetricMultiMetric)
Expand Down
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ following works:
- github.com/awslabs/kinesis-aggregation/go [Apache License 2.0](https://github.com/awslabs/kinesis-aggregation/blob/master/LICENSE.txt)
- github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE)
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
- github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE)
- github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE)
- github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE)
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.13.6
github.com/aws/smithy-go v1.11.3
github.com/benbjohnson/clock v1.3.0
github.com/blues/jsonata-go v1.5.4
github.com/bmatcuk/doublestar/v3 v3.0.0
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20220628142927-f4160bcb943c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqO
github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI=
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blues/jsonata-go v1.5.4 h1:XCsXaVVMrt4lcpKeJw6mNJHqQpWU751cnHdCFUq3xd8=
github.com/blues/jsonata-go v1.5.4/go.mod h1:uns2jymDrnI7y+UFYCqsRTEiAH22GyHnNXrkupAVFWI=
github.com/bmatcuk/doublestar/v3 v3.0.0 h1:TQtVPlDnAYwcrVNB2JiGuMc++H5qzWZd9PhkNo5WyHI=
github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3vsSYKPTd8AWA0k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/azure_data_explorer/azure_data_explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (adx *AzureDataExplorer) Init() error {
return errors.New("Metrics grouping type is not valid")
}

serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano)
serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "")
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestWrite(t *testing.T) {

for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer, err := telegrafJson.NewSerializer(time.Second, "")
serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
require.NoError(t, err)

plugin := AzureDataExplorer{
Expand Down
16 changes: 7 additions & 9 deletions plugins/outputs/event_hubs/event_hubs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIt
/* End wrapper interface */

func TestInitAndWrite(t *testing.T) {
serializer, _ := json.NewSerializer(time.Second, "")
serializer, err := json.NewSerializer(time.Second, "", "")
require.NoError(t, err)
mockHub := &mockEventHub{}
e := &EventHubs{
Hub: mockHub,
Expand All @@ -52,8 +53,7 @@ func TestInitAndWrite(t *testing.T) {
}

mockHub.On("GetHub", mock.Anything).Return(nil).Once()
err := e.Init()
require.NoError(t, err)
require.NoError(t, e.Init())
mockHub.AssertExpectations(t)

metrics := testutil.MockMetrics()
Expand Down Expand Up @@ -100,8 +100,8 @@ func TestInitAndWriteIntegration(t *testing.T) {
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name

// Configure the plugin to target the newly created hub
serializer, _ := json.NewSerializer(time.Second, "")

serializer, err := json.NewSerializer(time.Second, "", "")
require.NoError(t, err)
e := &EventHubs{
Hub: &eventHub{},
ConnectionString: testHubCS,
Expand All @@ -110,13 +110,11 @@ func TestInitAndWriteIntegration(t *testing.T) {
}

// Verify that we can connect to Event Hubs
err = e.Init()
require.NoError(t, err)
require.NoError(t, e.Init())

// Verify that we can successfully write data to Event Hubs
metrics := testutil.MockMetrics()
err = e.Write(metrics)
require.NoError(t, err)
require.NoError(t, e.Write(metrics))

/*
** Verify we can read data back from the test hub
Expand Down
9 changes: 4 additions & 5 deletions plugins/outputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,12 +640,11 @@ func TestBatchedUnbatched(t *testing.T) {
Method: defaultMethod,
}

var s = map[string]serializers.Serializer{
jsonSerializer, err := json.NewSerializer(time.Second, "", "")
require.NoError(t, err)
s := map[string]serializers.Serializer{
"influx": influx.NewSerializer(),
"json": func(s serializers.Serializer, err error) serializers.Serializer {
require.NoError(t, err)
return s
}(json.NewSerializer(time.Second, "")),
"json": jsonSerializer,
}

for name, serializer := range s {
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/stomp/stomp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestConnectAndWrite(t *testing.T) {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss")
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "")
require.NoError(t, err)
st := &STOMP{
Host: url,
Expand Down
209 changes: 209 additions & 0 deletions plugins/serializers/json/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ The `json` output data format converts metrics into JSON documents.
# layout specification from https://golang.org/pkg/time/#Time.Format
# e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00"
#json_timestamp_format = ""

## A [JSONata](https://jsonata.org/) transformation of the JSON in [standard-form](#examples).
## This allows you to generate an arbitrary output form based on the metric(s). Please use
## multiline strings (starting and ending with three single-quotes) if needed.
#json_transformation = ""
```

## Examples
Expand Down Expand Up @@ -84,3 +89,207 @@ reference the documentation for the specific plugin.
]
}
```

## Transformations

Transformations using the [JSONata standard](https://jsonata.org/) can be specified with
the `json_transformation` parameter. The input to the transformation is the serialized
metric in the standard-form above.

**Note**: There is a difference in batch and non-batch serialization mode!
The former adds a `metrics` field containing the metric array, while the latter
serializes the metric directly.

In the following sections, some rudimentary examples for transformations are shown.
For more elaborate JSONata expressions please consult the
[documentation](https://docs.jsonata.org) or the
[online playground](https://try.jsonata.org).

### Non-batch mode

In the following examples, we will use the following input to the transformation:

```json
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
}
```

If you want to flatten the above metric, you can use

```json
$merge([{"name": name, "timestamp": timestamp}, tags, fields])
```

to get

```json
{
"name": "docker",
"timestamp": 1458229140,
"host": "raynor",
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
}
```

It is also possible to do arithmetic or renaming

```json
{
"capacity": $sum($sift($.fields,function($value,$key){$key~>/^field_/}).*),
"images": fields.n_images,
"host": tags.host,
"time": $fromMillis(timestamp*1000)
}
```

will result in

```json
{
"capacity": 93,
"images": 660,
"host": "raynor",
"time": "2016-03-17T15:39:00.000Z"
}
```

### Batch mode

When an output plugin emits multiple metrics in a batch fashion it might be useful
to restructure and/or combine the metric elements. We will use the following input
example in this section

```json
{
"metrics": [
{
"fields": {
"field_1": 30,
"field_2": 4,
"field_N": 59,
"n_images": 660
},
"name": "docker",
"tags": {
"host": "raynor"
},
"timestamp": 1458229140
},
{
"fields": {
"field_1": 12,
"field_2": 43,
"field_3": 0,
"field_4": 5,
"field_5": 7,
"field_N": 27,
"n_images": 72
},
"name": "docker",
"tags": {
"host": "amaranth"
},
"timestamp": 1458229140
},
{
"fields": {
"field_1": 5,
"field_N": 34,
"n_images": 0
},
"name": "storage",
"tags": {
"host": "amaranth"
},
"timestamp": 1458229140
}
]
}
```

We can do the same computation as above, iterating over the metrics

```json
metrics.{
"capacity": $sum($sift($.fields,function($value,$key){$key~>/^field_/}).*),
"images": fields.n_images,
"service": (name & "(" & tags.host & ")"),
"time": $fromMillis(timestamp*1000)
}

```

resulting in

```json
[
{
"capacity": 93,
"images": 660,
"service": "docker(raynor)",
"time": "2016-03-17T15:39:00.000Z"
},
{
"capacity": 94,
"images": 72,
"service": "docker(amaranth)",
"time": "2016-03-17T15:39:00.000Z"
},
{
"capacity": 39,
"images": 0,
"service": "storage(amaranth)",
"time": "2016-03-17T15:39:00.000Z"
}
]
```

However, the more interesting use-case is to restructure and **combine** the metrics, e.g. by grouping by `host`

```json
{
"time": $min(metrics.timestamp) * 1000 ~> $fromMillis(),
"images": metrics{
tags.host: {
name: fields.n_images
}
},
"capacity alerts": metrics[fields.n_images < 10].[(tags.host & " " & name)]
}
```

resulting in

```json
{
"time": "2016-03-17T15:39:00.000Z",
"images": {
"raynor": {
"docker": 660
},
"amaranth": {
"docker": 72,
"storage": 0
}
},
"capacity alerts": [
"amaranth storage"
]
}
```

Please consult the JSONata documentation for more examples and details.
Loading

0 comments on commit 9f3a741

Please sign in to comment.