diff --git a/internal/receiver/discoveryreceiver/correlation.go b/internal/receiver/discoveryreceiver/correlation.go
index d3aa4717e4..59168f6d02 100644
--- a/internal/receiver/discoveryreceiver/correlation.go
+++ b/internal/receiver/discoveryreceiver/correlation.go
@@ -50,6 +50,7 @@ type correlationStore interface {
 	GetOrCreate(endpointID observer.EndpointID, receiverID component.ID) correlation
 	Attrs(endpointID observer.EndpointID) map[string]string
 	UpdateAttrs(endpointID observer.EndpointID, attrs map[string]string)
+	EmitCh() chan correlation
 	Endpoints(updatedBefore time.Time) []observer.Endpoint
 	// Start the reaping loop to prevent unnecessary endpoint buildup
 	Start()
@@ -71,6 +72,7 @@ type store struct {
 	attrs *sync.Map
 	// sentinel for terminating reaper loop
 	sentinel     chan struct{}
+	emitCh       chan correlation
 	reapInterval time.Duration
 	ttl          time.Duration
 }
@@ -84,6 +86,7 @@ func newCorrelationStore(logger *zap.Logger, ttl time.Duration) correlationStore
 		reapInterval: 30 * time.Second,
 		ttl:          ttl,
 		sentinel:     make(chan struct{}, 1),
+		emitCh:       make(chan correlation),
 	}
 }
 
@@ -118,6 +121,11 @@ func (s *store) MarkStale(endpointID observer.EndpointID) {
 	}
 }
 
+// EmitCh returns a channel to emit endpoints immediately.
+func (s *store) EmitCh() chan correlation {
+	return s.emitCh
+}
+
 // Endpoints returns all active endpoints that have not been updated since the provided time.
 func (s *store) Endpoints(updatedBefore time.Time) []observer.Endpoint {
 	var endpoints []observer.Endpoint
diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker.go b/internal/receiver/discoveryreceiver/endpoint_tracker.go
index 5bfa7ba2c0..d725ff6702 100644
--- a/internal/receiver/discoveryreceiver/endpoint_tracker.go
+++ b/internal/receiver/discoveryreceiver/endpoint_tracker.go
@@ -92,6 +92,8 @@ func (et *endpointTracker) startEmitLoop() {
 	timer := time.NewTicker(et.emitInterval)
 	for {
 		select {
+		case corr := <-et.correlations.EmitCh():
+			et.emitEntityStateEvents(corr.observerID, []observer.Endpoint{corr.endpoint})
 		case <-timer.C:
 			for obs := range et.observables {
 				et.emitEntityStateEvents(obs, et.correlations.Endpoints(time.Now().Add(-et.emitInterval)))
@@ -114,7 +116,7 @@ func (et *endpointTracker) stop() {
 
 func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpoints []observer.Endpoint) {
 	if et.pLogs != nil {
-		entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, time.Now())
+		entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now())
 		if err != nil {
 			et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err))
 		}
@@ -209,7 +211,7 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
 	n.endpointTracker.updateEndpoints(changed, n.observerID)
 }
 
-func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
+func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
 	entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
 	for _, endpoint := range endpoints {
 		entityEvent := entityEvents.AppendEmpty()
@@ -230,6 +232,9 @@ func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, t
 		attrs.PutStr("endpoint", endpoint.Target)
 		attrs.PutStr(observerNameAttr, observerID.Name())
 		attrs.PutStr(observerTypeAttr, observerID.Type().String())
+		for k, v := range correlations.Attrs(endpoint.ID) {
+			attrs.PutStr(k, v)
+		}
 	}
 	return entityEvents, failed, err
 }
diff --git a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go
index d13f0f5d9a..207e2c99c2 100644
--- a/internal/receiver/discoveryreceiver/endpoint_tracker_test.go
+++ b/internal/receiver/discoveryreceiver/endpoint_tracker_test.go
@@ -164,7 +164,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			events, failed, err := entityStateEvents(
 				component.MustNewIDWithName("observer_type", "observer.name"),
-				[]observer.Endpoint{test.endpoint}, t0,
+				[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
 			)
 			require.NoError(t, err)
 			require.Zero(t, failed)
@@ -276,7 +276,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
 			// Validate entity_state event
 			events, failed, err := entityStateEvents(
 				component.MustNewIDWithName("observer_type", "observer.name"),
-				[]observer.Endpoint{test.endpoint}, t0,
+				[]observer.Endpoint{test.endpoint}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
 			)
 			if test.expectedError != "" {
 				require.Error(t, err)
@@ -343,7 +343,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
 					Transport: observer.Transport(transport),
 				},
 			},
-		}, t0,
+		}, newCorrelationStore(zap.NewNop(), time.Hour), t0,
 		)
 		expectedLogs := expectedPLogs(endpointID)
 
@@ -661,6 +661,7 @@ func TestEntityEmittingLifecycle(t *testing.T) {
 	require.Equal(t, 1, gotLogs.LogRecordCount())
 	// TODO: Use plogtest.IgnoreTimestamp once available
 	expectedEvents, failed, err := entityStateEvents(obsID, []observer.Endpoint{portEndpoint},
+		newCorrelationStore(zap.NewNop(), time.Hour),
 		gotLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime())
 	require.NoError(t, err)
 	require.Zero(t, failed)
diff --git a/internal/receiver/discoveryreceiver/evaluator.go b/internal/receiver/discoveryreceiver/evaluator.go
index 0ac6324cc1..8d1c4068f1 100644
--- a/internal/receiver/discoveryreceiver/evaluator.go
+++ b/internal/receiver/discoveryreceiver/evaluator.go
@@ -24,7 +24,6 @@ import (
 	"github.com/antonmedv/expr/vm"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.uber.org/zap"
 	"gopkg.in/yaml.v2"
 
@@ -117,10 +116,10 @@ func (e *evaluator) evaluateMatch(match Match, pattern string, status discovery.
 }
 
 // correlateResourceAttributes sets correlation attributes including embedded base64 config content, if configured.
-func (e *evaluator) correlateResourceAttributes(cfg *Config, to pcommon.Map, corr correlation) {
+func (e *evaluator) correlateResourceAttributes(cfg *Config, to map[string]string, corr correlation) {
 	observerID := corr.observerID.String()
 	if observerID != "" && observerID != discovery.NoType.String() {
-		to.PutStr(discovery.ObserverIDAttr, observerID)
+		to[discovery.ObserverIDAttr] = observerID
 	}
 
 	if e.config.EmbedReceiverConfig {
@@ -140,6 +139,6 @@ func (e *evaluator) correlateResourceAttributes(cfg *Config, to pcommon.Map, cor
 		if err != nil {
 			e.logger.Error("failed embedding receiver config", zap.String("observer", observerID), zap.Error(err))
 		}
-		to.PutStr(discovery.ReceiverConfigAttr, base64.StdEncoding.EncodeToString(cfgYaml))
+		to[discovery.ReceiverConfigAttr] = base64.StdEncoding.EncodeToString(cfgYaml)
 	}
 }
diff --git a/internal/receiver/discoveryreceiver/evaluator_test.go b/internal/receiver/discoveryreceiver/evaluator_test.go
index 5c07dcf3c2..48a1860e83 100644
--- a/internal/receiver/discoveryreceiver/evaluator_test.go
+++ b/internal/receiver/discoveryreceiver/evaluator_test.go
@@ -24,7 +24,6 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.uber.org/zap"
 )
 
@@ -124,12 +123,11 @@ func TestCorrelateResourceAttrs(t *testing.T) {
 				},
 			}
 
-			to := pcommon.NewMap()
-
+			to := map[string]string{}
 			require.Empty(t, eval.correlations.Attrs(endpointID))
 
 			eval.correlateResourceAttributes(cfg, to, corr)
 
-			expectedResourceAttrs := map[string]any{
+			expectedResourceAttrs := map[string]string{
 				"discovery.observer.id": "type/name",
 			}
@@ -147,7 +145,7 @@ watch_observers:
 `))
 			}
 
-			require.Equal(t, expectedResourceAttrs, to.AsRaw())
+			require.Equal(t, expectedResourceAttrs, to)
 		})
 	}
 }
diff --git a/internal/receiver/discoveryreceiver/metric_evaluator.go b/internal/receiver/discoveryreceiver/metric_evaluator.go
index b30262f529..2ba0b69ee4 100644
--- a/internal/receiver/discoveryreceiver/metric_evaluator.go
+++ b/internal/receiver/discoveryreceiver/metric_evaluator.go
@@ -19,11 +19,9 @@ import (
 	"fmt"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -47,12 +45,10 @@ var (
 // Status match rules. If so, they emit log records for the matching metric.
 type metricEvaluator struct {
 	*evaluator
-	pLogs chan plog.Logs
 }
 
-func newMetricEvaluator(logger *zap.Logger, cfg *Config, pLogs chan plog.Logs, correlations correlationStore) *metricEvaluator {
+func newMetricEvaluator(logger *zap.Logger, cfg *Config, correlations correlationStore) *metricEvaluator {
 	return &metricEvaluator{
-		pLogs: pLogs,
 		evaluator: newEvaluator(logger, cfg, correlations,
 			// TODO: provide more capable env w/ resource and metric attributes
 			func(pattern string) map[string]any {
@@ -67,15 +63,13 @@ func (m *metricEvaluator) Capabilities() consumer.Capabilities {
 }
 
 func (m *metricEvaluator) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
-	if pLogs := m.evaluateMetrics(md); pLogs.LogRecordCount() > 0 {
-		m.pLogs <- pLogs
-	}
+	m.evaluateMetrics(md)
 	return nil
 }
 
-// evaluateMetrics parses the provided Metrics and returns plog.Logs with a single log record if it matches
-// against the first applicable configured Status match rule.
-func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
+// evaluateMetrics parses the provided Metrics and, if they match the first applicable configured Status
+// match rule, updates the endpoint's correlation attributes and triggers an immediate entity state emit.
+func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) {
 	if ce := m.logger.Check(zapcore.DebugLevel, "evaluating metrics"); ce != nil {
 		if mbytes, err := jsonMarshaler.MarshalMetrics(md); err == nil {
 			ce.Write(zap.ByteString("metrics", mbytes))
@@ -84,22 +78,22 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
 		}
 	}
 	if md.MetricCount() == 0 {
-		return plog.NewLogs()
+		return
 	}
 
 	receiverID, endpointID := statussources.MetricsToReceiverIDs(md)
 	if receiverID == discovery.NoType || endpointID == "" {
 		m.logger.Debug("unable to evaluate metrics from receiver without corresponding name or Endpoint.ID", zap.Any("metrics", md))
-		return plog.NewLogs()
+		return
 	}
 
 	rEntry, ok := m.config.Receivers[receiverID]
 	if !ok {
 		m.logger.Info("No matching configured receiver for metric status evaluation", zap.String("receiver", receiverID.String()))
-		return plog.NewLogs()
+		return
 	}
 	if rEntry.Status == nil || len(rEntry.Status.Metrics) == 0 {
-		return plog.NewLogs()
+		return
 	}
 
 	for _, match := range rEntry.Status.Metrics {
@@ -108,20 +102,26 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
 			continue
 		}
 
-		entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
-		entityEvent := entityEvents.AppendEmpty()
-		entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID))
-		entityState := entityEvent.SetEntityState()
-
-		res.Attributes().CopyTo(entityState.Attributes())
 		corr := m.correlations.GetOrCreate(endpointID, receiverID)
-		m.correlateResourceAttributes(m.config, entityState.Attributes(), corr)
+		attrs := m.correlations.Attrs(endpointID)
 
-		// Remove the endpoint ID from the attributes as it's set in the entity ID.
+		// If the status is already the same as desired, we don't need to update the entity state.
+		if match.Status == discovery.StatusType(attrs[discovery.StatusAttr]) {
+			return
+		}
-		entityState.Attributes().Remove(discovery.EndpointIDAttr)
 
-		entityState.Attributes().PutStr(eventTypeAttr, metricMatch)
-		entityState.Attributes().PutStr(receiverRuleAttr, rEntry.Rule.String())
+		res.Attributes().Range(func(k string, v pcommon.Value) bool {
+			// skip endpoint ID attr since it's set in the entity ID
+			if k == discovery.EndpointIDAttr {
+				return true
+			}
+			attrs[k] = v.AsString()
+			return true
+		})
+		m.correlateResourceAttributes(m.config, attrs, corr)
+
+		attrs[eventTypeAttr] = metricMatch
+		attrs[receiverRuleAttr] = rEntry.Rule.String()
 
 		desiredRecord := match.Record
 		if desiredRecord == nil {
@@ -131,19 +131,17 @@ func (m *metricEvaluator) evaluateMetrics(md pmetric.Metrics) plog.Logs {
 		if desiredRecord.Body != "" {
 			desiredMsg = desiredRecord.Body
 		}
-		entityState.Attributes().PutStr(discovery.MessageAttr, desiredMsg)
+		attrs[discovery.MessageAttr] = desiredMsg
 
 		for k, v := range desiredRecord.Attributes {
-			entityState.Attributes().PutStr(k, v)
-		}
-		entityState.Attributes().PutStr(metricNameAttr, metric.Name())
-		entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status))
-		if ts := m.timestampFromMetric(metric); ts != nil {
-			entityEvent.SetTimestamp(*ts)
+			attrs[k] = v
 		}
+		attrs[metricNameAttr] = metric.Name()
+		attrs[discovery.StatusAttr] = string(match.Status)
+		m.correlations.UpdateAttrs(endpointID, attrs)
 
-		return entityEvents.ConvertAndMoveToLogs()
+		m.correlations.EmitCh() <- corr
+		return
 	}
-	return plog.NewLogs()
 }
 
 // findMatchedMetric finds the metric that matches the provided match rule and return it along with the resource if found.
@@ -166,42 +164,3 @@ func (m *metricEvaluator) findMatchedMetric(md pmetric.Metrics, match Match, rec
 	}
 	return pcommon.NewResource(), pmetric.NewMetric(), false
 }
-
-func (m *metricEvaluator) timestampFromMetric(metric pmetric.Metric) *pcommon.Timestamp {
-	var ts *pcommon.Timestamp
-	switch dt := metric.Type(); dt {
-	case pmetric.MetricTypeGauge:
-		dps := metric.Gauge().DataPoints()
-		if dps.Len() > 0 {
-			t := dps.At(0).Timestamp()
-			ts = &t
-		}
-	case pmetric.MetricTypeSum:
-		dps := metric.Sum().DataPoints()
-		if dps.Len() > 0 {
-			t := dps.At(0).Timestamp()
-			ts = &t
-		}
-	case pmetric.MetricTypeHistogram:
-		dps := metric.Histogram().DataPoints()
-		if dps.Len() > 0 {
-			t := dps.At(0).Timestamp()
-			ts = &t
-		}
-	case pmetric.MetricTypeExponentialHistogram:
-		dps := metric.ExponentialHistogram().DataPoints()
-		if dps.Len() > 0 {
-			t := dps.At(0).Timestamp()
-			ts = &t
-		}
-	case pmetric.MetricTypeSummary:
-		dps := metric.Summary().DataPoints()
-		if dps.Len() > 0 {
-			t := dps.At(0).Timestamp()
-			ts = &t
-		}
-	default:
-		m.logger.Debug("cannot get timestamp from data type", zap.String("data type", dt.String()))
-	}
-	return ts
-}
diff --git a/internal/receiver/discoveryreceiver/metric_evaluator_test.go b/internal/receiver/discoveryreceiver/metric_evaluator_test.go
index 0a9bc460fe..44e0e84494 100644
--- a/internal/receiver/discoveryreceiver/metric_evaluator_test.go
+++ b/internal/receiver/discoveryreceiver/metric_evaluator_test.go
@@ -16,6 +16,7 @@ package discoveryreceiver
 
 import (
 	"context"
+	"sync"
 	"testing"
 	"time"
 
@@ -24,7 +25,6 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 
@@ -34,10 +34,9 @@ import (
 
 func TestMetricEvaluatorBaseMetricConsumer(t *testing.T) {
 	logger := zap.NewNop()
 	cfg := &Config{}
-	plogs := make(chan plog.Logs)
 	cStore := newCorrelationStore(logger, time.Hour)
-	me := newMetricEvaluator(logger, cfg, plogs, cStore)
+	me := newMetricEvaluator(logger, cfg, cStore)
 	require.Equal(t, consumer.Capabilities{}, me.Capabilities())
 
 	md := pmetric.NewMetrics()
@@ -81,11 +80,20 @@ func TestMetricEvaluation(t *testing.T) {
 			}
 			require.NoError(t, cfg.Validate())
 
-			plogs := make(chan plog.Logs)
 			cStore := newCorrelationStore(logger, time.Hour)
+
+			emitCh := cStore.EmitCh()
+			emitWG := sync.WaitGroup{}
+			emitWG.Add(1)
+			var corr correlation
+			go func() {
+				corr = <-emitCh
+				emitWG.Done()
+			}()
+
 			cStore.UpdateEndpoint(observer.Endpoint{ID: "endpoint.id"}, receiverID, observerID)
 
-			me := newMetricEvaluator(logger, cfg, plogs, cStore)
+			me := newMetricEvaluator(logger, cfg, cStore)
 
 			expectedRes := pcommon.NewResource()
 			expectedRes.Attributes().PutStr("discovery.receiver.type", "a_receiver")
@@ -113,10 +121,16 @@ func TestMetricEvaluation(t *testing.T) {
 				sms.AppendEmpty().SetName("desired.name")
 				sms.AppendEmpty().SetName("desired.name")
 
-				emitted := me.evaluateMetrics(md)
+				me.evaluateMetrics(md)
 
-				require.Equal(t, 1, emitted.LogRecordCount())
+				// wait for the emit channel to be processed
+				emitWG.Wait()
+				entityEvents, numFailed, err := entityStateEvents(corr.observerID,
+					[]observer.Endpoint{corr.endpoint}, cStore, time.Now())
+				require.NoError(t, err)
+				require.Equal(t, 0, numFailed)
+				emitted := entityEvents.ConvertAndMoveToLogs()
 
 				rl := emitted.ResourceLogs().At(0)
 				require.Equal(t, 0, rl.Resource().Attributes().Len())
@@ -145,6 +159,9 @@ func TestMetricEvaluation(t *testing.T) {
 						"one":        "one.value",
 						"two":        "two.value",
 						"extra_attr": "target_resource",
+						"discovery.observer.name": "observer.name",
+						"discovery.observer.type": "an_observer",
+						"endpoint":                "",
 					},
 				}, lrAttrs.AsRaw())
 			})
@@ -152,71 +169,3 @@ func TestMetricEvaluation(t *testing.T) {
 		})
 	}
 }
-
-func TestTimestampFromMetric(t *testing.T) {
-	expectedTime := pcommon.NewTimestampFromTime(time.Now())
-	for _, test := range []struct {
-		metricFunc func(pmetric.Metric) (shouldBeNil bool)
-		name       string
-	}{
-		{name: "MetricTypeGauge", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptyGauge()
-			md.Gauge().DataPoints().AppendEmpty().SetTimestamp(expectedTime)
-			return false
-		}},
-		{name: "empty MetricTypeGauge", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptyGauge()
-			return true
-		}},
-		{name: "MetricTypeSum", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptySum()
-			md.Sum().DataPoints().AppendEmpty().SetTimestamp(expectedTime)
-			return false
-		}},
-		{name: "empty MetricTypeSum", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptySum()
-			return true
-		}},
-		{name: "MetricTypeHistogram", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptyHistogram()
-			md.Histogram().DataPoints().AppendEmpty().SetTimestamp(expectedTime)
-			return false
-		}},
-		{name: "empty MetricTypeHistogram", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptyHistogram()
-			return true
-		}},
-		{name: "MetricTypeExponentialHistogram", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptyExponentialHistogram()
-			md.ExponentialHistogram().DataPoints().AppendEmpty().SetTimestamp(expectedTime)
-			return false
-		}},
-		{name: "empty MetricTypeExponentialHistogram", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptyExponentialHistogram()
-			return true
-		}},
-		{name: "MetricTypeSummary", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptySummary()
-			md.Summary().DataPoints().AppendEmpty().SetTimestamp(expectedTime)
-			return false
-		}},
-		{name: "empty MetricTypeSummary", metricFunc: func(md pmetric.Metric) bool {
-			md.SetEmptySummary()
-			return true
-		}},
-		{name: "MetricTypeNone", metricFunc: func(_ pmetric.Metric) bool { return true }},
-	} {
-		t.Run(test.name, func(t *testing.T) {
-			me := newMetricEvaluator(zap.NewNop(), &Config{}, make(chan plog.Logs), nil)
-			md := pmetric.NewMetrics().ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
-			shouldBeNil := test.metricFunc(md)
-			actual := me.timestampFromMetric(md)
-			if shouldBeNil {
-				require.Nil(t, actual)
-			} else {
-				require.NotNil(t, actual)
-				require.Equal(t, expectedTime, *actual)
-			}
-		})
-	}
-}
diff --git a/internal/receiver/discoveryreceiver/receiver.go b/internal/receiver/discoveryreceiver/receiver.go
index 3c8a157a21..3303ce72c4 100644
--- a/internal/receiver/discoveryreceiver/receiver.go
+++ b/internal/receiver/discoveryreceiver/receiver.go
@@ -99,9 +99,9 @@ func (d *discoveryReceiver) Start(ctx context.Context, host component.Host) (err
 	d.endpointTracker = newEndpointTracker(d.observables, d.config, d.logger, d.pLogs, correlations)
 	d.endpointTracker.start()
 
-	d.metricEvaluator = newMetricEvaluator(d.logger, d.config, d.pLogs, correlations)
+	d.metricEvaluator = newMetricEvaluator(d.logger, d.config, correlations)
 
-	if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, d.pLogs, correlations); err != nil {
+	if d.statementEvaluator, err = newStatementEvaluator(d.logger, d.settings.ID, d.config, correlations); err != nil {
 		return fmt.Errorf("failed creating statement evaluator: %w", err)
 	}
 
diff --git a/internal/receiver/discoveryreceiver/statement_evaluator.go b/internal/receiver/discoveryreceiver/statement_evaluator.go
index 96b60eecd2..73e9048d1c 100644
--- a/internal/receiver/discoveryreceiver/statement_evaluator.go
+++ b/internal/receiver/discoveryreceiver/statement_evaluator.go
@@ -19,10 +19,7 @@ import (
 	"fmt"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/plog"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 
@@ -39,14 +36,13 @@ const statementMatch = "statement.match"
 // for the matching statement.
 type statementEvaluator struct {
 	*evaluator
-	pLogs chan plog.Logs
 	// this is the logger to share with other components to evaluate their statements and produce plog.Logs
 	evaluatedLogger *zap.Logger
 	encoder         zapcore.Encoder
 	id              component.ID
 }
 
-func newStatementEvaluator(logger *zap.Logger, id component.ID, config *Config, pLogs chan plog.Logs, correlations correlationStore) (*statementEvaluator, error) {
+func newStatementEvaluator(logger *zap.Logger, id component.ID, config *Config, correlations correlationStore) (*statementEvaluator, error) {
 	zapConfig := zap.NewProductionConfig()
 	zapConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
 	zapConfig.Sampling.Initial = 1
@@ -54,7 +50,6 @@ func newStatementEvaluator(logger *zap.Logger, id component.ID, config *Config,
 	encoder := statussources.NewZapCoreEncoder()
 
 	se := &statementEvaluator{
-		pLogs:   pLogs,
 		encoder: encoder,
 		id:      id,
 	}
@@ -130,10 +125,7 @@ func (se *statementEvaluator) Write(entry zapcore.Entry, fields []zapcore.Field)
 		}
 	}
 
-	if pLogs := se.evaluateStatement(statement); pLogs.LogRecordCount() > 0 {
-		se.pLogs <- pLogs
-	}
-
+	se.evaluateStatement(statement)
 	return nil
 }
 
@@ -144,12 +136,12 @@ func (se *statementEvaluator) Sync() error {
 
-// evaluateStatement will convert the provided statussources.Statement into a plog.Logs with a single log record
-// if it matches against the first applicable configured ReceiverEntry's status Statement.[]Match
-func (se *statementEvaluator) evaluateStatement(statement *statussources.Statement) plog.Logs {
+// evaluateStatement evaluates the provided statussources.Statement against the first applicable configured
+// ReceiverEntry's status Statement.[]Match and, on a match, updates the endpoint's correlation attributes.
+func (se *statementEvaluator) evaluateStatement(statement *statussources.Statement) {
 	se.logger.Debug("evaluating statement", zap.Any("statement", statement))
 
 	receiverID, endpointID, rEntry, shouldEvaluate := se.receiverEntryFromStatement(statement)
 	if !shouldEvaluate {
-		return plog.NewLogs()
+		return
 	}
 
 	patternMap := map[string]string{"message": statement.Message}
@@ -183,19 +175,23 @@ func (se *statementEvaluator) evaluateStatement(statement *statussources.Stateme
 			continue
 		}
 
-		entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
-		entityEvent := entityEvents.AppendEmpty()
-		entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpointID))
-		entityState := entityEvent.SetEntityState()
-		attrs := entityState.Attributes()
-		_ = attrs.FromRaw(statement.Fields)
 		corr := se.correlations.GetOrCreate(endpointID, receiverID)
+		attrs := se.correlations.Attrs(endpointID)
+
+		// If the status is already the same as desired, we don't need to update the entity state.
+		if match.Status == discovery.StatusType(attrs[discovery.StatusAttr]) {
+			return
+		}
+
+		for k, v := range statement.Fields {
+			attrs[k] = fmt.Sprintf("%v", v)
+		}
 		se.correlateResourceAttributes(se.config, attrs, corr)
-		attrs.PutStr(discovery.ReceiverTypeAttr, receiverID.Type().String())
-		attrs.PutStr(discovery.ReceiverNameAttr, receiverID.Name())
-		attrs.PutStr(discovery.MessageAttr, statement.Message)
-		attrs.PutStr(eventTypeAttr, statementMatch)
-		attrs.PutStr(receiverRuleAttr, rEntry.Rule.String())
+		attrs[discovery.ReceiverTypeAttr] = receiverID.Type().String()
+		attrs[discovery.ReceiverNameAttr] = receiverID.Name()
+		attrs[discovery.MessageAttr] = statement.Message
+		attrs[eventTypeAttr] = statementMatch
+		attrs[receiverRuleAttr] = rEntry.Rule.String()
 
 		var desiredRecord LogRecord
 		if match.Record != nil {
@@ -206,18 +202,19 @@ func (se *statementEvaluator) evaluateStatement(statement *statussources.Stateme
 			if desiredRecord.AppendPattern {
 				body = fmt.Sprintf("%s (evaluated %q)", body, p)
 			}
-			entityState.Attributes().PutStr(discovery.MessageAttr, body)
+			attrs[discovery.MessageAttr] = body
 		}
 		if len(desiredRecord.Attributes) > 0 {
 			for k, v := range desiredRecord.Attributes {
-				entityState.Attributes().PutStr(k, v)
+				attrs[k] = v
 			}
 		}
-		entityState.Attributes().PutStr(discovery.StatusAttr, string(match.Status))
-		entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(statement.Time))
-		return entityEvents.ConvertAndMoveToLogs()
+		attrs[discovery.StatusAttr] = string(match.Status)
+		se.correlations.UpdateAttrs(endpointID, attrs)
+
+		se.correlations.EmitCh() <- corr
+		return
 	}
-	return plog.NewLogs()
 }
 
 func (se *statementEvaluator) receiverEntryFromStatement(statement *statussources.Statement) (component.ID, observer.EndpointID, ReceiverEntry, bool) {
diff --git a/internal/receiver/discoveryreceiver/statement_evaluator_test.go b/internal/receiver/discoveryreceiver/statement_evaluator_test.go
index f8105b7b3a..1bf2bc1109 100644
--- a/internal/receiver/discoveryreceiver/statement_evaluator_test.go
+++ b/internal/receiver/discoveryreceiver/statement_evaluator_test.go
@@ -25,7 +25,6 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/pdata/plog"
 	"go.uber.org/zap"
 
 	"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
@@ -66,31 +65,31 @@ func TestStatementEvaluation(t *testing.T) {
 			}
 			require.NoError(t, cfg.Validate())
 
-			plogs := make(chan plog.Logs)
-
 			// If debugging tests, replace the Nop Logger with a test instance to see
 			// all statements. Not in regular use to avoid spamming output.
 			// logger := zaptest.NewLogger(t)
 			logger := zap.NewNop()
 			cStore := newCorrelationStore(logger, time.Hour)
+
+			emitCh := cStore.EmitCh()
+			emitWG := sync.WaitGroup{}
+			emitWG.Add(1)
+			var corr correlation
+			go func() {
+				corr = <-emitCh
+				emitWG.Done()
+			}()
+
 			receiverID := component.MustNewIDWithName("a_receiver", "receiver.name")
 			cStore.UpdateEndpoint(observer.Endpoint{ID: "endpoint.id"}, receiverID, observerID)
 
-			se, err := newStatementEvaluator(logger, component.MustNewID("some_type"), cfg, plogs, cStore)
+			se, err := newStatementEvaluator(logger, component.MustNewID("some_type"), cfg, cStore)
 			require.NoError(t, err)
 
 			evaluatedLogger := se.evaluatedLogger.With(
 				zap.String("name", `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`),
 			)
 
-			var emitted plog.Logs
-			wg := sync.WaitGroup{}
-			wg.Add(1)
-			go func() {
-				emitted = <-plogs
-				wg.Done()
-			}()
-
 			for _, statement := range []string{
 				"undesired.statement",
 				"another.undesired.statement",
@@ -105,8 +104,14 @@ func TestStatementEvaluation(t *testing.T) {
 				)
 			}
 
-			wg.Wait()
-			close(plogs)
+			// wait for the emit channel to be processed
+			emitWG.Wait()
+
+			entityEvents, numFailed, err := entityStateEvents(corr.observerID,
+				[]observer.Endpoint{corr.endpoint}, cStore, time.Now())
+			require.NoError(t, err)
+			require.Equal(t, 0, numFailed)
+			emitted := entityEvents.ConvertAndMoveToLogs()
 
 			require.Equal(t, 1, emitted.ResourceLogs().Len())
 			rl := emitted.ResourceLogs().At(0)
@@ -160,6 +165,9 @@ func TestStatementEvaluation(t *testing.T) {
 						"attr.two":  "attr.two.value",
 						"field.one": "field.one.value",
 						"field_two": "field.two.value",
+						"discovery.observer.name": "observer.name",
+						"discovery.observer.type": "an_observer",
+						"endpoint":                "",
 					},
 				}, lr.Attributes().AsRaw())
 			})
diff --git a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml
index 3762405137..0c199e5f6f 100644
--- a/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml
+++ b/tests/receivers/discovery/testdata/resource_logs/host_observer_simple_prometheus_statuses.yaml
@@ -23,6 +23,15 @@ resource_logs:
               discovery.status: successful
               discovery.message: Successfully connected to prometheus server
               metric.name: otelcol_process_uptime
+              discovery.observer.type: host_observer
+              discovery.observer.name: ""
+              command: /otelcol --config /etc/config.yaml
+              endpoint: '[::]:8888'
+              is_ipv6: true
+              port: 8888
+              process_name: otelcol
+              transport: TCP
+              type: hostport
       - scope_logs:
           - logs:
               - attributes:
@@ -42,3 +51,12 @@ resource_logs:
                   kind: receiver
                   name: prometheus_simple//receiver_creator/discovery{endpoint="[::]:4318"}/(host_observer)[::]-4318-TCP-1
                   target_labels: '{__name__="up", instance="[::]:4318", job="prometheus_simple/[::]:4318"}'
+                  discovery.observer.name: ""
+                  discovery.observer.type: host_observer
+                  command: /otelcol --config /etc/config.yaml
+                  endpoint: '[::]:4318'
+                  is_ipv6: true
+                  port: 4318
+                  process_name: otelcol
+                  transport: TCP
+                  type: hostport
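
For orientation, a minimal sketch of the emit-channel pattern this patch introduces: evaluators update the endpoint's correlation attributes and hand the correlation to the tracker over an unbuffered channel, so a matched status is emitted immediately instead of waiting for the next ticker interval. The `correlation`, `store`, and `emitLoop` names below are simplified stand-ins for illustration only, not the receiver's actual types; the real implementation is in `correlation.go` and `endpoint_tracker.go` above.

```go
// Sketch only: illustrates the emit-channel pattern under simplified, assumed types.
package main

import (
	"fmt"
	"time"
)

// correlation is a stand-in for the receiver's endpoint/observer pairing.
type correlation struct {
	observerID string
	endpoint   string
}

// store is a stand-in for the correlation store exposing an emit channel.
type store struct {
	emitCh chan correlation // unbuffered: the sender blocks until the emit loop receives
}

func (s *store) EmitCh() chan correlation { return s.emitCh }

// emitLoop keeps the periodic ticker-based emit, but also emits immediately
// whenever an evaluator pushes an updated correlation onto EmitCh.
func emitLoop(s *store, interval time.Duration, stop chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case corr := <-s.EmitCh():
			fmt.Println("immediate entity state emit for", corr.endpoint)
		case <-ticker.C:
			fmt.Println("periodic entity state emit")
		case <-stop:
			return
		}
	}
}

func main() {
	s := &store{emitCh: make(chan correlation)}
	stop := make(chan struct{})
	go emitLoop(s, 30*time.Second, stop)

	// An evaluator that just matched a status rule hands off the correlation;
	// the send completes only once the loop has picked it up.
	s.EmitCh() <- correlation{observerID: "host_observer", endpoint: "[::]:8888"}

	time.Sleep(50 * time.Millisecond) // give the loop time to print before exiting
	close(stop)
}
```

Because the channel is unbuffered, the evaluator's send acts as a handoff to the tracker's loop, which is also why the updated tests wait on the channel (via a WaitGroup) before asserting on the emitted entity state events.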