diff --git a/plugin/README.md b/plugin/README.md
index e729888ee..b139ef470 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -852,6 +852,52 @@ pipelines:
 ## splunk
 It sends events to splunk.
 
+By default it stores only the original event under the "event" key, according to the Splunk output format.
+
+If other fields are required, it is possible to copy field values from the original event to other
+fields of the output JSON. Copying directly to the root of the output JSON, to the "event" field,
+or to any of its subfields is not allowed.
+
+For example, a timestamp and a service name can be copied to provide additional metadata to Splunk:
+
+```yaml
+copy_fields:
+  - from: ts
+    to: time
+  - from: service
+    to: fields.service_name
+```
+
+Here the plugin looks up the "ts" and "service" fields in the original event and, if they are present,
+copies them into the output JSON at the same level as the "event" key. If a field is not found in the
+original event, the plugin does not populate the corresponding field in the output JSON.
+
+In:
+
+```json
+{
+  "ts":"1723651045",
+  "service":"some-service",
+  "message":"something happened"
+}
+```
+
+Out:
+
+```json
+{
+  "event": {
+    "ts":"1723651045",
+    "service":"some-service",
+    "message":"something happened"
+  },
+  "time": "1723651045",
+  "fields": {
+    "service_name": "some-service"
+  }
+}
+```
+
 [More details...](plugin/output/splunk/README.md)
 ## stdout
 It writes events to stdout(also known as console).
diff --git a/plugin/output/README.md b/plugin/output/README.md
index 9b05cc4ef..cf573edb1 100755
--- a/plugin/output/README.md
+++ b/plugin/output/README.md
@@ -129,6 +129,52 @@ pipelines:
 ## splunk
 It sends events to splunk.
 
+By default it stores only the original event under the "event" key, according to the Splunk output format.
+
+If other fields are required, it is possible to copy field values from the original event to other
+fields of the output JSON. Copying directly to the root of the output JSON, to the "event" field,
+or to any of its subfields is not allowed.
+
+For example, a timestamp and a service name can be copied to provide additional metadata to Splunk:
+
+```yaml
+copy_fields:
+  - from: ts
+    to: time
+  - from: service
+    to: fields.service_name
+```
+
+Here the plugin looks up the "ts" and "service" fields in the original event and, if they are present,
+copies them into the output JSON at the same level as the "event" key. If a field is not found in the
+original event, the plugin does not populate the corresponding field in the output JSON.
+
+In:
+
+```json
+{
+  "ts":"1723651045",
+  "service":"some-service",
+  "message":"something happened"
+}
+```
+
+Out:
+
+```json
+{
+  "event": {
+    "ts":"1723651045",
+    "service":"some-service",
+    "message":"something happened"
+  },
+  "time": "1723651045",
+  "fields": {
+    "service_name": "some-service"
+  }
+}
+```
+
 [More details...](plugin/output/splunk/README.md)
 ## stdout
 It writes events to stdout(also known as console).
diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md
index e20f6e57c..8883a90d9 100755
--- a/plugin/output/splunk/README.md
+++ b/plugin/output/splunk/README.md
@@ -1,6 +1,52 @@
 # splunk HTTP Event Collector output
 It sends events to splunk.
+By default it stores only the original event under the "event" key, according to the Splunk output format.
+
+If other fields are required, it is possible to copy field values from the original event to other
+fields of the output JSON. Copying directly to the root of the output JSON, to the "event" field,
+or to any of its subfields is not allowed.
+
+For example, a timestamp and a service name can be copied to provide additional metadata to Splunk:
+
+```yaml
+copy_fields:
+  - from: ts
+    to: time
+  - from: service
+    to: fields.service_name
+```
+
+Here the plugin looks up the "ts" and "service" fields in the original event and, if they are present,
+copies them into the output JSON at the same level as the "event" key. If a field is not found in the
+original event, the plugin does not populate the corresponding field in the output JSON.
+
+In:
+
+```json
+{
+  "ts":"1723651045",
+  "service":"some-service",
+  "message":"something happened"
+}
+```
+
+Out:
+
+```json
+{
+  "event": {
+    "ts":"1723651045",
+    "service":"some-service",
+    "message":"something happened"
+  },
+  "time": "1723651045",
+  "fields": {
+    "service_name": "some-service"
+  }
+}
+```
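+
+As a sketch, a pipeline config using these settings might look like this (the pipeline name and the
+endpoint value are placeholders, and the input section is omitted):
+
+```yaml
+pipelines:
+  example_pipeline:
+    output:
+      type: splunk
+      endpoint: http://splunk-instance:8088/services/collector
+      copy_fields:
+        - from: ts
+          to: time
+        - from: service
+          to: fields.service_name
+```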
+
 ### Config params
 
 **`endpoint`** *`string`* *`required`* 
 
@@ -83,5 +129,15 @@ Multiplier for exponential increase of retention between retries
+**`copy_fields`** *`[]CopyField`* 
+
+A list of copy operations: each copies the `from` field of the original event to the `to` field of the
+output JSON. The `to` paths are relative to the output JSON root, i.e. one level higher than the original
+event, since the event is stored under the "event" key. Nested fields are supported in both `from` and
+`to`. Copying the whole original event is supported, but copying directly to the output root or to the
+"event" key (or any of its subkeys) is not allowed.
+
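+For instance, the whole original event can be duplicated into a separate "copy" key (a sketch; an empty
+`from` is assumed to select the event root, which is how the plugin's tests exercise this case):
+
+```yaml
+copy_fields:
+  - from: ""
+    to: copy
+```
+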
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go
index b8f34a2c3..980952e8c 100644
--- a/plugin/output/splunk/splunk.go
+++ b/plugin/output/splunk/splunk.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/ozontech/file.d/cfg"
@@ -22,6 +23,52 @@ import (
 /*{ introduction
 It sends events to splunk.
+
+By default it stores only the original event under the "event" key, according to the Splunk output format.
+
+If other fields are required, it is possible to copy field values from the original event to other
+fields of the output JSON. Copying directly to the root of the output JSON, to the "event" field,
+or to any of its subfields is not allowed.
+
+For example, a timestamp and a service name can be copied to provide additional metadata to Splunk:
+
+```yaml
+copy_fields:
+  - from: ts
+    to: time
+  - from: service
+    to: fields.service_name
+```
+
+Here the plugin looks up the "ts" and "service" fields in the original event and, if they are present,
+copies them into the output JSON at the same level as the "event" key. If a field is not found in the
+original event, the plugin does not populate the corresponding field in the output JSON.
+
+In:
+
+```json
+{
+  "ts":"1723651045",
+  "service":"some-service",
+  "message":"something happened"
+}
+```
+
+Out:
+
+```json
+{
+  "event": {
+    "ts":"1723651045",
+    "service":"some-service",
+    "message":"something happened"
+  },
+  "time": "1723651045",
+  "fields": {
+    "service_name": "some-service"
+  }
+}
+```
 }*/
 
 const (
@@ -55,6 +102,11 @@ func (l gzipCompressionLevel) toFastHTTP() int {
 	}
 }
 
+type copyFieldPaths struct {
+	fromPath []string
+	toPath   []string
+}
+
 type Plugin struct {
 	config *Config
 
@@ -62,6 +114,8 @@ type Plugin struct {
 	endpoint   *fasthttp.URI
 	authHeader string
 
+	copyFieldsPaths []copyFieldPaths
+
 	logger     *zap.SugaredLogger
 	controller pipeline.OutputPluginController
 
@@ -74,6 +128,11 @@ type Plugin struct {
 	sendErrorMetric *prometheus.CounterVec
 }
 
+type CopyField struct {
+	From string `json:"from"`
+	To   string `json:"to"`
+}
+
 // ! config-params
 // ^ config-params
 type Config struct {
@@ -151,6 +210,15 @@ type Config struct {
 	// >
 	// > Multiplier for exponential increase of retention between retries
 	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
+	// > @3@4@5@6
+	// >
+	// > A list of copy operations: each copies the `from` field of the original event to the `to` field
+	// > of the output JSON. The `to` paths are relative to the output JSON root, i.e. one level higher
+	// > than the original event, since the event is stored under the "event" key. Nested fields are
+	// > supported in both `from` and `to`. Copying the whole original event is supported, but copying
+	// > directly to the output root or to the "event" key (or any of its subkeys) is not allowed.
+	CopyFields []CopyField `json:"copy_fields" slice:"true"` // *
 }
 
 type data struct {
@@ -176,6 +244,22 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
 	p.registerMetrics(params.MetricCtl)
 	p.prepareClient()
 
+	for _, cf := range p.config.CopyFields {
+		if cf.To == "" {
+			p.logger.Error("copies to the root are not allowed")
+			continue
+		}
+		if cf.To == "event" || strings.HasPrefix(cf.To, "event.") {
+			p.logger.Error("copies to the `event` field or any of its subfields are not allowed")
+			continue
+		}
+		paths := copyFieldPaths{
+			fromPath: cfg.ParseFieldSelector(cf.From),
+			toPath:   cfg.ParseFieldSelector(cf.To),
+		}
+		p.copyFieldsPaths = append(p.copyFieldsPaths, paths)
+	}
+
 	batcherOpts := pipeline.BatcherOptions{
 		PipelineName: params.PipelineName,
 		OutputType:   outPluginType,
@@ -272,7 +356,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
 	outBuf := data.outBuf[:0]
 
 	batch.ForEach(func(event *pipeline.Event) {
+		// the "event" field is mandatory and always contains the full original event
 		root.AddField("event").MutateToNode(event.Root.Node)
+		// copy values from the original event into the output JSON, e.g. the event's "ts" into the output's "time"
+		for _, cf := range p.copyFieldsPaths {
+			fieldVal := event.Root.Dig(cf.fromPath...)
+			if fieldVal == nil {
+				continue
+			}
+			pipeline.CreateNestedField(root, cf.toPath).MutateToNode(fieldVal)
+		}
 		outBuf = root.Encode(outBuf)
 		_ = root.DecodeString("{}")
 	})
diff --git a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go
index b8d13684f..a40e0f6bb 100644
--- a/plugin/output/splunk/splunk_test.go
+++ b/plugin/output/splunk/splunk_test.go
@@ -6,6 +6,7 @@ import (
 	"net/http/httptest"
 	"testing"
 
+	"github.com/ozontech/file.d/cfg"
 	"github.com/ozontech/file.d/pipeline"
 	"github.com/stretchr/testify/assert"
 	"github.com/valyala/fasthttp"
@@ -184,3 +185,97 @@ func TestParseSplunkError(t *testing.T) {
 		})
 	}
 }
+
+func TestCopyFields(t *testing.T) {
+	suites := []struct {
+		name       string
+		copyFields []copyFieldPaths
+		input      string
+		expected   string
+	}{
+		{
+			`no_copy_fields`,
+			nil,
+			`{"msg":"AAAA","some_field":"BBBB"}`,
+			`{"event":{"msg":"AAAA","some_field":"BBBB"}}`,
+		},
+		{
+			`copy_absent_field`,
+			[]copyFieldPaths{{cfg.ParseFieldSelector("ts"), cfg.ParseFieldSelector("time")}},
+			`{"msg":"AAAA","some_field":"BBBB"}`,
+			`{"event":{"msg":"AAAA","some_field":"BBBB"}}`,
+		},
+		{
+			`copy_single_non_nested_field`,
+			[]copyFieldPaths{{cfg.ParseFieldSelector("ts"), cfg.ParseFieldSelector("time")}},
+			`{"msg":"AAAA","some_field":"BBBB","ts":"1723651045"}`,
+			`{"event":{"msg":"AAAA","some_field":"BBBB","ts":"1723651045"},"time":"1723651045"}`,
+		},
+		{
+			`copy_single_field_to_nested_field`,
+			[]copyFieldPaths{{cfg.ParseFieldSelector("service"), cfg.ParseFieldSelector("fields.service_name")}},
+			`{"msg":"AAAA","some_field":"BBBB","service":"test-svc"}`,
+			`{"event":{"msg":"AAAA","some_field":"BBBB","service":"test-svc"},"fields":{"service_name":"test-svc"}}`,
+		},
+		{
+			`copy_two_fields`,
+			[]copyFieldPaths{
+				{cfg.ParseFieldSelector("ts"), cfg.ParseFieldSelector("time")},
+				{cfg.ParseFieldSelector("service"), cfg.ParseFieldSelector("fields.service_name")},
+			},
+			`{"msg":"AAAA","some_field":"BBBB","ts":"1723651045","service":"test-svc"}`,
+			`{"event":{"msg":"AAAA","some_field":"BBBB","ts":"1723651045","service":"test-svc"},"time":"1723651045","fields":{"service_name":"test-svc"}}`,
+		},
+		{
+			`copy_root`,
+			[]copyFieldPaths{
+				{cfg.ParseFieldSelector(""), cfg.ParseFieldSelector("copy")},
+			},
`{"msg":"AAAA","some_field":"BBBB"}`, + `{"event":{"msg":"AAAA","some_field":"BBBB"},"copy":{"msg":"AAAA","some_field":"BBBB"}}`, + }, + } + + for _, tt := range suites { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + input, err := insaneJSON.DecodeBytes([]byte(tt.input)) + if err != nil { + t.Fatal(err) + } + defer insaneJSON.Release(input) + + var response []byte + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + response, err = io.ReadAll(req.Body) + if err != nil { + t.Fatal(err) + } + res.WriteHeader(http.StatusOK) + _, _ = res.Write([]byte(`{"code":0}`)) + })) + defer testServer.Close() + + plugin := Plugin{ + config: &Config{ + Endpoint: testServer.URL, + }, + copyFieldsPaths: tt.copyFields, + logger: zap.NewExample().Sugar(), + } + plugin.prepareClient() + + batch := pipeline.NewPreparedBatch([]*pipeline.Event{ + {Root: input}, + {Root: input}, + }) + + data := pipeline.WorkerData(nil) + _ = plugin.out(&data, batch) + + assert.Equal(t, tt.expected+tt.expected, string(response)) + }) + } +}