From b4fcf58855286c4dc54ef0243783ad64a22f46dc Mon Sep 17 00:00:00 2001 From: huxiangyuan Date: Tue, 13 Dec 2022 15:03:18 +0800 Subject: [PATCH 1/3] FIX http-100 request is detected as NOSUPPORT Signed-off-by: huxiangyuan --- .../component/analyzer/network/gauge_pool.go | 15 ++- .../analyzer/network/message_pair.go | 17 +++ .../analyzer/network/network_analyzer.go | 10 +- .../analyzer/network/network_analyzer_test.go | 67 +++++++-- .../network/protocol/http/http_response.go | 5 + .../testdata/dubbo/server-trace-short.yml | 4 +- .../testdata/http/server-trace-continue.yml | 127 ++++++++++++++++++ collector/pkg/model/constlabels/protocols.go | 1 + 8 files changed, 226 insertions(+), 20 deletions(-) create mode 100644 collector/pkg/component/analyzer/network/protocol/testdata/http/server-trace-continue.yml diff --git a/collector/pkg/component/analyzer/network/gauge_pool.go b/collector/pkg/component/analyzer/network/gauge_pool.go index 570138f2c..e4491de61 100644 --- a/collector/pkg/component/analyzer/network/gauge_pool.go +++ b/collector/pkg/component/analyzer/network/gauge_pool.go @@ -23,19 +23,24 @@ func createDataGroup() interface{} { return dataGroup } -type DataGroupPool struct { +type DataGroupPool interface { + Get() *model.DataGroup + Free(dataGroup *model.DataGroup) +} + +type GaugeDataGroupPool struct { pool *sync.Pool } -func NewDataGroupPool() *DataGroupPool { - return &DataGroupPool{pool: &sync.Pool{New: createDataGroup}} +func NewDataGroupPool() DataGroupPool { + return &GaugeDataGroupPool{pool: &sync.Pool{New: createDataGroup}} } -func (p *DataGroupPool) Get() *model.DataGroup { +func (p *GaugeDataGroupPool) Get() *model.DataGroup { return p.pool.Get().(*model.DataGroup) } -func (p *DataGroupPool) Free(dataGroup *model.DataGroup) { +func (p *GaugeDataGroupPool) Free(dataGroup *model.DataGroup) { dataGroup.Reset() dataGroup.Name = constnames.NetRequestMetricGroupName p.pool.Put(dataGroup) diff --git 
a/collector/pkg/component/analyzer/network/message_pair.go b/collector/pkg/component/analyzer/network/message_pair.go index 7daa3a76e..ec479d735 100644 --- a/collector/pkg/component/analyzer/network/message_pair.go +++ b/collector/pkg/component/analyzer/network/message_pair.go @@ -51,6 +51,13 @@ func (evts *events) getEvent(index int) *model.KindlingEvent { return nil } +func (evts *events) putEventBack(originEvts *events) { + newEvt := evts.event + evts.event = originEvts.event + evts.mergable = originEvts.mergable + evts.mergeEvent(newEvt) +} + func (evts *events) mergeEvent(evt *model.KindlingEvent) { if evts.mergable == nil { firstEvt := evts.event @@ -166,6 +173,16 @@ func (mps *messagePairs) mergeConnect(evt *model.KindlingEvent) { mps.mutex.Unlock() } +func (mps *messagePairs) putRequestBack(evts *events) { + mps.mutex.Lock() + if mps.requests == nil { + mps.requests = evts + } else { + mps.requests.putEventBack(evts) + } + mps.mutex.Unlock() +} + func (mps *messagePairs) mergeRequest(evt *model.KindlingEvent) { mps.mutex.Lock() if mps.requests == nil { diff --git a/collector/pkg/component/analyzer/network/network_analyzer.go b/collector/pkg/component/analyzer/network/network_analyzer.go index 33d4f9bf7..5331f0739 100644 --- a/collector/pkg/component/analyzer/network/network_analyzer.go +++ b/collector/pkg/component/analyzer/network/network_analyzer.go @@ -43,7 +43,7 @@ type NetworkAnalyzer struct { parserFactory *factory.ParserFactory parsers []*protocol.ProtocolParser - dataGroupPool *DataGroupPool + dataGroupPool DataGroupPool requestMonitor sync.Map tcpMessagePairSize int64 udpMessagePairSize int64 @@ -525,6 +525,14 @@ func (na *NetworkAnalyzer) getConnectFailRecords(mps *messagePairs) []*model.Dat func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attributes *model.AttributeMap) []*model.DataGroup { evt := mps.requests.event + if attributes.HasAttribute(constlabels.HttpContinue) { + pairInterface, ok := 
na.requestMonitor.Load(getMessagePairKey(evt)) + if ok { + var oldPairs = pairInterface.(*messagePairs) + oldPairs.putRequestBack(mps.requests) + } + return []*model.DataGroup{} + } slow := false if mps.responses != nil { diff --git a/collector/pkg/component/analyzer/network/network_analyzer_test.go b/collector/pkg/component/analyzer/network/network_analyzer_test.go index 8151db3e3..ac82ad9c1 100644 --- a/collector/pkg/component/analyzer/network/network_analyzer_test.go +++ b/collector/pkg/component/analyzer/network/network_analyzer_test.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "reflect" + "sort" "sync" "testing" @@ -21,7 +22,9 @@ func TestHttpProtocol(t *testing.T) { "http/server-trace-slow.yml", "http/server-trace-error.yml", "http/server-trace-split.yml", - "http/server-trace-normal.yml") + "http/server-trace-normal.yml", + "http/server-trace-continue.yml", + ) } func TestMySqlProtocol(t *testing.T) { @@ -45,9 +48,7 @@ func TestKafkaProtocol(t *testing.T) { "kafka/provider-trace-produce-split.yml") testProtocol(t, "kafka/consumer-event.yml", - "kafka/consumer-trace-fetch-split.yml") - - testProtocol(t, "kafka/consumer-event.yml", + "kafka/consumer-trace-fetch-split.yml", "kafka/consumer-trace-fetch-multi-topics.yml") } @@ -58,10 +59,8 @@ func TestDubboProtocol(t *testing.T) { func TestRocketMQProtocol(t *testing.T) { testProtocol(t, "rocketmq/server-event.yml", - "rocketmq/server-trace-json.yml") - testProtocol(t, "rocketmq/server-event.yml", - "rocketmq/server-trace-rocketmq.yml") - testProtocol(t, "rocketmq/server-event.yml", + "rocketmq/server-trace-json.yml", + "rocketmq/server-trace-rocketmq.yml", "rocketmq/server-trace-error.yml") } @@ -70,10 +69,23 @@ type NopProcessor struct { func (n NopProcessor) Consume(dataGroup *model.DataGroup) error { // fmt.Printf("Consume %v\n", dataGroup) + results = append(results, dataGroup) return nil } +type NoCacheDataGroupPool struct { +} + +func (p *NoCacheDataGroupPool) Get() *model.DataGroup { + dataGroup := 
createDataGroup() + return dataGroup.(*model.DataGroup) +} + +func (p *NoCacheDataGroupPool) Free(dataGroup *model.DataGroup) { +} + var na *NetworkAnalyzer +var results []*model.DataGroup func prepareNetworkAnalyzer() *NetworkAnalyzer { if na == nil { @@ -89,7 +101,7 @@ func prepareNetworkAnalyzer() *NetworkAnalyzer { na = &NetworkAnalyzer{ cfg: config, - dataGroupPool: NewDataGroupPool(), + dataGroupPool: &NoCacheDataGroupPool{}, nextConsumers: []consumer.Consumer{&NopProcessor{}}, telemetry: component.NewDefaultTelemetryTools(), } @@ -130,9 +142,16 @@ func testProtocol(t *testing.T, eventYaml string, traceYamls ...string) { } t.Run(trace.Key, func(t *testing.T) { - mps := trace.PrepareMessagePairs(eventCommon) - result := na.parseProtocols(mps) - trace.Validate(t, result) + results = []*model.DataGroup{} + events := trace.getSortedEvents(eventCommon) + for _, event := range events { + na.ConsumeEvent(event) + } + if pairInterface, ok := na.requestMonitor.Load(getMessagePairKey(events[0])); ok { + var oldPairs = pairInterface.(*messagePairs) + na.distributeTraceMetric(oldPairs, nil) + } + trace.Validate(t, results) }) } } @@ -298,6 +317,30 @@ func checkSize(t *testing.T, key string, expect int, got int) { } } +func (trace *Trace) getSortedEvents(common *EventCommon) []*model.KindlingEvent { + events := []*model.KindlingEvent{} + if trace.Connects != nil { + for _, connect := range trace.Connects { + events = append(events, connect.exchange(common)) + } + } + if trace.Requests != nil { + for _, request := range trace.Requests { + events = append(events, request.exchange(common)) + } + } + if trace.Responses != nil { + for _, response := range trace.Responses { + events = append(events, response.exchange(common)) + } + } + // Sort By Event Timestamp. 
+ sort.SliceStable(events, func(i, j int) bool { + return events[i].Timestamp < events[j].Timestamp + }) + return events +} + type TraceEvent struct { Name string `mapstructure:"name"` Timestamp uint64 `mapstructure:"timestamp"` diff --git a/collector/pkg/component/analyzer/network/protocol/http/http_response.go b/collector/pkg/component/analyzer/network/protocol/http/http_response.go index 5c45fe370..e0301a32a 100644 --- a/collector/pkg/component/analyzer/network/protocol/http/http_response.go +++ b/collector/pkg/component/analyzer/network/protocol/http/http_response.go @@ -50,6 +50,11 @@ func parseHttpResponse() protocol.ParsePkgFn { statusCodeI = 0 } + if statusCodeI == 100 { + // Add http_continue for merge next request. + message.AddBoolAttribute(constlabels.HttpContinue, true) + } + if !message.HasAttribute(constlabels.HttpApmTraceType) { headers := parseHeaders(message) traceType, traceId := tools.ParseTraceHeader(headers) diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/dubbo/server-trace-short.yml b/collector/pkg/component/analyzer/network/protocol/testdata/dubbo/server-trace-short.yml index 5d4f3672a..3bc742185 100644 --- a/collector/pkg/component/analyzer/network/protocol/testdata/dubbo/server-trace-short.yml +++ b/collector/pkg/component/analyzer/network/protocol/testdata/dubbo/server-trace-short.yml @@ -2,7 +2,7 @@ trace: key: shortData requests: - - name: "read" + name: "write" timestamp: 100000000 user_attributes: latency: 5000 @@ -16,7 +16,7 @@ trace: - "3022|Ljava/l" responses: - - name: "write" + name: "read" timestamp: 101000000 user_attributes: latency: 40000 diff --git a/collector/pkg/component/analyzer/network/protocol/testdata/http/server-trace-continue.yml b/collector/pkg/component/analyzer/network/protocol/testdata/http/server-trace-continue.yml new file mode 100644 index 000000000..3a9001e11 --- /dev/null +++ b/collector/pkg/component/analyzer/network/protocol/testdata/http/server-trace-continue.yml @@ -0,0 +1,127 @@ 
+trace: + # 0--100---104--108--------601 + # READ WRITE(100) + # READ + # READ + # READ WRITE(200) + # WRITE + key: continueData + requests: + # Request Head + - + name: "read" + timestamp: 100000000 + user_attributes: + latency: 1000 + res: 174 + data: + - "POST /io/bigBody?sleep=1 HTTP/1.1\r\n" + - "User-Agent: curl/7.29.0\r\n" + - "Host: 10.0.2.4:19999\r\n" + - "accept: */*\r\n" + - "Content-Type: application/json\r\n" + - "Content-Length: 16082\r\n" + - "Expect: 100-continue\r\n" + - "\r\n" + # Request Body[10220] + - + name: "read" + timestamp: 108000000 + user_attributes: + latency: 5000 + res: 10220 + data: + - "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + - "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + # Request Body[5840] + - + name: "read" + timestamp: 108100000 + user_attributes: + latency: 3000 + res: 5840 + data: + - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + # Request Body[22] + - + name: "read" + timestamp: 108150000 + user_attributes: + latency: 500 + res: 22 + data: + - "bbbbbbbbbbbbbbbbbbbbbb" + responses: + # Response Continue(100) + - + name: "write" + timestamp: 104000000 + user_attributes: + latency: 1000 + res: 44 + data: + - "HTTP/1.1 100 Continue\r\n" + - "Content-Length: 0\r\n" + - "\r\n" + # Response Body[199] + - + name: "write" + timestamp: 601000000 + user_attributes: + latency: 2000 + res: 199 + data: + - "HTTP/1.1 200 OK\r\n" + - "Connection: keep-alive\r\n" + - "Transfer-Encoding: chunked\r\n" + - "Content-Type: application/json\r\n" + - "Date: Mon, 12 Dec 2022 09:18:27 GMT\r\n" + - "\r\n" + - "35\r\n" + - '{"success":true,"data":"sleep=1, body size is 16082"}\r\n' + # Response Body[5] + - + name: "write" + timestamp: 601100000 + user_attributes: + 
latency: 300 + res: 5 + data: + - "0\r\n" + - "\r\n" + expects: + - + Timestamp: 99999000 + Values: + request_total_time: 501101000 + connect_time: 0 + request_sent_time: 8151000 + waiting_ttfb_time: 492848000 + content_download_time: 102000 + request_io: 16256 + response_io: 204 + Labels: + comm: testdemo + pid: 12345 + request_tid: 12346 + response_tid: 12346 + src_ip: "127.0.0.1" + src_port: 56266 + dst_ip: "127.0.0.1" + dst_port: 9001 + dnat_ip: "" + dnat_port: -1 + container_id: "" + is_slow: true + is_server: true + protocol: "http" + is_error: false + error_type: 0 + content_key: "/io/bigBody" + http_method: "POST" + http_url: "/io/bigBody?sleep=1" + http_status_code: 200 + end_timestamp: 601100000 + request_payload: "POST /io/bigBody?sleep=1 HTTP/1.1\r\nUser-Agent: curl/7.29.0\r\nHost: 10.0.2.4:19999" + response_payload: "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nTransfer-Encoding: chunked\r\nContent-Typ" \ No newline at end of file diff --git a/collector/pkg/model/constlabels/protocols.go b/collector/pkg/model/constlabels/protocols.go index 0936da750..06fb29e4f 100644 --- a/collector/pkg/model/constlabels/protocols.go +++ b/collector/pkg/model/constlabels/protocols.go @@ -10,6 +10,7 @@ const ( HttpApmTraceType = "trace_type" HttpApmTraceId = "trace_id" HttpStatusCode = "http_status_code" + HttpContinue = "http_continue" DnsId = "dns_id" DnsDomain = "dns_domain" From 8b24c50ea57ad9437d7a3c83106c0f18e34792f2 Mon Sep 17 00:00:00 2001 From: huxiangyuan Date: Tue, 13 Dec 2022 15:48:41 +0800 Subject: [PATCH 2/3] Add Change Log Signed-off-by: huxiangyuan --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4d437a61..b114c5064 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ ### Bug fixes +- Fix the bug where HTTP requests that receive a `100 Continue` response are detected as NOSUPPORT. ([#393](https://github.com/KindlingProject/kindling/pull/393)) - Fix the wrong thread name in the trace profiling function. 
([#385])(https://github.com/KindlingProject/kindling/pull/385) - Remove "reset" method of ScheduledTaskRoutine to fix a potential dead-lock issue. ([#369])(https://github.com/KindlingProject/kindling/pull/369) - Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374)) From 9d01ab5a5b77c982a3f5c170ab2c5a3799d56f29 Mon Sep 17 00:00:00 2001 From: huxiangyuan Date: Tue, 13 Dec 2022 17:17:27 +0800 Subject: [PATCH 3/3] Add description for http_continue Signed-off-by: huxiangyuan --- .../analyzer/network/{gauge_pool.go => datagroup_pool.go} | 8 ++++---- .../pkg/component/analyzer/network/network_analyzer.go | 4 ++-- .../analyzer/network/protocol/http/http_response.go | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) rename collector/pkg/component/analyzer/network/{gauge_pool.go => datagroup_pool.go} (83%) diff --git a/collector/pkg/component/analyzer/network/gauge_pool.go b/collector/pkg/component/analyzer/network/datagroup_pool.go similarity index 83% rename from collector/pkg/component/analyzer/network/gauge_pool.go rename to collector/pkg/component/analyzer/network/datagroup_pool.go index e4491de61..d5a7740ba 100644 --- a/collector/pkg/component/analyzer/network/gauge_pool.go +++ b/collector/pkg/component/analyzer/network/datagroup_pool.go @@ -28,19 +28,19 @@ type DataGroupPool interface { Free(dataGroup *model.DataGroup) } -type GaugeDataGroupPool struct { +type SimpleDataGroupPool struct { pool *sync.Pool } func NewDataGroupPool() DataGroupPool { - return &GaugeDataGroupPool{pool: &sync.Pool{New: createDataGroup}} + return &SimpleDataGroupPool{pool: &sync.Pool{New: createDataGroup}} } -func (p *GaugeDataGroupPool) Get() *model.DataGroup { +func (p *SimpleDataGroupPool) Get() *model.DataGroup { return p.pool.Get().(*model.DataGroup) } -func (p *GaugeDataGroupPool) Free(dataGroup *model.DataGroup) { +func (p 
*SimpleDataGroupPool) Free(dataGroup *model.DataGroup) { dataGroup.Reset() dataGroup.Name = constnames.NetRequestMetricGroupName p.pool.Put(dataGroup) diff --git a/collector/pkg/component/analyzer/network/network_analyzer.go b/collector/pkg/component/analyzer/network/network_analyzer.go index 5331f0739..9fbcc5cdf 100644 --- a/collector/pkg/component/analyzer/network/network_analyzer.go +++ b/collector/pkg/component/analyzer/network/network_analyzer.go @@ -525,9 +525,9 @@ func (na *NetworkAnalyzer) getConnectFailRecords(mps *messagePairs) []*model.Dat func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attributes *model.AttributeMap) []*model.DataGroup { evt := mps.requests.event + // See the issue https://github.com/KindlingProject/kindling/issues/388 for details. if attributes.HasAttribute(constlabels.HttpContinue) { - pairInterface, ok := na.requestMonitor.Load(getMessagePairKey(evt)) - if ok { + if pairInterface, ok := na.requestMonitor.Load(getMessagePairKey(evt)); ok { var oldPairs = pairInterface.(*messagePairs) oldPairs.putRequestBack(mps.requests) } diff --git a/collector/pkg/component/analyzer/network/protocol/http/http_response.go b/collector/pkg/component/analyzer/network/protocol/http/http_response.go index e0301a32a..7ff7901d2 100644 --- a/collector/pkg/component/analyzer/network/protocol/http/http_response.go +++ b/collector/pkg/component/analyzer/network/protocol/http/http_response.go @@ -51,7 +51,8 @@ func parseHttpResponse() protocol.ParsePkgFn { } if statusCodeI == 100 { - // Add http_continue for merge next request. + // Add http_continue for merging the subsequent request. + // See the issue https://github.com/KindlingProject/kindling/issues/388 for details. message.AddBoolAttribute(constlabels.HttpContinue, true) }