Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocketmq Protocol Identification and Analysis #328

Merged
merged 5 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
## Unreleased
### New features
-
-
- Support the protocol RocketMQ.([#328](https://github.com/KindlingProject/kindling/pull/328))
- Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/CloudDectective-Harmonycloud/kindling/pull/363))


Expand Down
6 changes: 5 additions & 1 deletion collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ analyzers:
proc_root: /proc
# The protocol parsers which is enabled
# When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content.
protocol_parser: [ http, mysql, dns, redis, kafka ]
protocol_parser: [ http, mysql, dns, redis, kafka, rocketmq ]
# Which URL clustering method should be used to shorten the URL of HTTP request.
# This is useful for decrease the cardinality of URLs.
# Valid values: ["noparam", "alphabet"]
Expand Down Expand Up @@ -88,6 +88,10 @@ analyzers:
- key: "dns"
ports: [ 53 ]
slow_threshold: 100
- key: "rocketmq"
ports: [ 9876, 10911 ]
slow_threshold: 500


processors:
k8smetadataprocessor:
Expand Down
2 changes: 1 addition & 1 deletion collector/pkg/aggregator/label_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *LabelSelectors) AppendSelectors(selectors ...LabelSelector) {
s.selectors = append(s.selectors, selectors...)
}

const maxLabelKeySize = 35
const maxLabelKeySize = 37

type LabelKeys struct {
// LabelKeys will be used as key of map, so it is must be an array instead of a slice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
BENCH_CASE_KAFKA_PRODUCER = "kafka_producer"
BENCH_CASE_KAFKA_FETCHER = "kafka_fetcher"
BENCH_CASE_DUBBO = "dubbo"
BENCH_CASE_ROCKETMQ = "rocketmq"
)

var benchCaseMap = map[string]benchCase{
Expand All @@ -31,6 +32,7 @@ var benchCaseMap = map[string]benchCase{
BENCH_CASE_KAFKA_PRODUCER: {protocol.KAFKA, "kafka/provider-event.yml", "kafka/1k-provider-trace.yml"},
BENCH_CASE_KAFKA_FETCHER: {protocol.KAFKA, "kafka/consumer-event.yml", "kafka/1k-consumer-trace.yml"},
BENCH_CASE_DUBBO: {protocol.DUBBO, "dubbo/server-event.yml", "dubbo/1k-trace.yml"},
BENCH_CASE_ROCKETMQ: {protocol.ROCKETMQ, "rocketmq/server-event.yml", "rocketmq/1k-trace.yml"},
}

const (
Expand Down Expand Up @@ -65,6 +67,10 @@ func BenchmarkDubo(b *testing.B) {
testProtocolBench(b, b.N, SIZE_MESSAGE_PAIR, BENCH_CASE_DUBBO)
}

func BenchmarkRocketmq(b *testing.B) {
testProtocolBench(b, b.N, SIZE_MESSAGE_PAIR, BENCH_CASE_ROCKETMQ)
}

func testProtocolBench(b *testing.B, tps int, mpSize int, caseKey string) {
na := prepareNetworkAnalyzer()
if na == nil {
Expand Down
12 changes: 12 additions & 0 deletions collector/pkg/component/analyzer/network/network_analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ func TestDubboProtocol(t *testing.T) {
"dubbo/server-trace-short.yml")
}

func TestRocketMQProtocol(t *testing.T) {
testProtocol(t, "rocketmq/server-event.yml",
"rocketmq/server-trace-json.yml")
testProtocol(t, "rocketmq/server-event.yml",
"rocketmq/server-trace-rocketmq.yml")
testProtocol(t, "rocketmq/server-event.yml",
"rocketmq/server-trace-error.yml")
}

type NopProcessor struct {
}

Expand Down Expand Up @@ -359,6 +368,9 @@ func getSplit(data string) int {
if len(data) >= 5 && data[4] == '|' {
return 4
}
if len(data) >= 9 && data[8] == '|' {
return 8
}
return 0
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package factory

import (
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol/rocketmq"
"sync"

"github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol"
Expand Down Expand Up @@ -36,6 +37,7 @@ func NewParserFactory(options ...Option) *ParserFactory {
factory.protocolParsers[protocol.REDIS] = redis.NewRedisParser()
factory.protocolParsers[protocol.DUBBO] = dubbo.NewDubboParser()
factory.protocolParsers[protocol.DNS] = dns.NewDnsParser()
factory.protocolParsers[protocol.ROCKETMQ] = rocketmq.NewRocketMQParser()
factory.protocolParsers[protocol.NOSUPPORT] = generic.NewGenericParser()

return factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const (
MYSQL = "mysql"
REDIS = "redis"
DUBBO = "dubbo"
ROCKETMQ = "rocketmq"
NOSUPPORT = "NOSUPPORT"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package rocketmq

type rocketmqHeader struct {
Code int16 `json:"code"` //request code or response code
LanguageCode uint8 //Language code, such as 0 means `JAVA`, please refer to RocketMQLanguageCode
Version int16 `json:"version"` //version number
Opaque int32 `json:"opaque"` //Identification code, which identifies the request, the responder will return the original value of this identification code
Flag int32 `json:"flag"` //flag bit if request flag==0; else if response flag==1
Remark string `json:"remark"` //Stores the text information of the error message, which is convenient for developers to locate, may be empty
ExtFields map[string]string `json:"extFields"` //Custom field, different requests will have different parameters, can also be empty
SerializeTypeCurrentRPC string `json:"serializeTypeCurrentRPC"` //Encoding serializeType
Language string `json:"language"` //Language, string type in json sequence,such as `JAVA`
}

// requestMsgMap convert requestCode to requestMsg
var requestMsgMap = map[int16]string{
10: "SEND_MESSAGE",
11: "PULL_MESSAGE",
12: "QUERY_MESSAGE",
13: "QUERY_BROKER_OFFSET",
14: "QUERY_CONSUMER_OFFSET",
15: "UPDATE_CONSUMER_OFFSET",
17: "UPDATE_AND_CREATE_TOPIC",
21: "GET_ALL_TOPIC_CONFIG",
22: "GET_TOPIC_CONFIG_LIST",
23: "GET_TOPIC_NAME_LIST",
25: "UPDATE_BROKER_CONFIG",
26: "GET_BROKER_CONFIG",
27: "TRIGGER_DELETE_FILES",
28: "GET_BROKER_RUNTIME_INFO",
29: "SEARCH_OFFSET_BY_TIMESTAMP",
30: "GET_MAX_OFFSET",
31: "GET_MIN_OFFSET",
32: "GET_EARLIEST_MSG_STORETIME",
33: "VIEW_MESSAGE_BY_ID",
34: "HEART_BEAT",
36: "CONSUMER_SEND_MSG_BACK",
37: "END_TRANSACTION",
38: "GET_CONSUMER_LIST_BY_GROUP",
39: "CHECK_TRANSACTION_STATE",
40: "NOTIFY_CONSUMER_IDS_CHANGED",
41: "LOCK_BATCH_MQ",
42: "UNLOCK_BATCH_MQ",
43: "GET_ALL_CONSUMER_OFFSET",
45: "GET_ALL_DELAY_OFFSET",
46: "CHECK_CLIENT_CONFIG",
50: "UPDATE_AND_CREATE_ACL_CONFIG",
51: "DELETE_ACL_CONFIG",
52: "GET_BROKER_CLUSTER_ACL_INFO",
53: "UPDATE_GLOBAL_WHITE_ADDRS_CONFIG",
54: "GET_BROKER_CLUSTER_ACL_CONFIG",
100: "PUT_KV_CONFIG",
101: "GET_KV_CONFIG",
102: "DELETE_KV_CONFIG",
103: "REGISTER_BROKER",
104: "UNREGISTER_BROKER",
105: "GET_ROUTEINFO_BY_TOPIC",
106: "GET_BROKER_CLUSTER_INFO",
200: "UPDATE_AND_CREATE_SUBSCRIPTIONGROUP",
201: "GET_ALL_SUBSCRIPTIONGROUP_CONFIG",
202: "GET_TOPIC_STATS_INFO",
203: "GET_CONSUMER_CONNECTION_LIST",
204: "GET_PRODUCER_CONNECTION_LIST",
205: "WIPE_WRITE_PERM_OF_BROKER",
206: "GET_ALL_TOPIC_LIST_FROM_NAMESERVER",
207: "DELETE_SUBSCRIPTIONGROUP",
208: "GET_CONSUME_STATS",
209: "SUSPEND_CONSUMER",
210: "RESUME_CONSUMER",
211: "RESET_CONSUMER_OFFSET_IN_CONSUMER",
212: "RESET_CONSUMER_OFFSET_IN_BROKER",
213: "ADJUST_CONSUMER_THREAD_POOL",
214: "WHO_CONSUME_THE_MESSAGE",
215: "DELETE_TOPIC_IN_BROKER",
216: "DELETE_TOPIC_IN_NAMESRV",
219: "GET_KVLIST_BY_NAMESPACE",
220: "RESET_CONSUMER_CLIENT_OFFSET",
221: "GET_CONSUMER_STATUS_FROM_CLIENT",
222: "INVOKE_BROKER_TO_RESET_OFFSET",
223: "INVOKE_BROKER_TO_GET_CONSUMER_STATUS",
224: "GET_TOPICS_BY_CLUSTER",
300: "QUERY_TOPIC_CONSUME_BY_WHO",
301: "REGISTER_FILTER_SERVER",
302: "REGISTER_MESSAGE_FILTER_CLASS",
303: "QUERY_CONSUME_TIME_SPAN",
304: "GET_SYSTEM_TOPIC_LIST_FROM_NS",
305: "GET_SYSTEM_TOPIC_LIST_FROM_BROKER",
306: "CLEAN_EXPIRED_CONSUMEQUEUE",
307: "GET_CONSUMER_RUNNING_INFO",
308: "QUERY_CORRECTION_OFFSET",
309: "CONSUME_MESSAGE_DIRECTLY",
310: "SEND_MESSAGE_V2",
311: "GET_UNIT_TOPIC_LIST",
312: "GET_HAS_UNIT_SUB_TOPIC_LIST",
313: "GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST",
314: "CLONE_GROUP_OFFSET",
315: "VIEW_BROKER_STATS_DATA",
316: "CLEAN_UNUSED_TOPIC",
317: "GET_BROKER_CONSUME_STATS",
318: "UPDATE_NAMESRV_CONFIG",
319: "GET_NAMESRV_CONFIG",
320: "SEND_BATCH_MESSAGE",
321: "QUERY_CONSUME_QUEUE",
322: "QUERY_DATA_VERSION",
323: "RESUME_CHECK_HALF_MESSAGE",
324: "SEND_REPLY_MESSAGE",
325: "SEND_REPLY_MESSAGE_V2",
326: "PUSH_REPLY_MESSAGE_TO_CLIENT",
327: "ADD_WRITE_PERM_OF_BROKER",
328: "GET_ALL_PRODUCER_INFO",
329: "DELETE_EXPIRED_COMMITLOG",
}

// responseErrMsgMap convert responseCode to ErrMsg
var responseErrMsgMap = map[int16]string{
10: "FLUSH_DISK_TIMEOUT",
11: "SLAVE_NOT_AVAILABLE",
12: "FLUSH_SLAVE_TIMEOUT",
13: "MESSAGE_ILLEGAL",
14: "SERVICE_NOT_AVAILABLE",
15: "VERSION_NOT_SUPPORTED",
16: "NO_PERMISSION",
17: "TOPIC_NOT_EXIST",
18: "TOPIC_EXIST_ALREADY",
19: "PULL_NOT_FOUND",
20: "PULL_RETRY_IMMEDIATELY",
21: "PULL_OFFSET_MOVED",
22: "QUERY_NOT_FOUND",
23: "SUBSCRIPTION_PARSE_FAILED",
24: "SUBSCRIPTION_NOT_EXIST",
25: "SUBSCRIPTION_NOT_LATEST",
26: "SUBSCRIPTION_GROUP_NOT_EXIST",
27: "FILTER_DATA_NOT_EXIST",
28: "FILTER_DATA_NOT_LATEST",
200: "TRANSACTION_SHOULD_COMMIT",
201: "TRANSACTION_SHOULD_ROLLBACK",
202: "TRANSACTION_STATE_UNKNOW",
203: "TRANSACTION_STATE_GROUP_WRONG",
204: "NO_BUYER_ID",
205: "NOT_IN_CURRENT_UNIT",
206: "CONSUMER_NOT_ONLINE",
207: "CONSUME_MSG_TIMEOUT",
208: "NO_MESSAGE",
209: "UPDATE_AND_CREATE_ACL_CONFIG_FAILED",
210: "DELETE_ACL_CONFIG_FAILED",
211: "UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED",
}

// rocketMQLanguageCode convert languageCode to language
var rocketmqLanguageCode = map[uint8]string{
0: "JAVA",
1: "CPP",
2: "DOTNET",
3: "PYTHON",
4: "DELPHI",
5: "ERLANG",
6: "RUBY",
7: "OTHER",
8: "HTTP",
9: "GO",
10: "PHP",
11: "OMS",
12: "RUST",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package rocketmq

import "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol"

func NewRocketMQParser() *protocol.ProtocolParser {
requestParser := protocol.CreatePkgParser(fastfailRocketMQRequest(), parseRocketMQRequest())
responseParser := protocol.CreatePkgParser(fastfailRocketMQResponse(), parseRocketMQResponse())

return protocol.NewProtocolParser(protocol.ROCKETMQ, requestParser, responseParser, nil)
}
Loading