Add copy fields feat to output splunk plugin (#668)
* Add copy fields feat in output splunk plugin

* Update docs

* Disable govet printf due to false positives
HeadHunter483 committed Sep 9, 2024
1 parent e21ef33 commit cb0b804
Showing 5 changed files with 336 additions and 0 deletions.
46 changes: 46 additions & 0 deletions plugin/README.md
@@ -852,6 +852,52 @@ pipelines:
## splunk
It sends events to splunk.

By default the plugin only stores the original event under the "event" key, according to the Splunk output format.

If other fields are required, field values can be copied from the original event to other fields of the output JSON. Copying directly to the root of the output event, to the "event" field, or to any of its subfields is not allowed.

For example, the timestamp and service name can be copied to provide additional metadata to Splunk:

```yaml
copy_fields:
- from: ts
to: time
- from: service
to: fields.service_name
```

Here the plugin looks up the "ts" and "service" fields in the original event; if they are present,
they are copied into the output JSON at the same level as the "event" key. If a field is not
found in the original event, the plugin does not populate the corresponding field in the output JSON.

In:

```json
{
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
}
```

Out:

```json
{
"event": {
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
},
"time": "1723651045",
"fields": {
"service_name": "some-service"
}
}
```
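A minimal pipeline sketch using this option might look like the following (the pipeline name, input settings, and endpoint/token values are illustrative placeholders, not taken from a real setup):

```yaml
pipelines:
  example_pipeline:
    input:
      type: file
      watching_dir: /var/log/app
    output:
      type: splunk
      endpoint: https://splunk.example.com:8088/services/collector
      copy_fields:
        - from: ts
          to: time
        - from: service
          to: fields.service_name
```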

[More details...](plugin/output/splunk/README.md)
## stdout
It writes events to stdout (also known as console).
46 changes: 46 additions & 0 deletions plugin/output/README.md
@@ -129,6 +129,52 @@ pipelines:
## splunk
It sends events to splunk.
By default the plugin only stores the original event under the "event" key, according to the Splunk output format.
If other fields are required, field values can be copied from the original event to other fields of the output JSON. Copying directly to the root of the output event, to the "event" field, or to any of its subfields is not allowed.
For example, the timestamp and service name can be copied to provide additional metadata to Splunk:
```yaml
copy_fields:
- from: ts
to: time
- from: service
to: fields.service_name
```
Here the plugin looks up the "ts" and "service" fields in the original event; if they are present, they are copied into the output JSON at the same level as the "event" key. If a field is not found in the original event, the plugin does not populate the corresponding field in the output JSON.
In:
```json
{
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
}
```

Out:

```json
{
"event": {
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
},
"time": "1723651045",
"fields": {
"service_name": "some-service"
}
}
```

[More details...](plugin/output/splunk/README.md)
## stdout
It writes events to stdout (also known as console).
56 changes: 56 additions & 0 deletions plugin/output/splunk/README.md
@@ -1,6 +1,52 @@
# splunk HTTP Event Collector output
It sends events to splunk.

By default the plugin only stores the original event under the "event" key, according to the Splunk output format.

If other fields are required, field values can be copied from the original event to other fields of the output JSON. Copying directly to the root of the output event, to the "event" field, or to any of its subfields is not allowed.

For example, the timestamp and service name can be copied to provide additional metadata to Splunk:

```yaml
copy_fields:
- from: ts
to: time
- from: service
to: fields.service_name
```
Here the plugin looks up the "ts" and "service" fields in the original event; if they are present, they are copied into the output JSON at the same level as the "event" key. If a field is not found in the original event, the plugin does not populate the corresponding field in the output JSON.
In:
```json
{
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
}
```

Out:

```json
{
"event": {
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
},
"time": "1723651045",
"fields": {
"service_name": "some-service"
}
}
```

### Config params
**`endpoint`** *`string`* *`required`*

@@ -83,5 +129,15 @@ Multiplier for exponential increase of retention between retries

<br>

**`copy_fields`** *`[]CopyField`*

List of field paths to copy: the value of the `from` field in the original event is copied to the
`to` field in the output JSON. `to` paths are relative to the output JSON, i.e. one level higher
than the original event, since the original event is stored under the "event" key. Nested fields
are supported in both `from` and `to`. The whole original event can be copied, but copying directly
to the output root or to the "event" key (or any of its subkeys) is not allowed.
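As a sketch of the nested-path support, dot notation works on both sides (the field names below are illustrative, not from the plugin's test suite):

```yaml
copy_fields:
  # nested "from" path copied to a nested "to" path
  - from: kubernetes.labels.app
    to: fields.app
```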

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
93 changes: 93 additions & 0 deletions plugin/output/splunk/splunk.go
@@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/ozontech/file.d/cfg"
@@ -22,6 +23,52 @@ import (

/*{ introduction
It sends events to splunk.
By default the plugin only stores the original event under the "event" key, according to the Splunk output format.
If other fields are required, field values can be copied from the original event to other fields of the output JSON. Copying directly to the root of the output event, to the "event" field, or to any of its subfields is not allowed.
For example, the timestamp and service name can be copied to provide additional metadata to Splunk:
```yaml
copy_fields:
- from: ts
to: time
- from: service
to: fields.service_name
```
Here the plugin looks up the "ts" and "service" fields in the original event; if they are present, they are copied into the output JSON at the same level as the "event" key. If a field is not found in the original event, the plugin does not populate the corresponding field in the output JSON.
In:
```json
{
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
}
```
Out:
```json
{
"event": {
"ts":"1723651045",
"service":"some-service",
"message":"something happened"
},
"time": "1723651045",
"fields": {
"service_name": "some-service"
}
}
```
}*/

const (
@@ -55,13 +102,20 @@ func (l gzipCompressionLevel) toFastHTTP() int {
}
}

type copyFieldPaths struct {
fromPath []string
toPath []string
}

type Plugin struct {
config *Config

client *fasthttp.Client
endpoint *fasthttp.URI
authHeader string

copyFieldsPaths []copyFieldPaths

logger *zap.SugaredLogger
controller pipeline.OutputPluginController

@@ -74,6 +128,11 @@ type Plugin struct {
sendErrorMetric *prometheus.CounterVec
}

type CopyField struct {
From string `json:"from"`
To string `json:"to"`
}

// ! config-params
// ^ config-params
type Config struct {
@@ -151,6 +210,15 @@ type Config struct {
// >
// > Multiplier for exponential increase of retention between retries
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

// > @3@4@5@6
// >
// > List of field paths to copy: the value of the `from` field in the original event is copied to the `to` field in the output json.
// > `to` paths are relative to the output json, i.e. one level higher, since the original
// > event is stored under the "event" key. Supports nested fields in both `from` and `to`.
// > Supports copying the whole original event, but does not allow copying directly to the output root
// > or to the "event" key (or any of its subkeys).
CopyFields []CopyField `json:"copy_fields" slice:"true"` // *
}

type data struct {
@@ -176,6 +244,22 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.prepareClient()

// validate and parse copy_fields; invalid entries are logged and skipped
for _, cf := range p.config.CopyFields {
if cf.To == "" {
p.logger.Error("copies to the root are not allowed")
continue
}
if cf.To == "event" || strings.HasPrefix(cf.To, "event.") {
p.logger.Error("copies to the `event` field or any of its subfields are not allowed")
continue
}
cf := copyFieldPaths{
fromPath: cfg.ParseFieldSelector(cf.From),
toPath: cfg.ParseFieldSelector(cf.To),
}
p.copyFieldsPaths = append(p.copyFieldsPaths, cf)
}

batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
@@ -272,7 +356,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
outBuf := data.outBuf[:0]

batch.ForEach(func(event *pipeline.Event) {
// "event" field is necessary, it always contains full event data
root.AddField("event").MutateToNode(event.Root.Node)
// copy data from original event to other fields, like event's "ts" to outbuf's "time"
for _, cf := range p.copyFieldsPaths {
fieldVal := event.Root.Dig(cf.fromPath...)
if fieldVal == nil {
continue
}
pipeline.CreateNestedField(root, cf.toPath).MutateToNode(fieldVal)
}
outBuf = root.Encode(outBuf)
_ = root.DecodeString("{}")
})