[exporter/clickhouse] Fix insert metrics with duplicate scope data (#…
Frapschen authored May 5, 2023
1 parent 5bb35a4 commit ddc2204
Showing 9 changed files with 219 additions and 27 deletions.
16 changes: 16 additions & 0 deletions .chloggen/fix-deplicate-scope-data.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: clickhouseexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix insert metrics with duplicate scope data

# One or more tracking issues related to the change
issues: [21082]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
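
Background on the fix (an illustrative sketch, not code from this commit): the exporter previously built one MetricsMetaData per resource, kept mutating its scope fields inside the scope loop, and handed the metric models a pointer to that single struct. Since the models only stored the pointer, every buffered data point for the resource ended up referencing whatever scope the loop visited last, so rows from different scopes were inserted with duplicate scope data. The commit instead passes the resource/scope fields into Add and builds a fresh MetricsMetaData per call. The minimal Go program below reproduces the pitfall with simplified, hypothetical stand-in types:

```go
package main

import "fmt"

// Simplified stand-ins for the exporter's types; names here are hypothetical.
type metaData struct{ ScopeName string }

type row struct {
	value int
	meta  *metaData
}

func main() {
	scopes := []string{"Scope name 2", "Scope name 3"}

	// Buggy pattern: one metaData is reused and shared by pointer, so every
	// queued row ends up pointing at the last scope's data.
	var rows []row
	shared := metaData{}
	for _, s := range scopes {
		shared.ScopeName = s
		rows = append(rows, row{value: 1, meta: &shared})
	}
	fmt.Println(rows[0].meta.ScopeName, rows[1].meta.ScopeName) // Scope name 3 Scope name 3

	// Fixed pattern (what the commit does): build a fresh metadata value per
	// Add call, so each row keeps its own scope data.
	rows = rows[:0]
	for _, s := range scopes {
		rows = append(rows, row{value: 1, meta: &metaData{ScopeName: s}})
	}
	fmt.Println(rows[0].meta.ScopeName, rows[1].meta.ScopeName) // Scope name 2 Scope name 3
}
```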
19 changes: 8 additions & 11 deletions exporter/clickhouseexporter/exporter_metrics.go
@@ -70,29 +70,26 @@ func (e *metricsExporter) shutdown(ctx context.Context) error {
 func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
 	metricsMap := internal.NewMetricsModel(e.cfg.MetricsTableName)
 	for i := 0; i < md.ResourceMetrics().Len(); i++ {
-		metaData := internal.MetricsMetaData{}
 		metrics := md.ResourceMetrics().At(i)
-		res := metrics.Resource()
-		metaData.ResAttr = attributesToMap(res.Attributes())
-		metaData.ResURL = metrics.SchemaUrl()
+		resAttr := attributesToMap(metrics.Resource().Attributes())
 		for j := 0; j < metrics.ScopeMetrics().Len(); j++ {
 			rs := metrics.ScopeMetrics().At(j).Metrics()
-			metaData.ScopeURL = metrics.ScopeMetrics().At(j).SchemaUrl()
-			metaData.ScopeInstr = metrics.ScopeMetrics().At(j).Scope()
+			scopeInstr := metrics.ScopeMetrics().At(j).Scope()
+			scopeURL := metrics.ScopeMetrics().At(j).SchemaUrl()
 			for k := 0; k < rs.Len(); k++ {
 				r := rs.At(k)
 				var errs error
 				switch r.Type() {
 				case pmetric.MetricTypeGauge:
-					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeGauge].Add(r.Gauge(), &metaData, r.Name(), r.Description(), r.Unit()))
+					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeGauge].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Gauge(), r.Name(), r.Description(), r.Unit()))
 				case pmetric.MetricTypeSum:
-					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSum].Add(r.Sum(), &metaData, r.Name(), r.Description(), r.Unit()))
+					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSum].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Sum(), r.Name(), r.Description(), r.Unit()))
 				case pmetric.MetricTypeHistogram:
-					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeHistogram].Add(r.Histogram(), &metaData, r.Name(), r.Description(), r.Unit()))
+					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Histogram(), r.Name(), r.Description(), r.Unit()))
 				case pmetric.MetricTypeExponentialHistogram:
-					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeExponentialHistogram].Add(r.ExponentialHistogram(), &metaData, r.Name(), r.Description(), r.Unit()))
+					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeExponentialHistogram].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.ExponentialHistogram(), r.Name(), r.Description(), r.Unit()))
 				case pmetric.MetricTypeSummary:
-					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSummary].Add(r.Summary(), &metaData, r.Name(), r.Description(), r.Unit()))
+					errs = multierr.Append(errs, metricsMap[pmetric.MetricTypeSummary].Add(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, r.Summary(), r.Name(), r.Description(), r.Unit()))
 				default:
 					return fmt.Errorf("unsupported metrics type")
 				}
149 changes: 149 additions & 0 deletions exporter/clickhouseexporter/exporter_metrics_test.go
@@ -103,6 +103,48 @@ func TestExporter_pushMetricsData(t *testing.T) {

require.Equal(t, int32(5), items.Load())
})
t.Run("check one resource metrics contain different scope metrics", func(t *testing.T) {
initClickhouseTestServer(t, func(query string, values []driver.Value) error {
if strings.HasPrefix(query, "INSERT INTO otel_metrics_exponential_histogram") {
require.Equal(t, "Resource SchemaUrl 2", values[30])
require.Equal(t, "Scope name 2", values[31])

require.Equal(t, "Resource SchemaUrl 2", values[59])
require.Equal(t, "Scope name 3", values[60])
}
if strings.HasPrefix(query, "INSERT INTO otel_metrics_gauge") {
require.Equal(t, "Resource SchemaUrl 2", values[21])
require.Equal(t, "Scope name 2", values[22])

require.Equal(t, "Resource SchemaUrl 2", values[41])
require.Equal(t, "Scope name 3", values[42])
}
if strings.HasPrefix(query, "INSERT INTO otel_metrics_histogram") {
require.Equal(t, "Resource SchemaUrl 2", values[26])
require.Equal(t, "Scope name 2", values[27])

require.Equal(t, "Resource SchemaUrl 2", values[51])
require.Equal(t, "Scope name 3", values[52])
}
if strings.HasPrefix(query, "INSERT INTO otel_metrics_sum (") {
require.Equal(t, "Resource SchemaUrl 2", values[23])
require.Equal(t, "Scope name 2", values[24])

require.Equal(t, "Resource SchemaUrl 2", values[45])
require.Equal(t, "Scope name 3", values[46])
}
if strings.HasPrefix(query, "INSERT INTO otel_metrics_summary") {
require.Equal(t, "Resource SchemaUrl 2", values[19])
require.Equal(t, "Scope name 2", values[20])

require.Equal(t, "Resource SchemaUrl 2", values[37])
require.Equal(t, "Scope name 3", values[38])
}
return nil
})
exporter := newTestMetricsExporter(t)
mustPushMetricsData(t, exporter, simpleMetrics(1))
})
}

func Benchmark_pushMetricsData(b *testing.B) {
@@ -342,6 +384,113 @@ func simpleMetrics(count int) pmetric.Metrics {
quantileValues.SetValue(1)
quantileValues.SetQuantile(1)
}

// add a different scope metrics
sm = rm.ScopeMetrics().AppendEmpty()
sm.SetSchemaUrl("Scope SchemaUrl 3")
sm.Scope().Attributes().PutStr("Scope Attributes 3", "value3")
sm.Scope().SetDroppedAttributesCount(20)
sm.Scope().SetName("Scope name 3")
sm.Scope().SetVersion("Scope version 3")
for i := 0; i < count; i++ {
// gauge
m := sm.Metrics().AppendEmpty()
m.SetName("gauge metrics")
m.SetUnit("count")
m.SetDescription("This is a gauge metrics")
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetIntValue(int64(i))
dp.Attributes().PutStr("gauge_label_3", "3")
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
exemplars := dp.Exemplars().AppendEmpty()
exemplars.SetIntValue(54)
exemplars.FilteredAttributes().PutStr("key", "value")
exemplars.FilteredAttributes().PutStr("key2", "value2")
exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)})
exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)})

// sum
m = sm.Metrics().AppendEmpty()
m.SetName("sum metrics")
m.SetUnit("count")
m.SetDescription("This is a sum metrics")
dp = m.SetEmptySum().DataPoints().AppendEmpty()
dp.SetIntValue(int64(i))
dp.Attributes().PutStr("sum_label_2", "2")
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
exemplars = dp.Exemplars().AppendEmpty()
exemplars.SetIntValue(54)
exemplars.FilteredAttributes().PutStr("key", "value")
exemplars.FilteredAttributes().PutStr("key2", "value2")
exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)})
exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)})

// histogram
m = sm.Metrics().AppendEmpty()
m.SetName("histogram metrics")
m.SetUnit("ms")
m.SetDescription("This is a histogram metrics")
dpHisto := m.SetEmptyHistogram().DataPoints().AppendEmpty()
dpHisto.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dpHisto.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dpHisto.SetCount(1)
dpHisto.SetSum(1)
dpHisto.Attributes().PutStr("key", "value")
dpHisto.Attributes().PutStr("key", "value")
dpHisto.ExplicitBounds().FromRaw([]float64{0, 0, 0, 0, 0})
dpHisto.BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0})
dpHisto.SetMin(0)
dpHisto.SetMax(1)
exemplars = dpHisto.Exemplars().AppendEmpty()
exemplars.SetIntValue(54)
exemplars.FilteredAttributes().PutStr("key", "value")
exemplars.FilteredAttributes().PutStr("key2", "value2")
exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)})
exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)})

// exp histogram
m = sm.Metrics().AppendEmpty()
m.SetName("exp histogram metrics")
m.SetUnit("ms")
m.SetDescription("This is a exp histogram metrics")
dpExpHisto := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
dpExpHisto.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dpExpHisto.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dpExpHisto.SetSum(1)
dpExpHisto.SetMin(0)
dpExpHisto.SetMax(1)
dpExpHisto.SetZeroCount(0)
dpExpHisto.SetCount(1)
dpExpHisto.Attributes().PutStr("key", "value")
dpExpHisto.Attributes().PutStr("key", "value")
dpExpHisto.Negative().SetOffset(1)
dpExpHisto.Negative().BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0})
dpExpHisto.Positive().SetOffset(1)
dpExpHisto.Positive().BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0})

exemplars = dpExpHisto.Exemplars().AppendEmpty()
exemplars.SetIntValue(54)
exemplars.FilteredAttributes().PutStr("key", "value")
exemplars.FilteredAttributes().PutStr("key2", "value2")
exemplars.SetSpanID([8]byte{1, 2, 3, byte(i)})
exemplars.SetTraceID([16]byte{1, 2, 3, byte(i)})

// summary
m = sm.Metrics().AppendEmpty()
m.SetName("summary histogram metrics")
m.SetUnit("ms")
m.SetDescription("This is a summary metrics")
summary := m.SetEmptySummary().DataPoints().AppendEmpty()
summary.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
summary.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
summary.Attributes().PutStr("key", "value")
summary.Attributes().PutStr("key2", "value2")
summary.SetCount(1)
summary.SetSum(1)
quantileValues := summary.QuantileValues().AppendEmpty()
quantileValues.SetValue(1)
quantileValues.SetQuantile(1)
}
return metrics
}

12 changes: 9 additions & 3 deletions exporter/clickhouseexporter/internal/exponential_histogram_metrics.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 )
@@ -189,7 +190,7 @@ func (e *expHistogramMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }
 
-func (e *expHistogramMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error {
+func (e *expHistogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	expHistogram, ok := metrics.(pmetric.ExponentialHistogram)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of ExponentialHistogram")
@@ -199,8 +200,13 @@ func (e *expHistogramMetrics) Add(metrics any, metaData *MetricsMetaData, name s
 		metricName: name,
 		metricDescription: description,
 		metricUnit: unit,
-		metadata: metaData,
-		expHistogram: expHistogram,
+		metadata: &MetricsMetaData{
+			ResAttr: resAttr,
+			ResURL: resURL,
+			ScopeURL: scopeURL,
+			ScopeInstr: scopeInstr,
+		},
+		expHistogram: expHistogram,
 	})
 
 	return nil
12 changes: 9 additions & 3 deletions exporter/clickhouseexporter/internal/gauge_metrics.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 )
@@ -162,7 +163,7 @@ func (g *gaugeMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }
 
-func (g *gaugeMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error {
+func (g *gaugeMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	gauge, ok := metrics.(pmetric.Gauge)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of Gauge")
@@ -172,8 +173,13 @@ func (g *gaugeMetrics) Add(metrics any, metaData *MetricsMetaData, name string,
 		metricName: name,
 		metricDescription: description,
 		metricUnit: unit,
-		metadata: metaData,
-		gauge: gauge,
+		metadata: &MetricsMetaData{
+			ResAttr: resAttr,
+			ResURL: resURL,
+			ScopeURL: scopeURL,
+			ScopeInstr: scopeInstr,
+		},
+		gauge: gauge,
 	})
 	return nil
 }
12 changes: 9 additions & 3 deletions exporter/clickhouseexporter/internal/histogram_metrics.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 )
@@ -177,7 +178,7 @@ func (h *histogramMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }
 
-func (h *histogramMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error {
+func (h *histogramMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	histogram, ok := metrics.(pmetric.Histogram)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of Histogram")
@@ -187,8 +188,13 @@ func (h *histogramMetrics) Add(metrics any, metaData *MetricsMetaData, name stri
 		metricName: name,
 		metricDescription: description,
 		metricUnit: unit,
-		metadata: metaData,
-		histogram: histogram,
+		metadata: &MetricsMetaData{
+			ResAttr: resAttr,
+			ResURL: resURL,
+			ScopeURL: scopeURL,
+			ScopeInstr: scopeInstr,
+		},
+		histogram: histogram,
 	})
 	return nil
 }
2 changes: 1 addition & 1 deletion exporter/clickhouseexporter/internal/metrics_model.go
@@ -43,7 +43,7 @@ var logger *zap.Logger
 // any type of metrics need implement it.
 type MetricsModel interface {
 	// Add used to bind MetricsMetaData to a specific metric then put them into a slice
-	Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error
+	Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error
 	// insert is used to insert metric data to clickhouse
 	insert(ctx context.Context, db *sql.DB) error
 }
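
For readers skimming the interface change, here is a minimal caller-side sketch of the widened contract (not from the commit). The table name, attribute map, and the assumption that the first metric is a gauge are invented for illustration; the real exporter builds resAttr with attributesToMap and iterates every metric as shown in exporter_metrics.go above.

```go
package clickhouseexporter

import (
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
)

// exampleAdd is a sketch showing how the new Add signature is fed; the sample
// inputs are made up.
func exampleAdd(md pmetric.Metrics) error {
	metricsMap := internal.NewMetricsModel("otel_metrics")

	rm := md.ResourceMetrics().At(0)
	resAttr := map[string]string{"service.name": "demo"} // the exporter derives this from rm.Resource().Attributes()
	sm := rm.ScopeMetrics().At(0)
	m := sm.Metrics().At(0) // assume this metric is a gauge

	// Resource and scope fields now travel with every call; each model builds
	// its own MetricsMetaData, so no mutable state is shared between rows.
	return metricsMap[pmetric.MetricTypeGauge].Add(
		resAttr, rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(),
		m.Gauge(), m.Name(), m.Description(), m.Unit(),
	)
}
```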
12 changes: 9 additions & 3 deletions exporter/clickhouseexporter/internal/sum_metrics.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 )
@@ -168,7 +169,7 @@ func (s *sumMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }
 
-func (s *sumMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error {
+func (s *sumMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	sum, ok := metrics.(pmetric.Sum)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of Sum")
@@ -178,8 +179,13 @@ func (s *sumMetrics) Add(metrics any, metaData *MetricsMetaData, name string, de
 		metricName: name,
 		metricDescription: description,
 		metricUnit: unit,
-		metadata: metaData,
-		sum: sum,
+		metadata: &MetricsMetaData{
+			ResAttr: resAttr,
+			ResURL: resURL,
+			ScopeURL: scopeURL,
+			ScopeInstr: scopeInstr,
+		},
+		sum: sum,
 	})
 	return nil
 }
12 changes: 9 additions & 3 deletions exporter/clickhouseexporter/internal/summary_metrics.go
@@ -21,6 +21,7 @@ import (
 	"strings"
 	"time"
 
+	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.uber.org/zap"
 )
@@ -156,7 +157,7 @@ func (s *summaryMetrics) insert(ctx context.Context, db *sql.DB) error {
 	return nil
 }
 
-func (s *summaryMetrics) Add(metrics any, metaData *MetricsMetaData, name string, description string, unit string) error {
+func (s *summaryMetrics) Add(resAttr map[string]string, resURL string, scopeInstr pcommon.InstrumentationScope, scopeURL string, metrics any, name string, description string, unit string) error {
 	summary, ok := metrics.(pmetric.Summary)
 	if !ok {
 		return fmt.Errorf("metrics param is not type of Summary")
@@ -166,8 +167,13 @@ func (s *summaryMetrics) Add(metrics any, metaData *MetricsMetaData, name string
 		metricName: name,
 		metricDescription: description,
 		metricUnit: unit,
-		metadata: metaData,
-		summary: summary,
+		metadata: &MetricsMetaData{
+			ResAttr: resAttr,
+			ResURL: resURL,
+			ScopeURL: scopeURL,
+			ScopeInstr: scopeInstr,
+		},
+		summary: summary,
 	})
 	return nil
 }
