From 2d0266342b7f1561d96df895a7b46e1fdf329939 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Thu, 25 Apr 2024 14:49:13 -0700 Subject: [PATCH] [receiver/discovery] Emit evaluation entities by the endpoint tracker (#4728) This change consolidates the entity emitting in one place: endpoint tracker. Once the evaluation succeeds, the endpoint attributes are updated in the correlation store and the new entity state is emitted right away. Every subsequent state is emitted with all the added attributes. The tests are kept with minimal changes to ensure that no attributes are lost in the emitted entities. Currently, we emit superset of all the attributes that were previously emitted on regular basis or on the evaluation. Later, we will reduce the set and remove redundant attributes. --- .../receiver/discoveryreceiver/correlation.go | 8 ++ .../discoveryreceiver/endpoint_tracker.go | 9 +- .../endpoint_tracker_test.go | 7 +- .../receiver/discoveryreceiver/evaluator.go | 7 +- .../discoveryreceiver/evaluator_test.go | 8 +- .../discoveryreceiver/metric_evaluator.go | 103 ++++++------------ .../metric_evaluator_test.go | 99 ++++------------- .../receiver/discoveryreceiver/receiver.go | 4 +- .../discoveryreceiver/statement_evaluator.go | 55 +++++----- .../statement_evaluator_test.go | 36 +++--- ...t_observer_simple_prometheus_statuses.yaml | 18 +++ 11 files changed, 148 insertions(+), 206 deletions(-) diff --git a/internal/receiver/discoveryreceiver/correlation.go b/internal/receiver/discoveryreceiver/correlation.go index d3aa4717e4..59168f6d02 100644 --- a/internal/receiver/discoveryreceiver/correlation.go +++ b/internal/receiver/discoveryreceiver/correlation.go @@ -50,6 +50,7 @@ type correlationStore interface { GetOrCreate(endpointID observer.EndpointID, receiverID component.ID) correlation Attrs(endpointID observer.EndpointID) map[string]string UpdateAttrs(endpointID observer.EndpointID, attrs map[string]string) + EmitCh() chan correlation Endpoints(updatedBefore time.Time) []observer.Endpoint // Start the reaping loop to prevent unnecessary endpoint buildup Start() @@ -71,6 +72,7 @@ type store struct { attrs *sync.Map // sentinel for terminating reaper loop sentinel chan struct{} + emitCh chan correlation reapInterval time.Duration ttl time.Duration } @@ -84,6 +86,7 @@ func newCorrelationStore(logger *zap.Logger, ttl time.Duration) correlationStore reapInterval: 30 * time.Second, ttl: ttl, sentinel: make(chan struct{}, 1), + emitCh: make(chan correlation), } } @@ -118,6 +121,11 @@ func (s *store) MarkStale(endpointID observer.EndpointID) { } } +// EmitCh returns a channel to emit endpoints immediately. +func (s *store) EmitCh() chan correlation { + return s.emitCh +} + // Endpoints returns all active endpoints that have not been updated since the provided time. func (s *store) Endpoints(updatedBefore time.Time) []observer.Endpoint { var endpoints []observer.Endpoint diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker.go b/internal/receiver/discoveryreceiver/endpoint_tracker.go index 5bfa7ba2c0..d725ff6702 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker.go @@ -92,6 +92,8 @@ func (et *endpointTracker) startEmitLoop() { timer := time.NewTicker(et.emitInterval) for { select { + case corr := <-et.correlations.EmitCh(): + et.emitEntityStateEvents(corr.observerID, []observer.Endpoint{corr.endpoint}) case <-timer.C: for obs := range et.observables { et.emitEntityStateEvents(obs, et.correlations.Endpoints(time.Now().Add(-et.emitInterval))) @@ -114,7 +116,7 @@ func (et *endpointTracker) stop() { func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpoints []observer.Endpoint) { if et.pLogs != nil { - entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, time.Now()) + entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now()) if err != nil { et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err)) } @@ -209,7 +211,7 @@ func (n *notify) OnChange(changed []observer.Endpoint) { n.endpointTracker.updateEndpoints(changed, n.observerID) } -func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) { +func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) { entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() for _, endpoint := range endpoints { entityEvent := entityEvents.AppendEmpty() @@ -230,6 +232,9 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, t attrs.PutStr("endpoint", endpoint.Target) attrs.PutStr(observerNameAttr, observerID.Name()) attrs.PutStr(observerTypeAttr, observerID.Type().String()) + for k, v := range correlations.Attrs(endpoint.ID) { + attrs.PutStr(k, v) + } } return entityEvents, failed, err } diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go index d13f0f5d9a..207e2c99c2 100644 --- a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go +++ b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go @@ -164,7 +164,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) { t.Run(test.name, func(t *testing.T) { events, failed, err := entityStateEvents( component.MustNewIDWithName("observer_type", "observer.name"), - []observer.Endpoint{test.endpoint}, t0, + []observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0, ) require.NoError(t, err) require.Zero(t, failed) @@ -276,7 +276,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) { // Validate entity_state event events, failed, err := entityStateEvents( component.MustNewIDWithName("observer_type", "observer.name"), - []observer.Endpoint{test.endpoint}, t0, + []observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0, ) if test.expectedError != "" { require.Error(t, err) @@ -343,7 +343,7 @@ func FuzzEndpointToPlogs(f *testing.F) { Transport: observer.Transport(transport), }, }, - }, t0, + }, newCorrelationStore(zap.NewNop(), time.Hour), t0, ) expectedLogs := expectedPLogs(endpointID) @@ -661,6 +661,7 @@ func TestEntityEmittingLifecycle(t *testing.T) { require.Equal(t, 1, gotLogs.LogRecordCount()) // TODO: Use plogtest.IgnoreTimestamp once available expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint}, + newCorrelationStore(zap.NewNop(), time.Hour), gotLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime()) require.NoError(t, err) require.Zero(t, failed) diff --git a/internal/receiver/discoveryreceiver/evaluator.go b/internal/receiver/discoveryreceiver/evaluator.go index 0ac6324cc1..8d1c4068f1 100644 --- a/internal/receiver/discoveryreceiver/evaluator.go +++ b/internal/receiver/discoveryreceiver/evaluator.go @@ -24,7 +24,6 @@ import ( "github.com/antonmedv/expr/vm" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" "go.uber.org/zap" "gopkg.in/yaml.v2" @@ -117,10 +116,10 @@ func (e *evaluator) evaluateMatch(match Match, pattern string, status discovery. } // correlateResourceAttributes sets correlation attributes including embedded base64 config content, if configured. -func (e *evaluator) correlateResourceAttributes(cfg *Config, to pcommon.Map, corr correlation) { +func (e *evaluator) correlateResourceAttributes(cfg *Config, to map[string]string, corr correlation) { observerID := corr.observerID.String() if observerID != "" && observerID != discovery.NoType.String() { - to.PutStr(discovery.ObserverIDAttr, observerID) + to[discovery.ObserverIDAttr] = observerID } if e.config.EmbedReceiverConfig { @@ -140,6 +139,6 @@ func (e *evaluator) correlateResourceAttributes(cfg *Config, to pcommon.Map, cor if err != nil { e.logger.Error("failed embedding receiver config", zap.String("observer", observerID), zap.Error(err)) } - to.PutStr(discovery.ReceiverConfigAttr, base64.StdEncoding.EncodeToString(cfgYaml)) + to[discovery.ReceiverConfigAttr] = base64.StdEncoding.EncodeToString(cfgYaml) } } diff --git a/internal/receiver/discoveryreceiver/evaluator_test.go b/internal/receiver/discoveryreceiver/evaluator_test.go index 5c07dcf3c2..48a1860e83 100644 --- a/internal/receiver/discoveryreceiver/evaluator_test.go +++ b/internal/receiver/discoveryreceiver/evaluator_test.go @@ -24,7 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" "go.uber.org/zap" ) @@ -124,12 +123,11 @@ func TestCorrelateResourceAttrs(t *testing.T) { }, } - to := pcommon.NewMap() - + to := map[string]string{} require.Empty(t, eval.correlations.Attrs(endpointID)) eval.correlateResourceAttributes(cfg, to, corr) - expectedResourceAttrs := map[string]any{ + expectedResourceAttrs := map[string]string{ "discovery.observer.id": "type/name", } @@ -147,7 +145,7 @@ watch_observers: `)) } - require.Equal(t, expectedResourceAttrs, to.AsRaw()) + require.Equal(t, expectedResourceAttrs, to) }) } } diff --git a/internal/receiver/discoveryreceiver/metric_evaluator.go b/internal/receiver/discoveryreceiver/metric_evaluator.go index b30262f529..2ba0b69ee4 100644 --- a/internal/receiver/discoveryreceiver/metric_evaluator.go +++ b/internal/receiver/discoveryreceiver/metric_evaluator.go @@ -19,11 +19,9 @@ import ( "fmt" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -47,12 +45,10 @@ var ( // Status match rules. If so, they emit log records for the matching metric. type metricEvaluator struct { *evaluator - pLogs chan plog.Logs } -func newMetricEvaluator(logger *zap.Logger, cfg *Config, pLogs chan plog.Logs, correlations correlationStore) *metricEvaluator { +func newMetricEvaluator(logger *zap.Logger, cfg *Config, correlations correlationStore) *metricEvaluator { return &metricEvaluator{ - pLogs: pLogs, evaluator: newEvaluator(logger, cfg, correlations, // TODO: provide more capable env w/ resource and metric attributes func(pattern string) map[string]any { @@ -67,15 +63,13 @@ func (m *metricEvaluator) Capabilities() consumer.Capabilities { } func (m *metricEvaluator) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { - if pLogs := m.evaluateMetrics(md); pLogs.LogRecordCount() > 0 { - m.pLogs <- pLogs - } + m.evaluateMetrics(md) return nil } // evaluateMetrics parses the provided Metrics and returns plog.Logs with a single log record if it matches // against the first applicable configured Status match rule. -func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs { +func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) { if ce := m.logger.Check(zapcore.DebugLevel, "evaluating metrics"); ce != nil { if mbytes, err := jsonMarshaler.MarshalMetrics(md); err == nil { ce.Write(zap.ByteString("metrics", mbytes)) @@ -84,22 +78,22 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs { } } if md.MetricCount() == 0 { - return plog.NewLogs() + return } receiverID, endpointID := statussources.MetricsToReceiverIDs(md) if receiverID == discovery.NoType || endpointID == "" { m.logger.Debug("unable to evaluate metrics from receiver without corresponding name or Endpoint.ID", zap.Any("metrics", md)) - return plog.NewLogs() + return } rEntry, ok := m.config.Receivers[receiverID] if !ok { m.logger.Info("No matching configured receiver for metric status evaluation", zap.String("receiver", receiverID.String())) - return plog.NewLogs() + return } if rEntry.Status == nil || len(rEntry.Status.Metrics) == 0 { - return plog.NewLogs() + return } for _, match := range rEntry.Status.Metrics { @@ -108,20 +102,26 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs { continue } - entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() - entityEvent := entityEvents.AppendEmpty() - entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID)) - entityState := entityEvent.SetEntityState() - - res.Attributes().CopyTo(entityState.Attributes()) corr := m.correlations.GetOrCreate(endpointID, receiverID) - m.correlateResourceAttributes(m.config, entityState.Attributes(), corr) + attrs := m.correlations.Attrs(endpointID) - // Remove the endpoint ID from the attributes as it's set in the entity ID. - entityState.Attributes().Remove(discovery.EndpointIDAttr) + // If the status is already the same as desired, we don't need to update the entity state. + if match.Status == discovery.StatusType(attrs[discovery.StatusAttr]) { + return + } - entityState.Attributes().PutStr(eventTypeAttr, metricMatch) - entityState.Attributes().PutStr(receiverRuleAttr, rEntry.Rule.String()) + res.Attributes().Range(func(k string, v pcommon.Value) bool { + // skip endpoint ID attr since it's set in the entity ID + if k == discovery.EndpointIDAttr { + return true + } + attrs[k] = v.AsString() + return true + }) + m.correlateResourceAttributes(m.config, attrs, corr) + + attrs[eventTypeAttr] = metricMatch + attrs[receiverRuleAttr] = rEntry.Rule.String() desiredRecord := match.Record if desiredRecord == nil { @@ -131,19 +131,17 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs { if desiredRecord.Body != "" { desiredMsg = desiredRecord.Body } - entityState.Attributes().PutStr(discovery.MessageAttr, desiredMsg) + attrs[discovery.MessageAttr] = desiredMsg for k, v := range desiredRecord.Attributes { - entityState.Attributes().PutStr(k, v) - } - entityState.Attributes().PutStr(metricNameAttr, metric.Name()) - entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status)) - if ts := m.timestampFromMetric(metric); ts != nil { - entityEvent.SetTimestamp(*ts) + attrs[k] = v } + attrs[metricNameAttr] = metric.Name() + attrs[discovery.StatusAttr] = string(match.Status) + m.correlations.UpdateAttrs(endpointID, attrs) - return entityEvents.ConvertAndMoveToLogs() + m.correlations.EmitCh() <- corr + return } - return plog.NewLogs() } // findMatchedMetric finds the metric that matches the provided match rule and return it along with the resource if found. @@ -166,42 +164,3 @@ func (m *metricEvaluator) findMatchedMetric(md pmetric.Metrics, match Match, rec } return pcommon.NewResource(), pmetric.NewMetric(), false } - -func (m *metricEvaluator) timestampFromMetric(metric pmetric.Metric) *pcommon.Timestamp { - var ts *pcommon.Timestamp - switch dt := metric.Type(); dt { - case pmetric.MetricTypeGauge: - dps := metric.Gauge().DataPoints() - if dps.Len() > 0 { - t := dps.At(0).Timestamp() - ts = &t - } - case pmetric.MetricTypeSum: - dps := metric.Sum().DataPoints() - if dps.Len() > 0 { - t := dps.At(0).Timestamp() - ts = &t - } - case pmetric.MetricTypeHistogram: - dps := metric.Histogram().DataPoints() - if dps.Len() > 0 { - t := dps.At(0).Timestamp() - ts = &t - } - case pmetric.MetricTypeExponentialHistogram: - dps := metric.ExponentialHistogram().DataPoints() - if dps.Len() > 0 { - t := dps.At(0).Timestamp() - ts = &t - } - case pmetric.MetricTypeSummary: - dps := metric.Summary().DataPoints() - if dps.Len() > 0 { - t := dps.At(0).Timestamp() - ts = &t - } - default: - m.logger.Debug("cannot get timestamp from data type", zap.String("data type", dt.String())) - } - return ts -} diff --git a/internal/receiver/discoveryreceiver/metric_evaluator_test.go b/internal/receiver/discoveryreceiver/metric_evaluator_test.go index 0a9bc460fe..44e0e84494 100644 --- a/internal/receiver/discoveryreceiver/metric_evaluator_test.go +++ b/internal/receiver/discoveryreceiver/metric_evaluator_test.go @@ -16,6 +16,7 @@ package discoveryreceiver import ( "context" + "sync" "testing" "time" @@ -24,7 +25,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -34,10 +34,9 @@ import ( func TestMetricEvaluatorBaseMetricConsumer(t *testing.T) { logger := zap.NewNop() cfg := &Config{} - plogs := make(chan plog.Logs) cStore := newCorrelationStore(logger, time.Hour) - me := newMetricEvaluator(logger, cfg, plogs, cStore) + me := newMetricEvaluator(logger, cfg, cStore) require.Equal(t, consumer.Capabilities{}, me.Capabilities()) md := pmetric.NewMetrics() @@ -81,11 +80,20 @@ func TestMetricEvaluation(t *testing.T) { } require.NoError(t, cfg.Validate()) - plogs := make(chan plog.Logs) cStore := newCorrelationStore(logger, time.Hour) + + emitCh := cStore.EmitCh() + emitWG := sync.WaitGroup{} + emitWG.Add(1) + var corr correlation + go func() { + corr = <-emitCh + emitWG.Done() + }() + cStore.UpdateEndpoint(observer.Endpoint{ID: "endpoint.id"}, receiverID, observerID) - me := newMetricEvaluator(logger, cfg, plogs, cStore) + me := newMetricEvaluator(logger, cfg, cStore) expectedRes := pcommon.NewResource() expectedRes.Attributes().PutStr("discovery.receiver.type", "a_receiver") @@ -113,10 +121,16 @@ func TestMetricEvaluation(t *testing.T) { sms.AppendEmpty().SetName("desired.name") sms.AppendEmpty().SetName("desired.name") - emitted := me.evaluateMetrics(md) + me.evaluateMetrics(md) - require.Equal(t, 1, emitted.LogRecordCount()) + // wait for the emit channel to be processed + emitWG.Wait() + entityEvents, numFailed, err := entityStateEvents(corr.observerID, + []observer.Endpoint{corr.endpoint}, cStore, time.Now()) + require.NoError(t, err) + require.Equal(t, 0, numFailed) + emitted := entityEvents.ConvertAndMoveToLogs() rl := emitted.ResourceLogs().At(0) require.Equal(t, 0, rl.Resource().Attributes().Len()) @@ -145,6 +159,9 @@ func TestMetricEvaluation(t *testing.T) { "one": "one.value", "two": "two.value", "extra_attr": "target_resource", + "discovery.observer.name": "observer.name", + "discovery.observer.type": "an_observer", + "endpoint": "", }, }, lrAttrs.AsRaw()) }) @@ -152,71 +169,3 @@ func TestMetricEvaluation(t *testing.T) { }) } } - -func TestTimestampFromMetric(t *testing.T) { - expectedTime := pcommon.NewTimestampFromTime(time.Now()) - for _, test := range []struct { - metricFunc func(pmetric.Metric) (shouldBeNil bool) - name string - }{ - {name: "MetricTypeGauge", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptyGauge() - md.Gauge().DataPoints().AppendEmpty().SetTimestamp(expectedTime) - return false - }}, - {name: "empty MetricTypeGauge", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptyGauge() - return true - }}, - {name: "MetricTypeSum", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptySum() - md.Sum().DataPoints().AppendEmpty().SetTimestamp(expectedTime) - return false - }}, - {name: "empty MetricTypeSum", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptySum() - return true - }}, - {name: "MetricTypeHistogram", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptyHistogram() - md.Histogram().DataPoints().AppendEmpty().SetTimestamp(expectedTime) - return false - }}, - {name: "empty MetricTypeHistogram", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptyHistogram() - return true - }}, - {name: "MetricTypeExponentialHistogram", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptyExponentialHistogram() - md.ExponentialHistogram().DataPoints().AppendEmpty().SetTimestamp(expectedTime) - return false - }}, - {name: "empty MetricTypeExponentialHistogram", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptyExponentialHistogram() - return true - }}, - {name: "MetricTypeSummary", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptySummary() - md.Summary().DataPoints().AppendEmpty().SetTimestamp(expectedTime) - return false - }}, - {name: "empty MetricTypeSummary", metricFunc: func(md pmetric.Metric) bool { - md.SetEmptySummary() - return true - }}, - {name: "MetricTypeNone", metricFunc: func(_ pmetric.Metric) bool { return true }}, - } { - t.Run(test.name, func(t *testing.T) { - me := newMetricEvaluator(zap.NewNop(), &Config{}, make(chan plog.Logs), nil) - md := pmetric.NewMetrics().ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - shouldBeNil := test.metricFunc(md) - actual := me.timestampFromMetric(md) - if shouldBeNil { - require.Nil(t, actual) - } else { - require.NotNil(t, actual) - require.Equal(t, expectedTime, *actual) - } - }) - } -} diff --git a/internal/receiver/discoveryreceiver/receiver.go b/internal/receiver/discoveryreceiver/receiver.go index 3c8a157a21..3303ce72c4 100644 --- a/internal/receiver/discoveryreceiver/receiver.go +++ b/internal/receiver/discoveryreceiver/receiver.go @@ -99,9 +99,9 @@ func (d *discoveryReceiver) Start(ctx context.Context, host component.Host) (err d.endpointTracker = newEndpointTracker(d.observables, d.config, d.logger, d.pLogs, correlations) d.endpointTracker.start() - d.metricEvaluator = newMetricEvaluator(d.logger, d.config, d.pLogs, correlations) + d.metricEvaluator = newMetricEvaluator(d.logger, d.config, correlations) - if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, d.pLogs, correlations); err != nil { + if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, correlations); err != nil { return fmt.Errorf("failed creating statement evaluator: %w", err) } diff --git a/internal/receiver/discoveryreceiver/statement_evaluator.go b/internal/receiver/discoveryreceiver/statement_evaluator.go index 96b60eecd2..73e9048d1c 100644 --- a/internal/receiver/discoveryreceiver/statement_evaluator.go +++ b/internal/receiver/discoveryreceiver/statement_evaluator.go @@ -19,10 +19,7 @@ import ( "fmt" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -39,14 +36,13 @@ const statementMatch = "statement.match" // for the matching statement. type statementEvaluator struct { *evaluator - pLogs chan plog.Logs // this is the logger to share with other components to evaluate their statements and produce plog.Logs evaluatedLogger *zap.Logger encoder zapcore.Encoder id component.ID } -func newStatementEvaluator(logger *zap.Logger, id component.ID, config *Config, pLogs chan plog.Logs, correlations correlationStore) (*statementEvaluator, error) { +func newStatementEvaluator(logger *zap.Logger, id component.ID, config *Config, correlations correlationStore) (*statementEvaluator, error) { zapConfig := zap.NewProductionConfig() zapConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel) zapConfig.Sampling.Initial = 1 @@ -54,7 +50,6 @@ func newStatementEvaluator(logger *zap.Logger, id component.ID, config *Config, encoder := statussources.NewZapCoreEncoder() se := &statementEvaluator{ - pLogs: pLogs, encoder: encoder, id: id, } @@ -130,10 +125,7 @@ func (se *statementEvaluator) Write(entry zapcore.Entry, fields []zapcore.Field) } } - if pLogs := se.evaluateStatement(statement); pLogs.LogRecordCount() > 0 { - se.pLogs <- pLogs - } - + se.evaluateStatement(statement) return nil } @@ -144,12 +136,12 @@ func (se *statementEvaluator) Sync() error { // evaluateStatement will convert the provided statussources.Statement into a plog.Logs with a single log record // if it matches against the first applicable configured ReceiverEntry's status Statement.[]Match -func (se *statementEvaluator) evaluateStatement(statement *statussources.Statement) plog.Logs { +func (se *statementEvaluator) evaluateStatement(statement *statussources.Statement) { se.logger.Debug("evaluating statement", zap.Any("statement", statement)) receiverID, endpointID, rEntry, shouldEvaluate := se.receiverEntryFromStatement(statement) if !shouldEvaluate { - return plog.NewLogs() + return } patternMap := map[string]string{"message": statement.Message} @@ -183,19 +175,23 @@ func (se *statementEvaluator) evaluateStatement(statement *statussources.Stateme continue } - entityEvents := experimentalmetricmetadata.NewEntityEventsSlice() - entityEvent := entityEvents.AppendEmpty() - entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID)) - entityState := entityEvent.SetEntityState() - attrs := entityState.Attributes() - _ = attrs.FromRaw(statement.Fields) corr := se.correlations.GetOrCreate(endpointID, receiverID) + attrs := se.correlations.Attrs(endpointID) + + // If the status is already the same as desired, we don't need to update the entity state. + if match.Status == discovery.StatusType(attrs[discovery.StatusAttr]) { + return + } + + for k, v := range statement.Fields { + attrs[k] = fmt.Sprintf("%v", v) + } se.correlateResourceAttributes(se.config, attrs, corr) - attrs.PutStr(discovery.ReceiverTypeAttr, receiverID.Type().String()) - attrs.PutStr(discovery.ReceiverNameAttr, receiverID.Name()) - attrs.PutStr(discovery.MessageAttr, statement.Message) - attrs.PutStr(eventTypeAttr, statementMatch) - attrs.PutStr(receiverRuleAttr, rEntry.Rule.String()) + attrs[discovery.ReceiverTypeAttr] = receiverID.Type().String() + attrs[discovery.ReceiverNameAttr] = receiverID.Name() + attrs[discovery.MessageAttr] = statement.Message + attrs[eventTypeAttr] = statementMatch + attrs[receiverRuleAttr] = rEntry.Rule.String() var desiredRecord LogRecord if match.Record != nil { @@ -206,18 +202,19 @@ func (se *statementEvaluator) evaluateStatement(statement *statussources.Stateme if desiredRecord.AppendPattern { body = fmt.Sprintf("%s (evaluated %q)", body, p) } - entityState.Attributes().PutStr(discovery.MessageAttr, body) + attrs[discovery.MessageAttr] = body } if len(desiredRecord.Attributes) > 0 { for k, v := range desiredRecord.Attributes { - entityState.Attributes().PutStr(k, v) + attrs[k] = v } } - entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status)) - entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(statement.Time)) - return entityEvents.ConvertAndMoveToLogs() + attrs[discovery.StatusAttr] = string(match.Status) + se.correlations.UpdateAttrs(endpointID, attrs) + + se.correlations.EmitCh() <- corr + return } - return plog.NewLogs() } func (se *statementEvaluator) receiverEntryFromStatement(statement *statussources.Statement) (component.ID, observer.EndpointID, ReceiverEntry, bool) { diff --git a/internal/receiver/discoveryreceiver/statement_evaluator_test.go b/internal/receiver/discoveryreceiver/statement_evaluator_test.go index f8105b7b3a..1bf2bc1109 100644 --- a/internal/receiver/discoveryreceiver/statement_evaluator_test.go +++ b/internal/receiver/discoveryreceiver/statement_evaluator_test.go @@ -25,7 +25,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "github.com/signalfx/splunk-otel-collector/internal/common/discovery" @@ -66,31 +65,31 @@ func TestStatementEvaluation(t *testing.T) { } require.NoError(t, cfg.Validate()) - plogs := make(chan plog.Logs) - // If debugging tests, replace the Nop Logger with a test instance to see // all statements. Not in regular use to avoid spamming output. // logger := zaptest.NewLogger(t) logger := zap.NewNop() cStore := newCorrelationStore(logger, time.Hour) + + emitCh := cStore.EmitCh() + emitWG := sync.WaitGroup{} + emitWG.Add(1) + var corr correlation + go func() { + corr = <-emitCh + emitWG.Done() + }() + receiverID := component.MustNewIDWithName("a_receiver", "receiver.name") cStore.UpdateEndpoint(observer.Endpoint{ID: "endpoint.id"}, receiverID, observerID) - se, err := newStatementEvaluator(logger, component.MustNewID("some_type"), cfg, plogs, cStore) + se, err := newStatementEvaluator(logger, component.MustNewID("some_type"), cfg, cStore) require.NoError(t, err) evaluatedLogger := se.evaluatedLogger.With( zap.String("name", `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`), ) - var emitted plog.Logs - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - emitted = <-plogs - wg.Done() - }() - for _, statement := range []string{ "undesired.statement", "another.undesired.statement", @@ -105,8 +104,14 @@ func TestStatementEvaluation(t *testing.T) { ) } - wg.Wait() - close(plogs) + // wait for the emit channel to be processed + emitWG.Wait() + + entityEvents, numFailed, err := entityStateEvents(corr.observerID, + []observer.Endpoint{corr.endpoint}, cStore, time.Now()) + require.NoError(t, err) + require.Equal(t, 0, numFailed) + emitted := entityEvents.ConvertAndMoveToLogs() require.Equal(t, 1, emitted.ResourceLogs().Len()) rl := emitted.ResourceLogs().At(0) @@ -160,6 +165,9 @@ func TestStatementEvaluation(t *testing.T) { "attr.two": "attr.two.value", "field.one": "field.one.value", "field_two": "field.two.value", + "discovery.observer.name": "observer.name", + "discovery.observer.type": "an_observer", + "endpoint": "", }, }, lr.Attributes().AsRaw()) }) diff --git a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml index 3762405137..0c199e5f6f 100644 --- a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml +++ b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml @@ -23,6 +23,15 @@ resource_logs: discovery.status: successful discovery.message: Successfully connected to prometheus server metric.name: otelcol_process_uptime + discovery.observer.type: host_observer + discovery.observer.name: "" + command: /otelcol --config /etc/config.yaml + endpoint: '[::]:8888' + is_ipv6: true + port: 8888 + process_name: otelcol + transport: TCP + type: hostport - scope_logs: - logs: - attributes: @@ -42,3 +51,12 @@ resource_logs: kind: receiver name: prometheus_simple//receiver_creator/discovery{endpoint="[::]:4318"}/(host_observer)[::]-4318-TCP-1 target_labels: '{__name__="up", instance="[::]:4318", job="prometheus_simple/[::]:4318"}' + discovery.observer.name: "" + discovery.observer.type: host_observer + command: /otelcol --config /etc/config.yaml + endpoint: '[::]:4318' + is_ipv6: true + port: 4318 + process_name: otelcol + transport: TCP + type: hostport