Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocketmq Protocol Identification and Analysis #328

Merged
merged 5 commits into from
Nov 30, 2022

Conversation

thenicetgp
Copy link
Contributor

@thenicetgp thenicetgp commented Oct 10, 2022

Summary

Support rocketmq protocol identification and analysis to provide kindling analysis application and rocketmq interactive performance analysis

1. Design of Header

Since there are two types of rocketmq codecs, JSON and ROCKETMQ, when SerializeType is JSON, the json string is directly intercepted according to headerLength and then deserialized to obtain the header. When SerializeType is ROCKETMQ, it is read in sequence. Combining the two types of headers can be designed as follows:

type rocketmqHeader struct {
	Code                    int16             `json:"code"` //request code or response code
	LanguageCode            uint8             //Language code, such as 0 means `JAVA`, please refer to RocketMQLanguageCode
	Version                 int16             `json:"version"`                 //version number
	Opaque                  int32             `json:"opaque"`                  //Identification code, which identifies the request, the responder will return the original value of this identification code
	Flag                    int32             `json:"flag"`                    //flag bit if request flag==0; else if response flag==1
	Remark                  string            `json:"remark"`                  //Stores the text information of the error message, which is convenient for developers to locate, may be empty
	ExtFields               map[string]string `json:"extFields"`               //Custom field, different requests will have different parameters, can also be empty
	SerializeTypeCurrentRPC string            `json:"serializeTypeCurrentRPC"` //Encoding serializeType
	Language                string            `json:"language"`                //Language, string type in json sequence,such as `JAVA`
}

2. parseRocketMQRequest and parseRocketMQResponse

// Parsing the message content
message.ReadInt32(0, &payloadLength)
header := &rocketmqHeader{ExtFields: map[string]string{}}
//When serializeType==0, the serialization type is JSON, and the json sequence is directly intercepted for deserialization
if serializeType = message.Data[4]; serializeType == 0 {
	message.ReadInt32(4, &headerLength)
	_, headerBytes, err := message.ReadBytes(8, int(headerLength))
	if err != nil {
		return false, true
	}
	if err = json.Unmarshal(headerBytes, header); err != nil {
		return false, true
	}
} else if serializeType == 1 {
	parseHeader(message, header)
} else {
	return false, true
}

//When serializeType==1, the serialization type is RocketMQ, and the fields are stored strictly in the order of code, languagecode, version, opaque, flag, remark, and extfields
func parseHeader(message *protocol.PayloadMessage, header *rocketmqHeader) {
	var (
		remarkLen   int32
		extFieldLen int32
		offset      int
	)
        message.ReadInt16(8, &header.Code)
	header.LanguageCode = message.Data[10]
	message.ReadInt16(11, &header.Version)
	message.ReadInt32(13, &header.Opaque)
	message.ReadInt32(17, &header.Flag)

	message.ReadInt32(21, &remarkLen)
	offset, _ = message.ReadInt32(25, &extFieldLen)
	if extFieldLen > 0 && remarkLen == 0 {
		extFieldMap := make(map[string]string)
		var (
			keyLen   int16
			valueLen int32
			key      []byte
			value    []byte
		)
		//offset starts from 29
		var extFieldBytesLen = 0
		for extFieldBytesLen < int(extFieldLen) && extFieldBytesLen+29 < len(message.Data) {
			offset, _ = message.ReadInt16(offset, &keyLen)
			offset, key, _ = message.ReadBytes(offset, int(keyLen))
			offset, _ = message.ReadInt32(offset, &valueLen)
			offset, value, _ = message.ReadBytes(offset, int(valueLen))
			extFieldMap[string(key)] = string(value)
			extFieldBytesLen = extFieldBytesLen + 2 + int(keyLen) + 4 + int(valueLen)
			if string(key) == "topic" || string(key) == "b" {
				break
			}
		}
		//Update the field `ExtFields` of the header
		header.ExtFields = extFieldMap
	}
}

3. Test whether the parsing process is correct TestParseRocketmqJsonAndRocketMQ

kindling/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_parser_test.go

Screenshots

Screen Shot 2022-09-28 at 14 04 51

4. Add Attribute indicator

4.1 rocketmq_request
// Store the parsed attribute via AddStringAttribute() or AttIntAttribute()
message.AddStringAttribute(constlabels.RocketMQRequestMsg, requestMsgMap[header.Code])
message.AddIntAttribute(constlabels.RocketMQOpaque, int64(header.Opaque))

//topicName maybe be stored in key `topic` or `b`
if header.ExtFields["topic"] != "" {
	message.AddStringAttribute(constlabels.ContentKey, fmt.Sprintf("Topic:%v", header.ExtFields["topic"]))
} else if header.ExtFields["b"] != "" {
	message.AddStringAttribute(constlabels.ContentKey, fmt.Sprintf("Topic:%v", header.ExtFields["b"]))
} else {
	message.AddStringAttribute(constlabels.ContentKey, requestMsgMap[header.Code])
}
4.2 rocketmq_response
if !message.HasAttribute(constlabels.RocketMQOpaque) ||
	message.GetIntAttribute(constlabels.RocketMQOpaque) != int64(header.Opaque) {

	return false, true
}
message.AddIntAttribute(constlabels.RocketMQErrCode, int64(header.Code))

//add RocketMQErrMsg if responseCode > 0
if header.Code > 0 {
	if _, ok := responseErrMsgMap[header.Code]; ok {
		message.AddStringAttribute(constlabels.RocketMQErrMsg, responseErrMsgMap[header.Code])
	} else if header.Remark != "" {
		message.AddStringAttribute(constlabels.RocketMQErrMsg, header.Remark)
	} else {
		message.AddStringAttribute(constlabels.RocketMQErrMsg, fmt.Sprintf("error:response code is %v", header.Code))
	}
	message.AddBoolAttribute(constlabels.IsError, true)
	message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}

5. Unit Test and Benchmark Test

5.1 Attribute indicators to be confirmed

        RocketMQOpaque     = "rocketmq_opaque"
	RocketMQRequestMsg = "rocketmq_request_msg"
	RocketMQErrMsg     = "rocketmq_error_msg"
	RocketMQErrCode    = "rocketmq_error_code"

5.2 created testdata/rocketmq/XXX.yml

Screen Shot 2022-09-23 at 12 19 38

5.3 Unit Test

Screen Shot 2022-09-23 at 12 33 44

5.4 Benchmark Test

Screen Shot 2022-10-10 at 14 01 01

6. label key convertor

// collector/pkg/component/consumer/exporter/tools/adapter/net_dict.go

var entityProtocol = []extraLabelsParam{
   //other protocol
   ...
   {[]dictionary{
      {constlabels.RequestContent, constlabels.ContentKey, String},
      {constlabels.ResponseContent, constlabels.RocketMQErrCode, FromInt64ToString},
   }, extraLabelsKey{ROCKETMQ}},
   ...
}

var spanProtocol = []extraLabelsParam{
   //other protocol
   ...
   {[]dictionary{
      {constlabels.SpanRocketMQRequestMsg, constlabels.RocketMQRequestMsg, String},
      {constlabels.SpanRocketMQErrMsg, constlabels.RocketMQErrMsg, String},
   }, extraLabelsKey{ROCKETMQ}},
   ...
}

var topologyProtocol = []extraLabelsParam{
   //other protocol
   ...
   {[]dictionary{
      {constlabels.StatusCode, constlabels.RocketMQErrCode, FromInt64ToString},
   }, extraLabelsKey{ROCKETMQ}},
   ...
}

// collector/pkg/component/consumer/processor/aggregateprocessor/processor.go
func newNetRequestLabelSelectors() *aggregator.LabelSelectors {
    return aggregator.NewLabelSelectors(
        ...
        aggregator.LabelSelector{Name: constlabels.RocketMQErrCode, VType: aggregator.IntType},
    )
}

7. Grafana

Screen Shot 2022-10-10 at 14 05 24

Does this close any open issues?

Closes #227

Other Information

Any other information that is important to this PR.

…ation and add some attributes

Signed-off-by: thenicetgp <13120413800@163.com>
Signed-off-by: thenicetgp <13120413800@163.com>
Signed-off-by: thenicetgp <13120413800@163.com>
@thenicetgp thenicetgp changed the title Support rocketmq protocol identification and analysis Rocketmq protocol identification and analysis Oct 10, 2022
@thenicetgp thenicetgp changed the title Rocketmq protocol identification and analysis Rocketmq Protocol Identification And Analysis Oct 10, 2022
@thenicetgp thenicetgp changed the title Rocketmq Protocol Identification And Analysis Rocketmq Protocol Identification and Analysis Oct 10, 2022
Copy link
Member

@dxsup dxsup left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll merge this PR after you update the changelog.

@thenicetgp
Copy link
Contributor Author

I'll merge this PR after you update the changelog.

Thanks, the CHANGELOG.md and docs/prometheus_metrics.md are updated.

@@ -140,6 +147,7 @@ These two terms are composed of two parts.
- **mysql**: `Error Code` of the error response.
- **dubbo**: `Error Code` of Dubbo request.
- **redis**: `0` if there is no error; `1` otherwise.
- - **rocketmq**: `Response Code` of RocketMQ response.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an extra -.

Suggested change
- - **rocketmq**: `Response Code` of RocketMQ response.
- **rocketmq**: `Response Code` of RocketMQ response.

Signed-off-by: thenicetgp <13120413800@163.com>
@dxsup dxsup merged commit 0f58b0d into KindlingProject:main Nov 30, 2022
fengjixuchui added a commit to fengjixuchui/kindling that referenced this pull request Dec 1, 2022
Rocketmq Protocol Identification and Analysis (KindlingProject#328)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support RocketMQ protocol analysis
2 participants