Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch exporter: Init JSON encoding support #3101

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
esutil7 "github.com/elastic/go-elasticsearch/v7/esutil"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand All @@ -46,6 +47,7 @@ type elasticsearchExporter struct {

client *esClientCurrent
bulkIndexer esBulkIndexerCurrent
model mappingModel
}

var retryOnStatus = []int{500, 502, 503, 504, 429}
Expand All @@ -72,13 +74,17 @@ func newExporter(logger *zap.Logger, cfg *Config) (*elasticsearchExporter, error
maxAttempts = cfg.Retry.MaxRequests
}

// TODO: Apply encoding and field mapping settings.
model := &encodeModel{dedup: true, dedot: false}

return &elasticsearchExporter{
logger: logger,
client: client,
bulkIndexer: bulkIndexer,

index: cfg.Index,
maxAttempts: maxAttempts,
model: model,
}, nil
}

Expand All @@ -87,7 +93,36 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
}

func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld pdata.Logs) error {
panic("TODO")
var errs []error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
resource := rl.Resource()
ills := rl.InstrumentationLibraryLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(i).Logs()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k)); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}

errs = append(errs, err)
}
}
}
}

return multierr.Combine(errs...)
}

func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pdata.Resource, record pdata.LogRecord) error {
document, err := e.model.encodeLog(resource, record)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
}
return e.pushEvent(ctx, document)
}

func (e *elasticsearchExporter) pushEvent(ctx context.Context, document []byte) error {
Expand Down Expand Up @@ -184,17 +219,10 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.
// Issue: https://github.com/elastic/go-elasticsearch/issues/232
//
// The elasticsearch7.Client retry requires the count to be >= 1, otherwise
// it defaults to 3. Internally the Clients starts the number of send attempts with 1.
// When maxRetries is 1, retries are disabled, meaning that the event is
// dropped if the first HTTP request failed.
//
// Once the issue is resolved we want `maxRetries = config.Retry.MaxRequests - 1`.
maxRetries := config.Retry.MaxRequests
if maxRetries < 1 || !config.Retry.Enabled {
maxRetries = 1
maxRetries := config.Retry.MaxRequests - 1
retryDisabled := !config.Retry.Enabled || maxRetries <= 0
if retryDisabled {
maxRetries = 0
}

return elasticsearch7.NewClient(esConfigCurrent{
Expand All @@ -210,7 +238,7 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren

// configure retry behavior
RetryOnStatus: retryOnStatus,
DisableRetry: !config.Retry.Enabled,
DisableRetry: retryDisabled,
EnableRetryOnTimeout: config.Retry.Enabled,
MaxRetries: maxRetries,
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),
Expand Down
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/armon/go-metrics v0.3.3 // indirect
github.com/cenkalti/backoff/v4 v4.1.0
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/elastic/go-structform v0.0.8
github.com/gogo/googleapis v1.3.0 // indirect
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
Expand All @@ -15,6 +16,7 @@ require (
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.27.0
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.16.0
gopkg.in/ini.v1 v1.57.0 // indirect
)
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b h1:WR1qVJzbvrVywhAk4kMQKRPx09AZVI0NdEdYs59iHcA=
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -232,6 +233,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s=
github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-structform v0.0.8 h1:U0qnb9Zqig7w+FhF+sLI3VZPPi/+2aJ0bIEW6R1z6Tk=
github.com/elastic/go-structform v0.0.8/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
Expand Down
Loading