Skip to content

Commit

Permalink
Add filebeat receiver and otel consumer output
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed Sep 25, 2024
1 parent dbbfb5b commit 38715e2
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 31 deletions.
1 change: 1 addition & 0 deletions dev-tools/notice/overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
{"name": "github.com/dnaeon/go-vcr", "licenceType": "BSD-2-Clause"}
{"name": "github.com/JohnCGriffin/overflow", "licenceType": "MIT"}
{"name": "github.com/elastic/ebpfevents", "licenceType": "Apache-2.0"}
{"name": "go.opentelemetry.io/collector/config/configopaque", "licenceType": "Apache-2.0"}
21 changes: 13 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.8.2
github.com/elastic/elastic-agent-libs v0.10.0
github.com/elastic/elastic-agent-libs v0.11.0
github.com/elastic/elastic-agent-system-metrics v0.11.1
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff
Expand Down Expand Up @@ -222,7 +222,10 @@ require (
go.elastic.co/apm/module/apmhttp/v2 v2.6.0
go.elastic.co/apm/v2 v2.6.0
go.mongodb.org/mongo-driver v1.5.1
go.opentelemetry.io/collector/consumer v0.107.0
go.opentelemetry.io/collector/component v0.109.0
go.opentelemetry.io/collector/consumer v0.109.0
go.opentelemetry.io/collector/pdata v1.15.0
go.opentelemetry.io/collector/receiver v0.109.0
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
Expand Down Expand Up @@ -355,7 +358,7 @@ require (
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.20.0 // indirect
github.com/prometheus/client_golang v1.20.2 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand All @@ -369,19 +372,21 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/pdata v1.13.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.109.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.109.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/ratelimit v0.3.1 // indirect
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
Expand Down
49 changes: 31 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ github.com/elastic/elastic-agent-autodiscover v0.8.2 h1:Fs2FhR33AMBPfm5/jz4drVza
github.com/elastic/elastic-agent-autodiscover v0.8.2/go.mod h1:VZnU53EVaFTxR8Xf6YsLN8FHD5DKQzHSPlKax9/4w+o=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.10.0 h1:W7uvay0UYdLPtauXGsMD8Xfoe4qtcVWQR4icBgf/26Q=
github.com/elastic/elastic-agent-libs v0.10.0/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-libs v0.11.0 h1:m9rnNE3BkBF2XJoqubqEbu/kbtKEBZ7pHCjDlxfVRH0=
github.com/elastic/elastic-agent-libs v0.11.0/go.mod h1:5CR02awPrBr+tfmjBBK+JI+dMmHNQjpVY24J0wjbC7M=
github.com/elastic/elastic-agent-system-metrics v0.11.1 h1:BxViQHnqxvvi/65rj3mGwG6Eto6ldFCTnuDTUJnakaU=
github.com/elastic/elastic-agent-system-metrics v0.11.1/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
Expand Down Expand Up @@ -1479,8 +1479,8 @@ github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3O
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.20.0 h1:jBzTZ7B099Rg24tny+qngoynol8LtVYlA2bqx3vEloI=
github.com/prometheus/client_golang v1.20.0/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg=
github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down Expand Up @@ -1736,26 +1736,39 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/collector/consumer v0.107.0 h1:fF/+xyv9BfXQUvuJqkljrpzKyBQExDQt6zB5rzGyuHs=
go.opentelemetry.io/collector/consumer v0.107.0/go.mod h1:wgWpFes9sbnZ11XeJPSeutU8GJx6dT/gzSUqHpaZZQA=
go.opentelemetry.io/collector/pdata v1.13.0 h1:eV3NQt2f1UcaibkziMvGTQI34LlpiYBUGp1yP0G/Cxw=
go.opentelemetry.io/collector/pdata v1.13.0/go.mod h1:MYeB0MmMAxeM0hstCFrCqWLzdyeYySim2dG6pDT6nYI=
go.opentelemetry.io/collector v0.109.0 h1:ULnMWuwcy4ix1oP5RFFRcmpEbaU5YabW6nWcLMQQRo0=
go.opentelemetry.io/collector/component v0.109.0 h1:AU6eubP1htO8Fvm86uWn66Kw0DMSFhgcRM2cZZTYfII=
go.opentelemetry.io/collector/component v0.109.0/go.mod h1:jRVFY86GY6JZ61SXvUN69n7CZoTjDTqWyNC+wJJvzOw=
go.opentelemetry.io/collector/config/configtelemetry v0.109.0 h1:ItbYw3tgFMU+TqGcDVEOqJLKbbOpfQg3AHD8b22ygl8=
go.opentelemetry.io/collector/config/configtelemetry v0.109.0/go.mod h1:R0MBUxjSMVMIhljuDHWIygzzJWQyZHXXWIgQNxcFwhc=
go.opentelemetry.io/collector/consumer v0.109.0 h1:fdXlJi5Rat/poHPiznM2mLiXjcv1gPy3fyqqeirri58=
go.opentelemetry.io/collector/consumer v0.109.0/go.mod h1:E7PZHnVe1DY9hYy37toNxr9/hnsO7+LmnsixW8akLQI=
go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 h1:+WZ6MEWQRC6so3IRrW916XK58rI9NnrFHKW/P19jQvc=
go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0/go.mod h1:spZ9Dn1MRMPDHHThdXZA5TrFhdOL1wsl0Dw45EBVoVo=
go.opentelemetry.io/collector/consumer/consumertest v0.109.0 h1:v4w9G2MXGJ/eabCmX1DvQYmxzdysC8UqIxa/BWz7ACo=
go.opentelemetry.io/collector/consumer/consumertest v0.109.0/go.mod h1:lECt0qOrx118wLJbGijtqNz855XfvJv0xx9GSoJ8qSE=
go.opentelemetry.io/collector/pdata v1.15.0 h1:q/T1sFpRKJnjDrUsHdJ6mq4uSqViR/f92yvGwDby/gY=
go.opentelemetry.io/collector/pdata v1.15.0/go.mod h1:2wcsTIiLAJSbqBq/XUUYbi+cP+N87d0jEJzmb9nT19U=
go.opentelemetry.io/collector/pdata/pprofile v0.109.0 h1:5lobQKeHk8p4WC7KYbzL6ZqqX3eSizsdmp5vM8pQFBs=
go.opentelemetry.io/collector/pdata/pprofile v0.109.0/go.mod h1:lXIifCdtR5ewO17JAYTUsclMqRp6h6dCowoXHhGyw8Y=
go.opentelemetry.io/collector/receiver v0.109.0 h1:DTOM7xaDl7FUGQIjvjmWZn03JUE+aG4mJzWWfb7S8zw=
go.opentelemetry.io/collector/receiver v0.109.0/go.mod h1:jeiCHaf3PE6aXoZfHF5Uexg7aztu+Vkn9LVw0YDKm6g=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo=
go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok=
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
Expand Down Expand Up @@ -2365,8 +2378,8 @@ google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf h1:OqdXDEakZCVtDiZ
google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:mCr1K1c8kX+1iSBREvU3Juo11CB+QOEWxbRS01wWl5M=
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f h1:b1Ln/PG8orm0SsBbHZWke8dDp2lrCD4jSmfglFpTZbk=
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:AHT0dDg3SoMOgZGnZk29b5xTbPHMoEC8qthmBLJCpys=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
Expand Down
19 changes: 14 additions & 5 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/asset"
Expand Down Expand Up @@ -287,7 +288,7 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func
}

// NewBeatReceiver creates a Beat that will be used in the context of an otel receiver
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs) (*Beat, error) {
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs, core zapcore.Core) (*Beat, error) {
b, err := NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
Expand Down Expand Up @@ -355,6 +356,18 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
return nil, fmt.Errorf("error unpacking config data: %w", err)
}

logpConfig := logp.Config{}
logpConfig.Beat = b.Info.Name
logpConfig.Files.MaxSize = 1

if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
}

if err := logp.ConfigureWithCore(logpConfig, core); err != nil {
return nil, fmt.Errorf("error configuring beats logp: %w", err)
}

if err := promoteOutputQueueSettings(&b.Config); err != nil {
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
}
Expand All @@ -374,10 +387,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
return nil, fmt.Errorf("error setting timestamp precision: %w", err)
}

if err := configure.LoggingWithTypedOutputs(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType); err != nil {
return nil, fmt.Errorf("error initializing logging: %w", err)
}

// log paths values to help with troubleshooting
logp.Info(paths.Paths.String())

Expand Down
30 changes: 30 additions & 0 deletions libbeat/outputs/otelconsumer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package otelconsumer

import (
"github.com/elastic/elastic-agent-libs/config"
)

type otelConsumerConfig struct {
Queue config.Namespace `config:"queue"`
}

func defaultConfig() otelConsumerConfig {
return otelConsumerConfig{}
}
159 changes: 159 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package otelconsumer

import (
"context"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

func init() {
outputs.RegisterType("otelconsumer", makeOtelConsumer)
}

type otelConsumer struct {
observer outputs.Observer
logsConsumer consumer.Logs
beatInfo beat.Info
}

func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) {

out := &otelConsumer{
observer: observer,
logsConsumer: beat.LogConsumer,
beatInfo: beat,
}

ocConfig := defaultConfig()
if err := cfg.Unpack(&ocConfig); err != nil {
return outputs.Fail(err)
}
return outputs.Success(ocConfig.Queue, -1, 0, nil, out)
}

// Close is a noop for otelconsumer
func (out *otelConsumer) Close() error {
return nil
}

// Publish converts Beat events to Otel format and send to the next otel consumer
func (out *otelConsumer) Publish(ctx context.Context, batch publisher.Batch) error {
switch {
case out.logsConsumer != nil:
return out.logsPublish(ctx, batch)
default:
panic(fmt.Errorf("an otel consumer must be specified"))
}
}

func (out *otelConsumer) logsPublish(_ context.Context, batch publisher.Batch) error {
defer batch.ACK()
st := out.observer
pLogs := plog.NewLogs()
resourceLogs := pLogs.ResourceLogs().AppendEmpty()
sourceLogs := resourceLogs.ScopeLogs().AppendEmpty()
logRecords := sourceLogs.LogRecords()

events := batch.Events()
for _, event := range events {
logRecord := logRecords.AppendEmpty()
meta := event.Content.Meta.Clone()
meta["beat"] = out.beatInfo.Beat
meta["version"] = out.beatInfo.Version
meta["type"] = "_doc"

beatEvent := event.Content.Fields.Clone()
beatEvent["@timestamp"] = event.Content.Timestamp
beatEvent["@metadata"] = meta
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp))
pcommonEvent := mapstrToPcommonMap(beatEvent)
pcommonEvent.CopyTo(logRecord.Body().SetEmptyMap())
}

if err := out.logsConsumer.ConsumeLogs(context.TODO(), pLogs); err != nil {
return fmt.Errorf("error otel log consumer: %w", err)
}

st.NewBatch(len(events))
st.AckedEvents(len(events))
return nil
}

func (out *otelConsumer) String() string {
return "otelconsumer"
}

// mapstrToPcommonMap is necessary to convert from Beats mapstr to
// Otel Map. This step could be avoided if we choose to encode the
// Body as a slice of bytes.
func mapstrToPcommonMap(m mapstr.M) pcommon.Map {
out := pcommon.NewMap()
for k, v := range m {
switch x := v.(type) {
case string:
out.PutStr(k, x)
case int:
out.PutInt(k, int64(v.(int)))
case int8:
out.PutInt(k, int64(v.(int8)))
case int16:
out.PutInt(k, int64(v.(int16)))
case int32:
out.PutInt(k, int64(v.(int32)))
case int64:
out.PutInt(k, v.(int64))
case uint:
out.PutInt(k, int64(v.(uint)))
case uint8:
out.PutInt(k, int64(v.(uint8)))
case uint16:
out.PutInt(k, int64(v.(uint16)))
case uint32:
out.PutInt(k, int64(v.(uint32)))
case uint64:
out.PutInt(k, int64(v.(uint64)))
case float32:
out.PutDouble(k, float64(v.(float32)))
case float64:
out.PutDouble(k, v.(float64))
case bool:
out.PutBool(k, x)
case mapstr.M:
dest := out.PutEmptyMap(k)
newMap := mapstrToPcommonMap(x)
newMap.CopyTo(dest)
case time.Time:
out.PutInt(k, x.UnixMilli())
default:
out.PutStr(k, fmt.Sprintf("unknown type: %T", x))
}
}
return out
}
1 change: 1 addition & 0 deletions libbeat/publisher/includes/includes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "github.com/elastic/beats/v7/libbeat/outputs/fileout"
_ "github.com/elastic/beats/v7/libbeat/outputs/kafka"
_ "github.com/elastic/beats/v7/libbeat/outputs/logstash"
_ "github.com/elastic/beats/v7/libbeat/outputs/otelconsumer"
_ "github.com/elastic/beats/v7/libbeat/outputs/redis"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
Expand Down
Loading

0 comments on commit 38715e2

Please sign in to comment.