From b5ff96efeec138465289262186d9aebfee25bfaa Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Fri, 10 Feb 2023 11:10:20 +0800 Subject: [PATCH 1/2] Fix the bug that tcp metrics are not aggregated correctly Signed-off-by: Daxin Wang --- .../default_aggregator_test.go | 2 +- .../defaultaggregator/value_recorder.go | 4 +- .../defaultaggregator/value_recorder_test.go | 37 +++++++++++++++++ .../tcpmetricanalyzer/tcp_analyzer.go | 15 +++---- .../exporter/otelexporter/instrument_test.go | 19 ++++----- .../exporter/otelexporter/otelexporter.go | 26 +++++------- .../otelexporter/otelexporter_test.go | 24 ++++++----- .../exporter/prometheusexporter/prometheus.go | 40 ++++++------------- .../processor/aggregateprocessor/processor.go | 9 ++++- .../k8sprocessor/kubernetes_processor.go | 9 ++++- collector/pkg/model/constnames/const.go | 8 ++-- 11 files changed, 113 insertions(+), 80 deletions(-) diff --git a/collector/pkg/aggregator/defaultaggregator/default_aggregator_test.go b/collector/pkg/aggregator/defaultaggregator/default_aggregator_test.go index 25a0745ad..4ba6f3dfd 100644 --- a/collector/pkg/aggregator/defaultaggregator/default_aggregator_test.go +++ b/collector/pkg/aggregator/defaultaggregator/default_aggregator_test.go @@ -34,7 +34,7 @@ func TestConcurrentAggregator(t *testing.T) { go func() { for i := 0; i < runLoop; i++ { metricValues := []*model.Metric{ - {Name: "duration", Data: &model.Metric_Int{Int: &model.Int{Value: duration}}}, + {Name: "duration", Data: &model.Int{Value: duration}}, } dataGroup := model.NewDataGroup("testMetric", labels, 0, metricValues...) aggregatorInstance.Aggregate(dataGroup, labelSelectors) diff --git a/collector/pkg/aggregator/defaultaggregator/value_recorder.go b/collector/pkg/aggregator/defaultaggregator/value_recorder.go index e51d286ae..6f1083e7a 100644 --- a/collector/pkg/aggregator/defaultaggregator/value_recorder.go +++ b/collector/pkg/aggregator/defaultaggregator/value_recorder.go @@ -22,7 +22,9 @@ func newValueRecorder(recorderName string, aggKindMap map[string][]KindConfig) * } } -// Record is thread-safe, and return the result value +// Record is thread-safe, and return the result value. +// A recorder can record only the metrics that are same as the initial ones when using a same key. +// But it allows to record different metrics with different keys. func (r *valueRecorder) Record(key *aggregator.LabelKeys, metricValues []*model.Metric, timestamp uint64) { if key == nil { return diff --git a/collector/pkg/aggregator/defaultaggregator/value_recorder_test.go b/collector/pkg/aggregator/defaultaggregator/value_recorder_test.go index 9dbbd17dd..e30333bcd 100644 --- a/collector/pkg/aggregator/defaultaggregator/value_recorder_test.go +++ b/collector/pkg/aggregator/defaultaggregator/value_recorder_test.go @@ -4,6 +4,8 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" + "github.com/Kindling-project/kindling/collector/pkg/aggregator" "github.com/Kindling-project/kindling/collector/pkg/model" ) @@ -78,3 +80,38 @@ func TestRecord(t *testing.T) { t.Errorf("expected %+v, got %+v", expectedValue, histogramValue.GetHistogram()) } } + +// TestRecordDiffMetricsWithSameKey validates that a recorder can record only the metrics that are same as the initial ones. +// But it allows to record different metrics with different keys. +func TestRecordDiffMetricsWithSameKey(t *testing.T) { + aggKindMap := AggregatedConfig{KindMap: map[string][]KindConfig{ + "duration": { + {Kind: SumKind, OutputName: "duration_sum"}, + }, + "last": {{Kind: LastKind, OutputName: "last"}}, + }} + recorder := newValueRecorder("testRecorder", aggKindMap.KindMap) + keys := aggregator.NewLabelKeys([]aggregator.LabelKey{ + { + Name: "stringKey", + Value: "stringValue", + VType: aggregator.StringType, + }, + }...) + for i := 0; i < 100; i++ { + metricValues := []*model.Metric{ + model.NewIntMetric("duration", int64(100)), + } + recorder.Record(keys, metricValues, 0) + metricValues = []*model.Metric{ + model.NewIntMetric("last", int64(i)), + } + recorder.Record(keys, metricValues, 0) + } + retMetricGroup := recorder.dump() + sumValue, _ := retMetricGroup[0].GetMetric("duration_sum") + assert.Equal(t, int64(10000), sumValue.GetInt().Value) + // last is not aggregated because it is not the initial one using the current key + _, ok := retMetricGroup[0].GetMetric("last") + assert.Equal(t, false, ok) +} diff --git a/collector/pkg/component/analyzer/tcpmetricanalyzer/tcp_analyzer.go b/collector/pkg/component/analyzer/tcpmetricanalyzer/tcp_analyzer.go index 3581d0d74..5df86b70d 100644 --- a/collector/pkg/component/analyzer/tcpmetricanalyzer/tcp_analyzer.go +++ b/collector/pkg/component/analyzer/tcpmetricanalyzer/tcp_analyzer.go @@ -3,6 +3,10 @@ package tcpmetricanalyzer import ( "fmt" + "github.com/hashicorp/go-multierror" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "github.com/Kindling-project/kindling/collector/pkg/component" "github.com/Kindling-project/kindling/collector/pkg/component/analyzer" "github.com/Kindling-project/kindling/collector/pkg/component/consumer" @@ -10,9 +14,6 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/model" "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" "github.com/Kindling-project/kindling/collector/pkg/model/constnames" - "github.com/hashicorp/go-multierror" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) const ( @@ -25,7 +26,7 @@ type TcpMetricAnalyzer struct { telemetry *component.TelemetryTools } -func NewTcpMetricAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, nextConsumers []consumer.Consumer) analyzer.Analyzer { +func NewTcpMetricAnalyzer(_ interface{}, telemetry *component.TelemetryTools, nextConsumers []consumer.Consumer) analyzer.Analyzer { retAnalyzer := &TcpMetricAnalyzer{ consumers: nextConsumers, telemetry: telemetry, @@ -101,7 +102,7 @@ func (a *TcpMetricAnalyzer) generateRtt(event *model.KindlingEvent) (*model.Data return nil, nil } metric := model.NewIntMetric(constnames.TcpRttMetricName, int64(rtt)) - return model.NewDataGroup(constnames.TcpMetricGroupName, labels, event.Timestamp, metric), nil + return model.NewDataGroup(constnames.TcpRttMetricGroupName, labels, event.Timestamp, metric), nil } func (a *TcpMetricAnalyzer) generateRetransmit(event *model.KindlingEvent) (*model.DataGroup, error) { @@ -110,7 +111,7 @@ func (a *TcpMetricAnalyzer) generateRetransmit(event *model.KindlingEvent) (*mod return nil, err } metric := model.NewIntMetric(constnames.TcpRetransmitMetricName, 1) - return model.NewDataGroup(constnames.TcpMetricGroupName, labels, event.Timestamp, metric), nil + return model.NewDataGroup(constnames.TcpRetransmitMetricGroupName, labels, event.Timestamp, metric), nil } func (a *TcpMetricAnalyzer) generateDrop(event *model.KindlingEvent) (*model.DataGroup, error) { @@ -119,7 +120,7 @@ func (a *TcpMetricAnalyzer) generateDrop(event *model.KindlingEvent) (*model.Dat return nil, err } metric := model.NewIntMetric(constnames.TcpDropMetricName, 1) - return model.NewDataGroup(constnames.TcpMetricGroupName, labels, event.Timestamp, metric), nil + return model.NewDataGroup(constnames.TcpDropMetricGroupName, labels, event.Timestamp, metric), nil } func (a *TcpMetricAnalyzer) getTupleLabels(event *model.KindlingEvent) (*model.AttributeMap, error) { diff --git a/collector/pkg/component/consumer/exporter/otelexporter/instrument_test.go b/collector/pkg/component/consumer/exporter/otelexporter/instrument_test.go index 580394ce4..b823ef2e2 100644 --- a/collector/pkg/component/consumer/exporter/otelexporter/instrument_test.go +++ b/collector/pkg/component/consumer/exporter/otelexporter/instrument_test.go @@ -6,16 +6,17 @@ import ( "testing" "time" - "github.com/Kindling-project/kindling/collector/pkg/component" - "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" - "github.com/Kindling-project/kindling/collector/pkg/model" - "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" - "github.com/Kindling-project/kindling/collector/pkg/model/constnames" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" otelprocessor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" + + "github.com/Kindling-project/kindling/collector/pkg/component" + "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" + "github.com/Kindling-project/kindling/collector/pkg/model" + "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" + "github.com/Kindling-project/kindling/collector/pkg/model/constnames" ) func Test_instrumentFactory_recordLastValue(t *testing.T) { @@ -61,7 +62,7 @@ func Test_instrumentFactory_recordLastValue(t *testing.T) { controller.WithResource(nil), ) - cont.Start(context.Background()) + _ = cont.Start(context.Background()) ins := newInstrumentFactory(cont.Meter("test"), component.NewDefaultTelemetryTools(), nil) @@ -77,7 +78,7 @@ func Test_instrumentFactory_recordLastValue(t *testing.T) { func makeTcpGroup(rttLatency int64) *model.DataGroup { return model.NewDataGroup( - constnames.TcpMetricGroupName, + constnames.TcpRttMetricGroupName, model.NewAttributeMapWithValues( map[string]model.AttributeValue{ constlabels.SrcIp: model.NewStringValue("src-ip"), @@ -170,8 +171,8 @@ func Test_instrumentFactory_recordTraceAsMetric(t *testing.T) { t1 = lastTraceAsMetric[i] // value check - if metric, ok := t1.GetMetric(constnames.TraceAsMetric); ok { - if metric.GetInt().Value != randTime { + if m, ok := t1.GetMetric(constnames.TraceAsMetric); ok { + if m.GetInt().Value != randTime { t.Errorf("Value check failed") } } else { diff --git a/collector/pkg/component/consumer/exporter/otelexporter/otelexporter.go b/collector/pkg/component/consumer/exporter/otelexporter/otelexporter.go index 2a3e7e643..8f2200480 100644 --- a/collector/pkg/component/consumer/exporter/otelexporter/otelexporter.go +++ b/collector/pkg/component/consumer/exporter/otelexporter/otelexporter.go @@ -7,10 +7,6 @@ import ( "os" "time" - "github.com/Kindling-project/kindling/collector/pkg/component" - "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter" - "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" - "github.com/Kindling-project/kindling/collector/pkg/model/constnames" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -29,6 +25,11 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.7.0" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + + "github.com/Kindling-project/kindling/collector/pkg/component" + "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter" + "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" + "github.com/Kindling-project/kindling/collector/pkg/model/constnames" ) const ( @@ -44,17 +45,6 @@ const ( var serviceName string -type labelKey struct { - metric string - srcIp string - dstIp string - dstPort int64 - requestContent string - responseContent string - statusCode string - protocol string -} - type OtelOutputExporters struct { metricExporter exportmetric.Exporter traceExporter sdktrace.SpanExporter @@ -151,7 +141,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), + adapter.NewSimpleAdapter([]string{constnames.TcpRttMetricGroupName, constnames.TcpRetransmitMetricGroupName, + constnames.TcpDropMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), }, } go func() { @@ -218,7 +209,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), + adapter.NewSimpleAdapter([]string{constnames.TcpRttMetricGroupName, constnames.TcpRetransmitMetricGroupName, + constnames.TcpDropMetricGroupName, constnames.TcpConnectMetricGroupName}, customLabels), }, } diff --git a/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go b/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go index f0eed5c33..3d5c315cf 100644 --- a/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go +++ b/collector/pkg/component/consumer/exporter/otelexporter/otelexporter_test.go @@ -7,6 +7,13 @@ import ( "testing" "time" + "github.com/spf13/viper" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + otelprocessor "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" + "go.uber.org/zap" + "github.com/Kindling-project/kindling/collector/pkg/component" "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter" "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" @@ -14,12 +21,6 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" "github.com/Kindling-project/kindling/collector/pkg/model/constnames" "github.com/Kindling-project/kindling/collector/pkg/model/constvalues" - "github.com/spf13/viper" - "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" - controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" - otelprocessor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" - "go.uber.org/zap" ) func InitOtelExporter(t *testing.T) exporter.Exporter { @@ -171,13 +172,13 @@ func BenchmarkOtelExporter_Consume(b *testing.B) { } telemetry := component.NewDefaultTelemetryTools() - exporter, _ := newExporters(context.Background(), cfg, telemetry) + myExporter, _ := newExporters(context.Background(), cfg, telemetry) cont := controller.New( otelprocessor.NewFactory(simple.NewWithHistogramDistribution( histogram.WithExplicitBoundaries(exponentialInt64NanosecondsBoundaries), - ), exporter.metricExporter), - controller.WithExporter(exporter.metricExporter), + ), myExporter.metricExporter), + controller.WithExporter(myExporter.metricExporter), controller.WithCollectPeriod(cfg.StdoutCfg.CollectPeriod), controller.WithResource(nil), ) @@ -198,7 +199,8 @@ func BenchmarkOtelExporter_Consume(b *testing.B) { StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }), - adapter.NewSimpleAdapter([]string{constnames.TcpMetricGroupName}, nil), + adapter.NewSimpleAdapter([]string{constnames.TcpRttMetricGroupName, constnames.TcpRetransmitMetricGroupName, + constnames.TcpDropMetricGroupName}, nil), }, } @@ -254,7 +256,7 @@ func BenchmarkOtelExporter_Consume(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - otelexporter.Consume(metricsGroupsSlice[recordCounter%dimension]) + _ = otelexporter.Consume(metricsGroupsSlice[recordCounter%dimension]) recordCounter++ } diff --git a/collector/pkg/component/consumer/exporter/prometheusexporter/prometheus.go b/collector/pkg/component/consumer/exporter/prometheusexporter/prometheus.go index f03456a2e..ff5e4707b 100644 --- a/collector/pkg/component/consumer/exporter/prometheusexporter/prometheus.go +++ b/collector/pkg/component/consumer/exporter/prometheusexporter/prometheus.go @@ -5,24 +5,19 @@ import ( "net" "net/http" - "github.com/Kindling-project/kindling/collector/pkg/component" - "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter" - adapter3 "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" - "github.com/Kindling-project/kindling/collector/pkg/model/constnames" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" "golang.org/x/net/context" -) - -const Type = "prometheus" -const ( - Int64BoundaryMultiplier = 1e6 + "github.com/Kindling-project/kindling/collector/pkg/component" + "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter" + adapter3 "github.com/Kindling-project/kindling/collector/pkg/component/consumer/exporter/tools/adapter" + "github.com/Kindling-project/kindling/collector/pkg/model/constnames" ) -var serviceName string +const Type = "prometheus" type prometheusExporter struct { cfg *Config @@ -45,7 +40,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export collector := newCollector(cfg, telemetry.Logger) registry := prometheus.NewRegistry() - registry.Register(collector) + _ = registry.Register(collector) netAdapter := adapter3.NewNetAdapter(nil, &adapter3.NetAdapterConfig{ StoreTraceAsMetric: cfg.AdapterConfig.NeedTraceAsMetric, @@ -53,7 +48,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export StorePodDetail: cfg.AdapterConfig.NeedPodDetail, StoreExternalSrcIP: cfg.AdapterConfig.StoreExternalSrcIP, }) - simpleAdapter := adapter3.NewSimpleAdapter([]string{constnames.TcpMetricGroupName}, nil) + simpleAdapter := adapter3.NewSimpleAdapter([]string{constnames.TcpRttMetricGroupName, constnames.TcpRetransmitMetricGroupName, + constnames.TcpDropMetricGroupName}, nil) prometheusExporter := &prometheusExporter{ cfg: cfg, @@ -62,7 +58,7 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.ContinueOnError, - ErrorLog: &promLogger{realLog: telemetry.Logger}, + ErrorLog: newPromLogger(telemetry.Logger), }, ), // metricAggregationMap: cfg.MetricAggregationMap, @@ -71,13 +67,15 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export constnames.NetRequestMetricGroupName: {netAdapter}, constnames.AggregatedNetRequestMetricGroup: {netAdapter}, constnames.SingleNetRequestMetricGroup: {netAdapter}, - constnames.TcpMetricGroupName: {simpleAdapter}, + constnames.TcpRttMetricGroupName: {simpleAdapter}, + constnames.TcpRetransmitMetricGroupName: {simpleAdapter}, + constnames.TcpDropMetricGroupName: {simpleAdapter}, }, adapter: simpleAdapter, } go func() { - prometheusExporter.Start(context.Background()) + _ = prometheusExporter.Start(context.Background()) }() return prometheusExporter } @@ -87,18 +85,6 @@ func (p *prometheusExporter) findInstrumentKind(metricName string) (MetricAggreg return kind, find } -var exponentialInt64Boundaries = []float64{10, 25, 50, 80, 130, 200, 300, - 400, 500, 700, 1000, 2000, 5000, 30000} - -// exponentialInt64NanoSecondsBoundaries applies a multiplier to the exponential -// Int64Boundaries: [ 5M, 10M, 20M, 40M, ...] -var exponentialInt64NanosecondsBoundaries = func(bounds []float64) (asint []float64) { - for _, f := range bounds { - asint = append(asint, Int64BoundaryMultiplier*f) - } - return -}(exponentialInt64Boundaries) - func (p *prometheusExporter) Start(_ context.Context) error { ln, err := net.Listen("tcp", p.cfg.PromCfg.Endpoint) if err != nil { diff --git a/collector/pkg/component/consumer/processor/aggregateprocessor/processor.go b/collector/pkg/component/consumer/processor/aggregateprocessor/processor.go index c79d2edaf..349fbb32c 100644 --- a/collector/pkg/component/consumer/processor/aggregateprocessor/processor.go +++ b/collector/pkg/component/consumer/processor/aggregateprocessor/processor.go @@ -4,6 +4,8 @@ import ( "math/rand" "time" + "go.uber.org/zap" + "github.com/Kindling-project/kindling/collector/pkg/aggregator" "github.com/Kindling-project/kindling/collector/pkg/aggregator/defaultaggregator" "github.com/Kindling-project/kindling/collector/pkg/component" @@ -13,7 +15,6 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/model" "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" "github.com/Kindling-project/kindling/collector/pkg/model/constnames" - "go.uber.org/zap" ) const Type = "aggregateprocessor" @@ -124,7 +125,11 @@ func (p *AggregateProcessor) Consume(dataGroup *model.DataGroup) error { dataGroup.Name = constnames.AggregatedNetRequestMetricGroup p.aggregator.Aggregate(dataGroup, p.netRequestLabelSelectors) return abnormalDataErr - case constnames.TcpMetricGroupName: + case constnames.TcpRttMetricGroupName: + fallthrough + case constnames.TcpRetransmitMetricGroupName: + fallthrough + case constnames.TcpDropMetricGroupName: p.aggregator.Aggregate(dataGroup, p.tcpLabelSelectors) return nil case constnames.TcpConnectMetricGroupName: diff --git a/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go b/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go index 957ce1c2b..7460dd7e7 100644 --- a/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go +++ b/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go @@ -3,6 +3,8 @@ package k8sprocessor import ( "strconv" + "go.uber.org/zap" + "github.com/Kindling-project/kindling/collector/pkg/component" "github.com/Kindling-project/kindling/collector/pkg/component/consumer" "github.com/Kindling-project/kindling/collector/pkg/component/consumer/processor" @@ -10,7 +12,6 @@ import ( "github.com/Kindling-project/kindling/collector/pkg/model" "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" "github.com/Kindling-project/kindling/collector/pkg/model/constnames" - "go.uber.org/zap" ) const ( @@ -76,7 +77,11 @@ func (p *K8sMetadataProcessor) Consume(dataGroup *model.DataGroup) error { switch name { case constnames.NetRequestMetricGroupName: p.processNetRequestMetric(dataGroup) - case constnames.TcpMetricGroupName: + case constnames.TcpRttMetricGroupName: + fallthrough + case constnames.TcpRetransmitMetricGroupName: + fallthrough + case constnames.TcpDropMetricGroupName: p.processTcpMetric(dataGroup) default: p.processNetRequestMetric(dataGroup) diff --git a/collector/pkg/model/constnames/const.go b/collector/pkg/model/constnames/const.go index eb990febb..2b7396f6b 100644 --- a/collector/pkg/model/constnames/const.go +++ b/collector/pkg/model/constnames/const.go @@ -35,7 +35,9 @@ const ( CameraEventGroupName = "camera_event_group" - TcpMetricGroupName = "tcp_metric_metric_group" - NodeMetricGroupName = "node_metric_metric_group" - TcpConnectMetricGroupName = "tcp_connect_metric_group" + TcpRttMetricGroupName = "tcp_rtt_metric_group" + TcpRetransmitMetricGroupName = "tcp_retransmit_metric_group" + TcpDropMetricGroupName = "tcp_drop_metric_group" + NodeMetricGroupName = "node_metric_metric_group" + TcpConnectMetricGroupName = "tcp_connect_metric_group" ) From 2ffead22ee1fd72b8a8e4cf702745823827dc158 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Fri, 10 Feb 2023 11:15:28 +0800 Subject: [PATCH 2/2] Update the CHANGELOG.md Signed-off-by: Daxin Wang --- CHANGELOG.md | 1 + collector/pkg/aggregator/defaultaggregator/value_recorder.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83b9a1b0d..dd0039b93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - Support to identify the MySQL protocol with statements `commit` and `set`. ([#417](https://github.com/KindlingProject/kindling/pull/417)) ### Bug fixes +- Fix the bug that TCP metrics are not aggregated correctly. ([#444](https://github.com/KindlingProject/kindling/pull/444)) - Fix the bug that cpuanalyzer missed some trigger events due to the incorrect variable reference. This may cause some traces can't correlate with on/off CPU data. ([#424](https://github.com/KindlingProject/kindling/pull/424)) ## v0.6.0 - 2022-12-21 diff --git a/collector/pkg/aggregator/defaultaggregator/value_recorder.go b/collector/pkg/aggregator/defaultaggregator/value_recorder.go index 6f1083e7a..19eef4400 100644 --- a/collector/pkg/aggregator/defaultaggregator/value_recorder.go +++ b/collector/pkg/aggregator/defaultaggregator/value_recorder.go @@ -23,8 +23,8 @@ func newValueRecorder(recorderName string, aggKindMap map[string][]KindConfig) * } // Record is thread-safe, and return the result value. -// A recorder can record only the metrics that are same as the initial ones when using a same key. -// But it allows to record different metrics with different keys. +// A recorder can record only the metrics that are the same as the initial ones when using the same key. +// But it can record different metrics with different keys. func (r *valueRecorder) Record(key *aggregator.LabelKeys, metricValues []*model.Metric, timestamp uint64) { if key == nil { return