Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX http-100 request is detected as NOSUPPORT #393

Merged
merged 4 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


### Bug fixes
- Fix the issue that an http-100 request is detected as NOSUPPORT. ([#393](https://github.com/KindlingProject/kindling/pull/393))
- Fix the wrong thread name in the trace profiling function. ([#385](https://github.com/KindlingProject/kindling/pull/385))
- Remove the "reset" method of ScheduledTaskRoutine to fix a potential dead-lock issue. ([#369](https://github.com/KindlingProject/kindling/pull/369))
- Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374))
Expand Down
15 changes: 10 additions & 5 deletions collector/pkg/component/analyzer/network/gauge_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ func createDataGroup() interface{} {
return dataGroup
}

type DataGroupPool struct {
// DataGroupPool abstracts a source of reusable *model.DataGroup values.
// Implementations may recycle instances (e.g. the sync.Pool-backed
// GaugeDataGroupPool) or hand out fresh ones each time.
type DataGroupPool interface {
	// Get returns a DataGroup ready for use.
	Get() *model.DataGroup
	// Free hands dataGroup back to the pool; the caller must not use
	// it afterwards, since a pooling implementation may reset and reuse it.
	Free(dataGroup *model.DataGroup)
}

// GaugeDataGroupPool is the production DataGroupPool implementation,
// backed by a sync.Pool so that DataGroup allocations are amortized
// across requests.
//
// NOTE: the original source span was corrupted by scraped review-UI text
// ("dxsup marked this conversation as resolved.") inside the struct body;
// this restores the syntactically valid declaration.
type GaugeDataGroupPool struct {
	pool *sync.Pool
}

func NewDataGroupPool() *DataGroupPool {
return &DataGroupPool{pool: &sync.Pool{New: createDataGroup}}
// NewDataGroupPool builds the default DataGroupPool implementation: a
// GaugeDataGroupPool whose sync.Pool allocates via createDataGroup when
// empty.
func NewDataGroupPool() DataGroupPool {
	backing := &sync.Pool{New: createDataGroup}
	return &GaugeDataGroupPool{pool: backing}
}

func (p *DataGroupPool) Get() *model.DataGroup {
// Get pops a DataGroup from the underlying sync.Pool; the pool's New
// function supplies a fresh instance when it is empty.
func (p *GaugeDataGroupPool) Get() *model.DataGroup {
	item := p.pool.Get()
	return item.(*model.DataGroup)
}

func (p *DataGroupPool) Free(dataGroup *model.DataGroup) {
func (p *GaugeDataGroupPool) Free(dataGroup *model.DataGroup) {
dataGroup.Reset()
dataGroup.Name = constnames.NetRequestMetricGroupName
p.pool.Put(dataGroup)
Expand Down
17 changes: 17 additions & 0 deletions collector/pkg/component/analyzer/network/message_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ func (evts *events) getEvent(index int) *model.KindlingEvent {
return nil
}

// putEventBack restores originEvts as the head of this events object and
// re-merges the event currently stored here on top of it.
// The statement order matters: the current head must be saved before the
// head/mergable state is overwritten with originEvts's state, and only
// then is the saved event re-appended via mergeEvent.
// NOTE(review): the net effect — origin's payload first, newer event
// merged after it — depends on mergeEvent's semantics, which are not
// fully visible here; confirm against message_pair.go.
func (evts *events) putEventBack(originEvts *events) {
	newEvt := evts.event
	evts.event = originEvts.event
	evts.mergable = originEvts.mergable
	evts.mergeEvent(newEvt)
}

func (evts *events) mergeEvent(evt *model.KindlingEvent) {
if evts.mergable == nil {
firstEvt := evts.event
Expand Down Expand Up @@ -166,6 +173,16 @@ func (mps *messagePairs) mergeConnect(evt *model.KindlingEvent) {
mps.mutex.Unlock()
}

// putRequestBack re-attaches previously captured request events to this
// message-pair set under the lock. When no requests exist yet, the events
// object is adopted as-is; otherwise it is spliced in via putEventBack.
func (mps *messagePairs) putRequestBack(evts *events) {
	mps.mutex.Lock()
	defer mps.mutex.Unlock()
	if mps.requests != nil {
		mps.requests.putEventBack(evts)
		return
	}
	mps.requests = evts
}

func (mps *messagePairs) mergeRequest(evt *model.KindlingEvent) {
mps.mutex.Lock()
if mps.requests == nil {
Expand Down
10 changes: 9 additions & 1 deletion collector/pkg/component/analyzer/network/network_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type NetworkAnalyzer struct {
parserFactory *factory.ParserFactory
parsers []*protocol.ProtocolParser

dataGroupPool *DataGroupPool
dataGroupPool DataGroupPool
requestMonitor sync.Map
tcpMessagePairSize int64
udpMessagePairSize int64
Expand Down Expand Up @@ -525,6 +525,14 @@ func (na *NetworkAnalyzer) getConnectFailRecords(mps *messagePairs) []*model.Dat

func (na *NetworkAnalyzer) getRecords(mps *messagePairs, protocol string, attributes *model.AttributeMap) []*model.DataGroup {
evt := mps.requests.event
if attributes.HasAttribute(constlabels.HttpContinue) {
dxsup marked this conversation as resolved.
Show resolved Hide resolved
pairInterface, ok := na.requestMonitor.Load(getMessagePairKey(evt))
if ok {
var oldPairs = pairInterface.(*messagePairs)
oldPairs.putRequestBack(mps.requests)
}
return []*model.DataGroup{}
}

slow := false
if mps.responses != nil {
Expand Down
67 changes: 55 additions & 12 deletions collector/pkg/component/analyzer/network/network_analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"reflect"
"sort"
"sync"
"testing"

Expand All @@ -21,7 +22,9 @@ func TestHttpProtocol(t *testing.T) {
"http/server-trace-slow.yml",
"http/server-trace-error.yml",
"http/server-trace-split.yml",
"http/server-trace-normal.yml")
"http/server-trace-normal.yml",
"http/server-trace-continue.yml",
)
}

func TestMySqlProtocol(t *testing.T) {
Expand All @@ -45,9 +48,7 @@ func TestKafkaProtocol(t *testing.T) {
"kafka/provider-trace-produce-split.yml")

testProtocol(t, "kafka/consumer-event.yml",
"kafka/consumer-trace-fetch-split.yml")

testProtocol(t, "kafka/consumer-event.yml",
"kafka/consumer-trace-fetch-split.yml",
"kafka/consumer-trace-fetch-multi-topics.yml")
}

Expand All @@ -58,10 +59,8 @@ func TestDubboProtocol(t *testing.T) {

func TestRocketMQProtocol(t *testing.T) {
testProtocol(t, "rocketmq/server-event.yml",
"rocketmq/server-trace-json.yml")
testProtocol(t, "rocketmq/server-event.yml",
"rocketmq/server-trace-rocketmq.yml")
testProtocol(t, "rocketmq/server-event.yml",
"rocketmq/server-trace-json.yml",
"rocketmq/server-trace-rocketmq.yml",
"rocketmq/server-trace-error.yml")
}

Expand All @@ -70,10 +69,23 @@ type NopProcessor struct {

// Consume records every received DataGroup into the package-level
// results slice so that test cases can validate the analyzer's output
// after all events have been fed in. It never fails.
func (n NopProcessor) Consume(dataGroup *model.DataGroup) error {
	// fmt.Printf("Consume %v\n", dataGroup)
	results = append(results, dataGroup)
	return nil
}

// NoCacheDataGroupPool is a DataGroupPool for tests that performs no
// recycling: every Get allocates a brand-new DataGroup and Free discards
// it, so DataGroups captured by the test remain valid for assertions.
type NoCacheDataGroupPool struct{}

// Get allocates a fresh DataGroup on every call.
func (p *NoCacheDataGroupPool) Get() *model.DataGroup {
	return createDataGroup().(*model.DataGroup)
}

// Free is intentionally a no-op; released DataGroups are left to the
// garbage collector.
func (p *NoCacheDataGroupPool) Free(dataGroup *model.DataGroup) {}

var na *NetworkAnalyzer
var results []*model.DataGroup

func prepareNetworkAnalyzer() *NetworkAnalyzer {
if na == nil {
Expand All @@ -89,7 +101,7 @@ func prepareNetworkAnalyzer() *NetworkAnalyzer {

na = &NetworkAnalyzer{
cfg: config,
dataGroupPool: NewDataGroupPool(),
dataGroupPool: &NoCacheDataGroupPool{},
nextConsumers: []consumer.Consumer{&NopProcessor{}},
telemetry: component.NewDefaultTelemetryTools(),
}
Expand Down Expand Up @@ -130,9 +142,16 @@ func testProtocol(t *testing.T, eventYaml string, traceYamls ...string) {
}

t.Run(trace.Key, func(t *testing.T) {
mps := trace.PrepareMessagePairs(eventCommon)
result := na.parseProtocols(mps)
trace.Validate(t, result)
results = []*model.DataGroup{}
events := trace.getSortedEvents(eventCommon)
for _, event := range events {
na.ConsumeEvent(event)
}
if pairInterface, ok := na.requestMonitor.Load(getMessagePairKey(events[0])); ok {
var oldPairs = pairInterface.(*messagePairs)
na.distributeTraceMetric(oldPairs, nil)
}
trace.Validate(t, results)
})
}
}
Expand Down Expand Up @@ -298,6 +317,30 @@ func checkSize(t *testing.T, key string, expect int, got int) {
}
}

// getSortedEvents flattens the trace's connect, request, and response
// specs into KindlingEvents and returns them ordered by timestamp, so
// tests can feed them to the analyzer in arrival order.
//
// Ranging over a nil slice is a no-op in Go, so the original explicit
// `!= nil` guards were redundant and are removed; the result slice is
// pre-sized to avoid repeated growth.
func (trace *Trace) getSortedEvents(common *EventCommon) []*model.KindlingEvent {
	events := make([]*model.KindlingEvent, 0,
		len(trace.Connects)+len(trace.Requests)+len(trace.Responses))
	for _, connect := range trace.Connects {
		events = append(events, connect.exchange(common))
	}
	for _, request := range trace.Requests {
		events = append(events, request.exchange(common))
	}
	for _, response := range trace.Responses {
		events = append(events, response.exchange(common))
	}
	// Stable sort preserves the YAML declaration order for events that
	// share the same timestamp.
	sort.SliceStable(events, func(i, j int) bool {
		return events[i].Timestamp < events[j].Timestamp
	})
	return events
}

type TraceEvent struct {
Name string `mapstructure:"name"`
Timestamp uint64 `mapstructure:"timestamp"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func parseHttpResponse() protocol.ParsePkgFn {
statusCodeI = 0
}

if statusCodeI == 100 {
// Mark the message with http_continue so that the analyzer merges the upcoming request with this one.
dxsup marked this conversation as resolved.
Show resolved Hide resolved
message.AddBoolAttribute(constlabels.HttpContinue, true)
}

if !message.HasAttribute(constlabels.HttpApmTraceType) {
headers := parseHeaders(message)
traceType, traceId := tools.ParseTraceHeader(headers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ trace:
key: shortData
requests:
-
name: "read"
name: "write"
dxsup marked this conversation as resolved.
Show resolved Hide resolved
timestamp: 100000000
user_attributes:
latency: 5000
Expand All @@ -16,7 +16,7 @@ trace:
- "3022|Ljava/l"
responses:
-
name: "write"
name: "read"
timestamp: 101000000
user_attributes:
latency: 40000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
trace:
  # Timeline (ms offsets of the syscalls below):
  #   0 -- 100 --- 104 --- 108 --------- 601
  #        READ    WRITE(100 Continue)
  #                        READ (body)
  #                        READ (body)
  #                        READ (body)   WRITE(200 OK)
  #                                      WRITE (chunk terminator)
key: continueData
requests:
# Request Head
-
name: "read"
timestamp: 100000000
user_attributes:
latency: 1000
res: 174
data:
- "POST /io/bigBody?sleep=1 HTTP/1.1\r\n"
- "User-Agent: curl/7.29.0\r\n"
- "Host: 10.0.2.4:19999\r\n"
- "accept: */*\r\n"
- "Content-Type: application/json\r\n"
- "Content-Length: 16082\r\n"
- "Expect: 100-continue\r\n"
- "\r\n"
# Request Body[10220]
-
name: "read"
timestamp: 108000000
user_attributes:
latency: 5000
res: 10220
data:
- "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
- "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
# Request Body[5840]
-
name: "read"
timestamp: 108100000
user_attributes:
latency: 3000
res: 5840
data:
- "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
- "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
# Request Body[22]
-
name: "read"
timestamp: 108150000
user_attributes:
latency: 500
res: 22
data:
- "bbbbbbbbbbbbbbbbbbbbbb"
responses:
# Response Continue(100)
-
name: "write"
timestamp: 104000000
user_attributes:
latency: 1000
res: 44
data:
- "HTTP/1.1 100 Continue\r\n"
- "Content-Length: 0\r\n"
- "\r\n"
# Response Body[199]
-
name: "write"
timestamp: 601000000
user_attributes:
latency: 2000
res: 199
data:
- "HTTP/1.1 200 OK\r\n"
- "Connection: keep-alive\r\n"
- "Transfer-Encoding: chunked\r\n"
- "Content-Type: application/json\r\n"
- "Date: Mon, 12 Dec 2022 09:18:27 GMT\r\n"
- "\r\n"
- "35\r\n"
- '{"success":true,"data":"sleep=1, body size is 16082"}\r\n'
# Response Body[5]
-
name: "write"
timestamp: 601100000
user_attributes:
latency: 300
res: 5
data:
- "0\r\n"
- "\r\n"
expects:
-
Timestamp: 99999000
Values:
request_total_time: 501101000
connect_time: 0
request_sent_time: 8151000
waiting_ttfb_time: 492848000
content_download_time: 102000
request_io: 16256
response_io: 204
Labels:
comm: testdemo
pid: 12345
request_tid: 12346
response_tid: 12346
src_ip: "127.0.0.1"
src_port: 56266
dst_ip: "127.0.0.1"
dst_port: 9001
dnat_ip: ""
dnat_port: -1
container_id: ""
is_slow: true
is_server: true
protocol: "http"
is_error: false
error_type: 0
content_key: "/io/bigBody"
http_method: "POST"
http_url: "/io/bigBody?sleep=1"
http_status_code: 200
end_timestamp: 601100000
request_payload: "POST /io/bigBody?sleep=1 HTTP/1.1\r\nUser-Agent: curl/7.29.0\r\nHost: 10.0.2.4:19999"
response_payload: "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nTransfer-Encoding: chunked\r\nContent-Typ"
1 change: 1 addition & 0 deletions collector/pkg/model/constlabels/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
HttpApmTraceType = "trace_type"
HttpApmTraceId = "trace_id"
HttpStatusCode = "http_status_code"
HttpContinue = "http_continue"

DnsId = "dns_id"
DnsDomain = "dns_domain"
Expand Down