From 38715e2262f66cff272d4ac37b3413280188af3e Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 19 Sep 2024 14:20:57 -0500 Subject: [PATCH] Add filebeat receiver and otel consumer output --- dev-tools/notice/overrides.json | 1 + go.mod | 21 ++- go.sum | 49 +++--- libbeat/cmd/instance/beat.go | 19 ++- libbeat/outputs/otelconsumer/config.go | 30 ++++ libbeat/outputs/otelconsumer/otelconsumer.go | 159 +++++++++++++++++++ libbeat/publisher/includes/includes.go | 1 + x-pack/filebeat/fbreceiver/config.go | 25 +++ x-pack/filebeat/fbreceiver/config_test.go | 40 +++++ x-pack/filebeat/fbreceiver/factory.go | 90 +++++++++++ x-pack/filebeat/fbreceiver/receiver.go | 30 ++++ x-pack/filebeat/fbreceiver/receiver_test.go | 63 ++++++++ 12 files changed, 497 insertions(+), 31 deletions(-) create mode 100644 libbeat/outputs/otelconsumer/config.go create mode 100644 libbeat/outputs/otelconsumer/otelconsumer.go create mode 100644 x-pack/filebeat/fbreceiver/config.go create mode 100644 x-pack/filebeat/fbreceiver/config_test.go create mode 100644 x-pack/filebeat/fbreceiver/factory.go create mode 100644 x-pack/filebeat/fbreceiver/receiver.go create mode 100644 x-pack/filebeat/fbreceiver/receiver_test.go diff --git a/dev-tools/notice/overrides.json b/dev-tools/notice/overrides.json index eee18acc0de..bb82c97ebe4 100644 --- a/dev-tools/notice/overrides.json +++ b/dev-tools/notice/overrides.json @@ -18,3 +18,4 @@ {"name": "github.com/dnaeon/go-vcr", "licenceType": "BSD-2-Clause"} {"name": "github.com/JohnCGriffin/overflow", "licenceType": "MIT"} {"name": "github.com/elastic/ebpfevents", "licenceType": "Apache-2.0"} +{"name": "go.opentelemetry.io/collector/config/configopaque", "licenceType": "Apache-2.0"} diff --git a/go.mod b/go.mod index cf0290a192c..8567300014e 100644 --- a/go.mod +++ b/go.mod @@ -194,7 +194,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.8.2 - github.com/elastic/elastic-agent-libs v0.10.0 + github.com/elastic/elastic-agent-libs v0.11.0 github.com/elastic/elastic-agent-system-metrics v0.11.1 github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff @@ -222,7 +222,10 @@ require ( go.elastic.co/apm/module/apmhttp/v2 v2.6.0 go.elastic.co/apm/v2 v2.6.0 go.mongodb.org/mongo-driver v1.5.1 - go.opentelemetry.io/collector/consumer v0.107.0 + go.opentelemetry.io/collector/component v0.109.0 + go.opentelemetry.io/collector/consumer v0.109.0 + go.opentelemetry.io/collector/pdata v1.15.0 + go.opentelemetry.io/collector/receiver v0.109.0 google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -355,7 +358,7 @@ require ( github.com/pierrec/lz4 v2.6.0+incompatible // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/prometheus/client_golang v1.20.0 // indirect + github.com/prometheus/client_golang v1.20.2 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sergi/go-diff v1.3.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect @@ -369,19 +372,21 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.elastic.co/fastjson v1.1.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector/pdata v1.13.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.109.0 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.109.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/ratelimit v0.3.1 // indirect golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect golang.org/x/term v0.24.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect diff --git a/go.sum b/go.sum index e5215a0f1bc..e6368534f8b 100644 --- a/go.sum +++ b/go.sum @@ -562,8 +562,8 @@ github.com/elastic/elastic-agent-autodiscover v0.8.2 h1:Fs2FhR33AMBPfm5/jz4drVza github.com/elastic/elastic-agent-autodiscover v0.8.2/go.mod h1:VZnU53EVaFTxR8Xf6YsLN8FHD5DKQzHSPlKax9/4w+o= github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE= github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= -github.com/elastic/elastic-agent-libs v0.10.0 h1:W7uvay0UYdLPtauXGsMD8Xfoe4qtcVWQR4icBgf/26Q= -github.com/elastic/elastic-agent-libs v0.10.0/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= +github.com/elastic/elastic-agent-libs v0.11.0 h1:m9rnNE3BkBF2XJoqubqEbu/kbtKEBZ7pHCjDlxfVRH0= +github.com/elastic/elastic-agent-libs v0.11.0/go.mod h1:5CR02awPrBr+tfmjBBK+JI+dMmHNQjpVY24J0wjbC7M= github.com/elastic/elastic-agent-system-metrics v0.11.1 h1:BxViQHnqxvvi/65rj3mGwG6Eto6ldFCTnuDTUJnakaU= github.com/elastic/elastic-agent-system-metrics v0.11.1/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= @@ -1479,8 +1479,8 @@ github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3O github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= -github.com/prometheus/client_golang v1.20.0 h1:jBzTZ7B099Rg24tny+qngoynol8LtVYlA2bqx3vEloI= -github.com/prometheus/client_golang v1.20.0/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg= +github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -1736,26 +1736,39 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/collector/consumer v0.107.0 h1:fF/+xyv9BfXQUvuJqkljrpzKyBQExDQt6zB5rzGyuHs= -go.opentelemetry.io/collector/consumer v0.107.0/go.mod h1:wgWpFes9sbnZ11XeJPSeutU8GJx6dT/gzSUqHpaZZQA= -go.opentelemetry.io/collector/pdata v1.13.0 h1:eV3NQt2f1UcaibkziMvGTQI34LlpiYBUGp1yP0G/Cxw= -go.opentelemetry.io/collector/pdata v1.13.0/go.mod h1:MYeB0MmMAxeM0hstCFrCqWLzdyeYySim2dG6pDT6nYI= +go.opentelemetry.io/collector v0.109.0 h1:ULnMWuwcy4ix1oP5RFFRcmpEbaU5YabW6nWcLMQQRo0= +go.opentelemetry.io/collector/component v0.109.0 h1:AU6eubP1htO8Fvm86uWn66Kw0DMSFhgcRM2cZZTYfII= +go.opentelemetry.io/collector/component v0.109.0/go.mod h1:jRVFY86GY6JZ61SXvUN69n7CZoTjDTqWyNC+wJJvzOw= +go.opentelemetry.io/collector/config/configtelemetry v0.109.0 h1:ItbYw3tgFMU+TqGcDVEOqJLKbbOpfQg3AHD8b22ygl8= +go.opentelemetry.io/collector/config/configtelemetry v0.109.0/go.mod h1:R0MBUxjSMVMIhljuDHWIygzzJWQyZHXXWIgQNxcFwhc= +go.opentelemetry.io/collector/consumer v0.109.0 h1:fdXlJi5Rat/poHPiznM2mLiXjcv1gPy3fyqqeirri58= +go.opentelemetry.io/collector/consumer v0.109.0/go.mod h1:E7PZHnVe1DY9hYy37toNxr9/hnsO7+LmnsixW8akLQI= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 h1:+WZ6MEWQRC6so3IRrW916XK58rI9NnrFHKW/P19jQvc= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0/go.mod h1:spZ9Dn1MRMPDHHThdXZA5TrFhdOL1wsl0Dw45EBVoVo= +go.opentelemetry.io/collector/consumer/consumertest v0.109.0 h1:v4w9G2MXGJ/eabCmX1DvQYmxzdysC8UqIxa/BWz7ACo= +go.opentelemetry.io/collector/consumer/consumertest v0.109.0/go.mod h1:lECt0qOrx118wLJbGijtqNz855XfvJv0xx9GSoJ8qSE= +go.opentelemetry.io/collector/pdata v1.15.0 h1:q/T1sFpRKJnjDrUsHdJ6mq4uSqViR/f92yvGwDby/gY= +go.opentelemetry.io/collector/pdata v1.15.0/go.mod h1:2wcsTIiLAJSbqBq/XUUYbi+cP+N87d0jEJzmb9nT19U= +go.opentelemetry.io/collector/pdata/pprofile v0.109.0 h1:5lobQKeHk8p4WC7KYbzL6ZqqX3eSizsdmp5vM8pQFBs= +go.opentelemetry.io/collector/pdata/pprofile v0.109.0/go.mod h1:lXIifCdtR5ewO17JAYTUsclMqRp6h6dCowoXHhGyw8Y= +go.opentelemetry.io/collector/receiver v0.109.0 h1:DTOM7xaDl7FUGQIjvjmWZn03JUE+aG4mJzWWfb7S8zw= +go.opentelemetry.io/collector/receiver v0.109.0/go.mod h1:jeiCHaf3PE6aXoZfHF5Uexg7aztu+Vkn9LVw0YDKm6g= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= -go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= -go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= @@ -2365,8 +2378,8 @@ google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf h1:OqdXDEakZCVtDiZ google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:mCr1K1c8kX+1iSBREvU3Juo11CB+QOEWxbRS01wWl5M= google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f h1:b1Ln/PG8orm0SsBbHZWke8dDp2lrCD4jSmfglFpTZbk= google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:AHT0dDg3SoMOgZGnZk29b5xTbPHMoEC8qthmBLJCpys= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index e29d94c67af..df3a71416b6 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -41,6 +41,7 @@ import ( "github.com/gofrs/uuid/v5" "go.opentelemetry.io/collector/consumer" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/asset" @@ -287,7 +288,7 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func } // NewBeatReceiver creates a Beat that will be used in the context of an otel receiver -func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs) (*Beat, error) { +func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs, core zapcore.Core) (*Beat, error) { b, err := NewBeat(settings.Name, settings.IndexPrefix, settings.Version, @@ -355,6 +356,18 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c return nil, fmt.Errorf("error unpacking config data: %w", err) } + logpConfig := logp.Config{} + logpConfig.Beat = b.Info.Name + logpConfig.Files.MaxSize = 1 + + if err := b.Config.Logging.Unpack(&logpConfig); err != nil { + return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging) + } + + if err := logp.ConfigureWithCore(logpConfig, core); err != nil { + return nil, fmt.Errorf("error configuring beats logp: %w", err) + } + if err := promoteOutputQueueSettings(&b.Config); err != nil { return nil, fmt.Errorf("could not promote output queue settings: %w", err) } @@ -374,10 +387,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c return nil, fmt.Errorf("error setting timestamp precision: %w", err) } - if err := configure.LoggingWithTypedOutputs(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType); err != nil { - return nil, fmt.Errorf("error initializing logging: %w", err) - } - // log paths values to help with troubleshooting logp.Info(paths.Paths.String()) diff --git a/libbeat/outputs/otelconsumer/config.go b/libbeat/outputs/otelconsumer/config.go new file mode 100644 index 00000000000..d8281c1aedb --- /dev/null +++ b/libbeat/outputs/otelconsumer/config.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package otelconsumer + +import ( + "github.com/elastic/elastic-agent-libs/config" +) + +type otelConsumerConfig struct { + Queue config.Namespace `config:"queue"` +} + +func defaultConfig() otelConsumerConfig { + return otelConsumerConfig{} +} diff --git a/libbeat/outputs/otelconsumer/otelconsumer.go b/libbeat/outputs/otelconsumer/otelconsumer.go new file mode 100644 index 00000000000..f75dc8e8a8b --- /dev/null +++ b/libbeat/outputs/otelconsumer/otelconsumer.go @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package otelconsumer + +import ( + "context" + "fmt" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" +) + +func init() { + outputs.RegisterType("otelconsumer", makeOtelConsumer) +} + +type otelConsumer struct { + observer outputs.Observer + logsConsumer consumer.Logs + beatInfo beat.Info +} + +func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) { + + out := &otelConsumer{ + observer: observer, + logsConsumer: beat.LogConsumer, + beatInfo: beat, + } + + ocConfig := defaultConfig() + if err := cfg.Unpack(&ocConfig); err != nil { + return outputs.Fail(err) + } + return outputs.Success(ocConfig.Queue, -1, 0, nil, out) +} + +// Close is a noop for otelconsumer +func (out *otelConsumer) Close() error { + return nil +} + +// Publish converts Beat events to Otel format and send to the next otel consumer +func (out *otelConsumer) Publish(ctx context.Context, batch publisher.Batch) error { + switch { + case out.logsConsumer != nil: + return out.logsPublish(ctx, batch) + default: + panic(fmt.Errorf("an otel consumer must be specified")) + } +} + +func (out *otelConsumer) logsPublish(_ context.Context, batch publisher.Batch) error { + defer batch.ACK() + st := out.observer + pLogs := plog.NewLogs() + resourceLogs := pLogs.ResourceLogs().AppendEmpty() + sourceLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := sourceLogs.LogRecords() + + events := batch.Events() + for _, event := range events { + logRecord := logRecords.AppendEmpty() + meta := event.Content.Meta.Clone() + meta["beat"] = out.beatInfo.Beat + meta["version"] = out.beatInfo.Version + meta["type"] = "_doc" + + beatEvent := event.Content.Fields.Clone() + beatEvent["@timestamp"] = event.Content.Timestamp + beatEvent["@metadata"] = meta + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp)) + pcommonEvent := mapstrToPcommonMap(beatEvent) + pcommonEvent.CopyTo(logRecord.Body().SetEmptyMap()) + } + + if err := out.logsConsumer.ConsumeLogs(context.TODO(), pLogs); err != nil { + return fmt.Errorf("error otel log consumer: %w", err) + } + + st.NewBatch(len(events)) + st.AckedEvents(len(events)) + return nil +} + +func (out *otelConsumer) String() string { + return "otelconsumer" +} + +// mapstrToPcommonMap is necessary to convert from Beats mapstr to +// Otel Map. This step could be avoided if we choose to encode the +// Body as a slice of bytes. +func mapstrToPcommonMap(m mapstr.M) pcommon.Map { + out := pcommon.NewMap() + for k, v := range m { + switch x := v.(type) { + case string: + out.PutStr(k, x) + case int: + out.PutInt(k, int64(v.(int))) + case int8: + out.PutInt(k, int64(v.(int8))) + case int16: + out.PutInt(k, int64(v.(int16))) + case int32: + out.PutInt(k, int64(v.(int32))) + case int64: + out.PutInt(k, v.(int64)) + case uint: + out.PutInt(k, int64(v.(uint))) + case uint8: + out.PutInt(k, int64(v.(uint8))) + case uint16: + out.PutInt(k, int64(v.(uint16))) + case uint32: + out.PutInt(k, int64(v.(uint32))) + case uint64: + out.PutInt(k, int64(v.(uint64))) + case float32: + out.PutDouble(k, float64(v.(float32))) + case float64: + out.PutDouble(k, v.(float64)) + case bool: + out.PutBool(k, x) + case mapstr.M: + dest := out.PutEmptyMap(k) + newMap := mapstrToPcommonMap(x) + newMap.CopyTo(dest) + case time.Time: + out.PutInt(k, x.UnixMilli()) + default: + out.PutStr(k, fmt.Sprintf("unknown type: %T", x)) + } + } + return out +} diff --git a/libbeat/publisher/includes/includes.go b/libbeat/publisher/includes/includes.go index ccb69d8e475..10c1d01c602 100644 --- a/libbeat/publisher/includes/includes.go +++ b/libbeat/publisher/includes/includes.go @@ -27,6 +27,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/fileout" _ "github.com/elastic/beats/v7/libbeat/outputs/kafka" _ "github.com/elastic/beats/v7/libbeat/outputs/logstash" + _ "github.com/elastic/beats/v7/libbeat/outputs/otelconsumer" _ "github.com/elastic/beats/v7/libbeat/outputs/redis" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" diff --git a/x-pack/filebeat/fbreceiver/config.go b/x-pack/filebeat/fbreceiver/config.go new file mode 100644 index 00000000000..13a52fa502e --- /dev/null +++ b/x-pack/filebeat/fbreceiver/config.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fbreceiver + +import "fmt" + +// Config is config settings for filebeat receiver. The structure of +// which is the same as the filebeat.yml configuration file. +type Config struct { + Beatconfig map[string]interface{} `mapstructure:",remain"` +} + +// Validate checks if the configuration in valid +func (cfg *Config) Validate() error { + if len(cfg.Beatconfig) == 0 { + return fmt.Errorf("NO config sent") + } + _, prs := cfg.Beatconfig["filebeat"] + if !prs { + return fmt.Errorf("'filebeat' key is required") + } + return nil +} diff --git a/x-pack/filebeat/fbreceiver/config_test.go b/x-pack/filebeat/fbreceiver/config_test.go new file mode 100644 index 00000000000..9f9263886d0 --- /dev/null +++ b/x-pack/filebeat/fbreceiver/config_test.go @@ -0,0 +1,40 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fbreceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidate(t *testing.T) { + tests := map[string]struct { + c *Config + hasError bool + errorString string + }{ + "No Items": { + c: &Config{Beatconfig: map[string]interface{}{}}, + hasError: true, + errorString: "NO config sent", + }, + "Valid config": { + c: &Config{Beatconfig: map[string]interface{}{"filebeat": map[string]interface{}{}}}, + hasError: false, + errorString: "", + }, + } + for name, tc := range tests { + err := tc.c.Validate() + if tc.hasError { + assert.NotNil(t, err, name) + assert.Equal(t, err.Error(), tc.errorString, name) + } + if !tc.hasError { + assert.Nil(t, err, name) + } + } +} diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go new file mode 100644 index 00000000000..8bc6f872df7 --- /dev/null +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -0,0 +1,90 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fbreceiver + +import ( + "context" + "fmt" + + "github.com/elastic/beats/v7/filebeat/beater" + "github.com/elastic/beats/v7/filebeat/cmd" + "github.com/elastic/beats/v7/libbeat/cmd/instance" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/publisher/processing" + "github.com/elastic/beats/v7/x-pack/filebeat/include" + inputs "github.com/elastic/beats/v7/x-pack/filebeat/input/default-inputs" + "github.com/elastic/elastic-agent-libs/mapstr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" +) + +const ( + Name = "filebeatreceiver" +) + +func createDefaultConfig() component.Config { + return &Config{} +} + +func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) { + cfg := baseCfg.(*Config) + + settings := cmd.FilebeatSettings(Name) + globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) + if err != nil { + return nil, fmt.Errorf("error making global processors: %w", err) + } + settings.Processing = processing.MakeDefaultSupport(true, globalProcs, processing.WithECS, processing.WithHost, processing.WithAgentMeta()) + settings.ElasticLicensed = true + settings.Initialize = append(settings.Initialize, include.InitializeModule) + + b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core()) + if err != nil { + return nil, fmt.Errorf("error creating %s: %w", Name, err) + } + + beatCreator := beater.New(inputs.Init) + + beatConfig, err := b.BeatConfig() + if err != nil { + return nil, fmt.Errorf("error getting beat config: %w", err) + } + + fbBeater, err := beatCreator(&b.Beat, beatConfig) + if err != nil { + return nil, fmt.Errorf("error getting %s creator:%w", Name, err) + } + + return &filebeatReceiver{beat: &b.Beat, beater: fbBeater}, nil +} + +func defaultProcessors() []mapstr.M { + // processors: + // - add_host_metadata: + // when.not.contains.tags: forwarded + // - add_cloud_metadata: ~ + // - add_docker_metadata: ~ + // - add_kubernetes_metadata: ~ + + return []mapstr.M{ + { + "add_host_metadata": mapstr.M{ + "when.not.contains.tags": "forwarded", + }, + }, + {"add_cloud_metadata": nil}, + {"add_docker_metadata": nil}, + {"add_kubernetes_metadata": nil}, + } +} + +func NewFactory() receiver.Factory { + return receiver.NewFactory( + component.MustNewType(Name), + createDefaultConfig, + receiver.WithLogs(createReceiver, component.StabilityLevelAlpha)) +} diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go new file mode 100644 index 00000000000..a2e180ccc80 --- /dev/null +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fbreceiver + +import ( + "context" + + "github.com/elastic/beats/v7/libbeat/beat" + + "go.opentelemetry.io/collector/component" +) + +type filebeatReceiver struct { + beat *beat.Beat + beater beat.Beater +} + +func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { + go func() { + fb.beater.Run(fb.beat) + }() + return nil +} + +func (fb *filebeatReceiver) Shutdown(ctx context.Context) error { + fb.beater.Stop() + return nil +} diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go new file mode 100644 index 00000000000..5822b6f5f07 --- /dev/null +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -0,0 +1,63 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fbreceiver + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestNewReceiver(t *testing.T) { + config := Config{ + Beatconfig: map[string]interface{}{ + "filebeat": map[string]interface{}{ + "inputs": []map[string]interface{}{ + { + "type": "benchmark", + "enabled": true, + "message": "test", + "count": 1, + }, + }, + }, + "output": map[string]interface{}{ + "otelconsumer": map[string]interface{}{}, + }, + "logging": map[string]interface{}{ + "level": "debug", + }, + "path.home": t.TempDir(), + }, + } + var zapLogs bytes.Buffer + core := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + zapcore.AddSync(&zapLogs), + zapcore.InfoLevel) + + receiverSettings := receiver.Settings{} + receiverSettings.Logger = zap.New(core) + + var countLogs int + logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + countLogs = countLogs + ld.LogRecordCount() + return nil + }) + + r, err := createReceiver(context.Background(), receiverSettings, &config, logConsumer) + assert.NoErrorf(t, err, "Error creating receiver. Logs:\n %s", zapLogs.String()) + r.Start(context.Background(), nil) + assert.Eventuallyf(t, func() bool { return countLogs > 0 }, 60*time.Second, 1*time.Second, "consumed logs didn't increase\nCount: %d\nLogs: %s\n", countLogs, zapLogs.String()) + r.Shutdown(context.Background()) +}