From ddc2204af1018b2ea2bfc2c3da8fbff7d6a45c18 Mon Sep 17 00:00:00 2001 From: Murphy Chen Date: Fri, 5 May 2023 12:28:55 +0800 Subject: [PATCH] [exporter/clickhouse] Fix insert metrics with duplicate scope data (#21209) --- .chloggen/fix-deplicate-scope-data.yaml | 16 ++ .../clickhouseexporter/exporter_metrics.go | 19 +-- .../exporter_metrics_test.go | 149 ++++++++++++++++++ .../internal/exponential_histogram_metrics.go | 12 +- .../internal/gauge_metrics.go | 12 +- .../internal/histogram_metrics.go | 12 +- .../internal/metrics_model.go | 2 +- .../internal/sum_metrics.go | 12 +- .../internal/summary_metrics.go | 12 +- 9 files changed, 219 insertions(+), 27 deletions(-) create mode 100644 .chloggen/fix-deplicate-scope-data.yaml diff --git a/.chloggen/fix-deplicate-scope-data.yaml b/.chloggen/fix-deplicate-scope-data.yaml new file mode 100644 index 000000000000..8c3c6893956b --- /dev/null +++ b/.chloggen/fix-deplicate-scope-data.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: clickhouseexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix insert metrics with duplicate scope data + +# One or more tracking issues related to the change +issues: [21082] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/clickhouseexporter/exporter_metrics.go b/exporter/clickhouseexporter/exporter_metrics.go index 761208b747d3..1abc581262a1 100644 --- a/exporter/clickhouseexporter/exporter_metrics.go +++ b/exporter/clickhouseexporter/exporter_metrics.go @@ -70,29 +70,26 @@ func (e *metricsExporter) shutdown(ctx context.Context) error { func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error { metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName) for i := 0; i < md.ResourceMetrics().Len(); i++ { - metaData := internal.MetricsMetaData{} metrics := md.ResourceMetrics().At(i) - res := metrics.Resource() - metaData.ResAttr = attributesToMap(res.Attributes()) - metaData.ResURL = metrics.SchemaUrl() + resAttr := attributesToMap(metrics.Resource().Attributes()) for j := 0; j < metrics.ScopeMetrics().Len(); j++ { rs := metrics.ScopeMetrics().At(j).Metrics() - metaData.ScopeURL = metrics.ScopeMetrics().At(j).SchemaUrl() - metaData.ScopeInstr = metrics.ScopeMetrics().At(j).Scope() + scopeInstr := metrics.ScopeMetrics().At(j).Scope() + scopeURL := metrics.ScopeMetrics().At(j).SchemaUrl() for k := 0; k < rs.Len(); k++ { r := rs.At(k) var errs error switch r.Type() { case pmetric.MetricTypeGauge: - errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeGauge].Add(r.Gauge(), &metaData, r.Name(), r.Description(), r.Unit())) + errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeGauge].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Gauge(), r.Name(), r.Description(), r.Unit())) case pmetric.MetricTypeSum: - errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSum].Add(r.Sum(), &metaData, r.Name(), r.Description(), r.Unit())) + errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSum].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Sum(), r.Name(), r.Description(), r.Unit())) case pmetric.MetricTypeHistogram: - errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeHistogram].Add(r.Histogram(), &metaData, r.Name(), r.Description(), r.Unit())) + errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Histogram(), r.Name(), r.Description(), r.Unit())) case pmetric.MetricTypeExponentialHistogram: - errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeExponentialHistogram].Add(r.ExponentialHistogram(), &metaData, r.Name(), r.Description(), r.Unit())) + errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeExponentialHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.ExponentialHistogram(), r.Name(), r.Description(), r.Unit())) case pmetric.MetricTypeSummary: - errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSummary].Add(r.Summary(), &metaData, r.Name(), r.Description(), r.Unit())) + errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSummary].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Summary(), r.Name(), r.Description(), r.Unit())) default: return fmt.Errorf("unsupported metrics type") } diff --git a/exporter/clickhouseexporter/exporter_metrics_test.go b/exporter/clickhouseexporter/exporter_metrics_test.go index 188b8cc48933..bc29038f1a16 100644 --- a/exporter/clickhouseexporter/exporter_metrics_test.go +++ b/exporter/clickhouseexporter/exporter_metrics_test.go @@ -103,6 +103,48 @@ func TestExporter_pushMetricsData(t *testing.T) { require.Equal(t, int32(5), items.Load()) }) + t.Run("check one resource metrics contain different scope metrics", func(t *testing.T) { + initClickhouseTestServer(t, func(query string, values []driver.Value) error { + if strings.HasPrefix(query, "INSERT INTO otel_metrics_exponential_histogram") { + require.Equal(t, "Resource SchemaUrl 2", values[30]) + require.Equal(t, "Scope name 2", values[31]) + + require.Equal(t, "Resource SchemaUrl 2", values[59]) + require.Equal(t, "Scope name 3", values[60]) + } + if strings.HasPrefix(query, "INSERT INTO otel_metrics_gauge") { + require.Equal(t, "Resource SchemaUrl 2", values[21]) + require.Equal(t, "Scope name 2", values[22]) + + require.Equal(t, "Resource SchemaUrl 2", values[41]) + require.Equal(t, "Scope name 3", values[42]) + } + if strings.HasPrefix(query, "INSERT INTO otel_metrics_histogram") { + require.Equal(t, "Resource SchemaUrl 2", values[26]) + require.Equal(t, "Scope name 2", values[27]) + + require.Equal(t, "Resource SchemaUrl 2", values[51]) + require.Equal(t, "Scope name 3", values[52]) + } + if strings.HasPrefix(query, "INSERT INTO otel_metrics_sum (") { + require.Equal(t, "Resource SchemaUrl 2", values[23]) + require.Equal(t, "Scope name 2", values[24]) + + require.Equal(t, "Resource SchemaUrl 2", values[45]) + require.Equal(t, "Scope name 3", values[46]) + } + if strings.HasPrefix(query, "INSERT INTO otel_metrics_summary") { + require.Equal(t, "Resource SchemaUrl 2", values[19]) + require.Equal(t, "Scope name 2", values[20]) + + require.Equal(t, "Resource SchemaUrl 2", values[37]) + require.Equal(t, "Scope name 3", values[38]) + } + return nil + }) + exporter := newTestMetricsExporter(t) + mustPushMetricsData(t, exporter, simpleMetrics(1)) + }) } func Benchmark_pushMetricsData(b *testing.B) { @@ -342,6 +384,113 @@ func simpleMetrics(count int) pmetric.Metrics { quantileValues.SetValue(1) quantileValues.SetQuantile(1) } + + // add a different scope metrics + sm = rm.ScopeMetrics().AppendEmpty() + sm.SetSchemaUrl("Scope SchemaUrl 3") + sm.Scope().Attributes().PutStr("Scope Attributes 3", "value3") + sm.Scope().SetDroppedAttributesCount(20) + sm.Scope().SetName("Scope name 3") + sm.Scope().SetVersion("Scope version 3") + for i := 0; i < count; i++ { + // gauge + m := sm.Metrics().AppendEmpty() + m.SetName("gauge metrics") + m.SetUnit("count") + m.SetDescription("This is a gauge metrics") + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetIntValue(int64(i)) + dp.Attributes().PutStr("gauge_label_3", "3") + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + exemplars := dp.Exemplars().AppendEmpty() + exemplars.SetIntValue(54) + exemplars.FilteredAttributes().PutStr("key", "value") + exemplars.FilteredAttributes().PutStr("key2", "value2") + exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)}) + exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)}) + + // sum + m = sm.Metrics().AppendEmpty() + m.SetName("sum metrics") + m.SetUnit("count") + m.SetDescription("This is a sum metrics") + dp = m.SetEmptySum().DataPoints().AppendEmpty() + dp.SetIntValue(int64(i)) + dp.Attributes().PutStr("sum_label_2", "2") + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + exemplars = dp.Exemplars().AppendEmpty() + exemplars.SetIntValue(54) + exemplars.FilteredAttributes().PutStr("key", "value") + exemplars.FilteredAttributes().PutStr("key2", "value2") + exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)}) + exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)}) + + // histogram + m = sm.Metrics().AppendEmpty() + m.SetName("histogram metrics") + m.SetUnit("ms") + m.SetDescription("This is a histogram metrics") + dpHisto := m.SetEmptyHistogram().DataPoints().AppendEmpty() + dpHisto.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + dpHisto.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + dpHisto.SetCount(1) + dpHisto.SetSum(1) + dpHisto.Attributes().PutStr("key", "value") + dpHisto.Attributes().PutStr("key", "value") + dpHisto.ExplicitBounds().FromRaw([]float64{0, 0, 0, 0, 0}) + dpHisto.BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0}) + dpHisto.SetMin(0) + dpHisto.SetMax(1) + exemplars = dpHisto.Exemplars().AppendEmpty() + exemplars.SetIntValue(54) + exemplars.FilteredAttributes().PutStr("key", "value") + exemplars.FilteredAttributes().PutStr("key2", "value2") + exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)}) + exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)}) + + // exp histogram + m = sm.Metrics().AppendEmpty() + m.SetName("exp histogram metrics") + m.SetUnit("ms") + m.SetDescription("This is a exp histogram metrics") + dpExpHisto := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty() + dpExpHisto.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + dpExpHisto.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + dpExpHisto.SetSum(1) + dpExpHisto.SetMin(0) + dpExpHisto.SetMax(1) + dpExpHisto.SetZeroCount(0) + dpExpHisto.SetCount(1) + dpExpHisto.Attributes().PutStr("key", "value") + dpExpHisto.Attributes().PutStr("key", "value") + dpExpHisto.Negative().SetOffset(1) + dpExpHisto.Negative().BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0}) + dpExpHisto.Positive().SetOffset(1) + dpExpHisto.Positive().BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0}) + + exemplars = dpExpHisto.Exemplars().AppendEmpty() + exemplars.SetIntValue(54) + exemplars.FilteredAttributes().PutStr("key", "value") + exemplars.FilteredAttributes().PutStr("key2", "value2") + exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)}) + exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)}) + + // summary + m = sm.Metrics().AppendEmpty() + m.SetName("summary histogram metrics") + m.SetUnit("ms") + m.SetDescription("This is a summary metrics") + summary := m.SetEmptySummary().DataPoints().AppendEmpty() + summary.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + summary.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + summary.Attributes().PutStr("key", "value") + summary.Attributes().PutStr("key2", "value2") + summary.SetCount(1) + summary.SetSum(1) + quantileValues := summary.QuantileValues().AppendEmpty() + quantileValues.SetValue(1) + quantileValues.SetQuantile(1) + } return metrics } diff --git a/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go b/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go index c252600d6b49..1d648b5c7b19 100644 --- a/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go +++ b/exporter/clickhouseexporter/internal/exponential_histogram_metrics.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -189,7 +190,7 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (e *expHistogramMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error { +func (e *expHistogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { expHistogram, ok := metrics.(pmetric.ExponentialHistogram) if !ok { return fmt.Errorf("metrics param is not type of ExponentialHistogram") @@ -199,8 +200,13 @@ func (e *expHistogramMetrics) Add(metrics any, metaData *MetricsMetaData, name s metricName: name, metricDescription: description, metricUnit: unit, - metadata: metaData, - expHistogram: expHistogram, + metadata: &MetricsMetaData{ + ResAttr: resAttr, + ResURL: resURL, + ScopeURL: scopeURL, + ScopeInstr: scopeInstr, + }, + expHistogram: expHistogram, }) return nil diff --git a/exporter/clickhouseexporter/internal/gauge_metrics.go b/exporter/clickhouseexporter/internal/gauge_metrics.go index 136539b1bff9..ed2c376e6e53 100644 --- a/exporter/clickhouseexporter/internal/gauge_metrics.go +++ b/exporter/clickhouseexporter/internal/gauge_metrics.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -162,7 +163,7 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (g *gaugeMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error { +func (g *gaugeMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { gauge, ok := metrics.(pmetric.Gauge) if !ok { return fmt.Errorf("metrics param is not type of Gauge") @@ -172,8 +173,13 @@ func (g *gaugeMetrics) Add(metrics any, metaData *MetricsMetaData, name string, metricName: name, metricDescription: description, metricUnit: unit, - metadata: metaData, - gauge: gauge, + metadata: &MetricsMetaData{ + ResAttr: resAttr, + ResURL: resURL, + ScopeURL: scopeURL, + ScopeInstr: scopeInstr, + }, + gauge: gauge, }) return nil } diff --git a/exporter/clickhouseexporter/internal/histogram_metrics.go b/exporter/clickhouseexporter/internal/histogram_metrics.go index 01ebbf682221..f6a472638a5a 100644 --- a/exporter/clickhouseexporter/internal/histogram_metrics.go +++ b/exporter/clickhouseexporter/internal/histogram_metrics.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -177,7 +178,7 @@ func (h *histogramMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (h *histogramMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error { +func (h *histogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { histogram, ok := metrics.(pmetric.Histogram) if !ok { return fmt.Errorf("metrics param is not type of Histogram") @@ -187,8 +188,13 @@ func (h *histogramMetrics) Add(metrics any, metaData *MetricsMetaData, name stri metricName: name, metricDescription: description, metricUnit: unit, - metadata: metaData, - histogram: histogram, + metadata: &MetricsMetaData{ + ResAttr: resAttr, + ResURL: resURL, + ScopeURL: scopeURL, + ScopeInstr: scopeInstr, + }, + histogram: histogram, }) return nil } diff --git a/exporter/clickhouseexporter/internal/metrics_model.go b/exporter/clickhouseexporter/internal/metrics_model.go index 087de2188c6d..a3bafa5eda3a 100644 --- a/exporter/clickhouseexporter/internal/metrics_model.go +++ b/exporter/clickhouseexporter/internal/metrics_model.go @@ -43,7 +43,7 @@ var logger *zap.Logger // any type of metrics need implement it. type MetricsModel interface { // Add used to bind MetricsMetaData to a specific metric then put them into a slice - Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error + Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error // insert is used to insert metric data to clickhouse insert(ctx context.Context, db *sql.DB) error } diff --git a/exporter/clickhouseexporter/internal/sum_metrics.go b/exporter/clickhouseexporter/internal/sum_metrics.go index 8fe71b0637de..c30fa5cd184c 100644 --- a/exporter/clickhouseexporter/internal/sum_metrics.go +++ b/exporter/clickhouseexporter/internal/sum_metrics.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -168,7 +169,7 @@ func (s *sumMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (s *sumMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error { +func (s *sumMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { sum, ok := metrics.(pmetric.Sum) if !ok { return fmt.Errorf("metrics param is not type of Sum") @@ -178,8 +179,13 @@ func (s *sumMetrics) Add(metrics any, metaData *MetricsMetaData, name string, de metricName: name, metricDescription: description, metricUnit: unit, - metadata: metaData, - sum: sum, + metadata: &MetricsMetaData{ + ResAttr: resAttr, + ResURL: resURL, + ScopeURL: scopeURL, + ScopeInstr: scopeInstr, + }, + sum: sum, }) return nil } diff --git a/exporter/clickhouseexporter/internal/summary_metrics.go b/exporter/clickhouseexporter/internal/summary_metrics.go index 6b29a1c858df..8bd769ae3f19 100644 --- a/exporter/clickhouseexporter/internal/summary_metrics.go +++ b/exporter/clickhouseexporter/internal/summary_metrics.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -156,7 +157,7 @@ func (s *summaryMetrics) insert(ctx context.Context, db *sql.DB) error { return nil } -func (s *summaryMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error { +func (s *summaryMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error { summary, ok := metrics.(pmetric.Summary) if !ok { return fmt.Errorf("metrics param is not type of Summary") @@ -166,8 +167,13 @@ func (s *summaryMetrics) Add(metrics any, metaData *MetricsMetaData, name string metricName: name, metricDescription: description, metricUnit: unit, - metadata: metaData, - summary: summary, + metadata: &MetricsMetaData{ + ResAttr: resAttr, + ResURL: resURL, + ScopeURL: scopeURL, + ScopeInstr: scopeInstr, + }, + summary: summary, }) return nil }