Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rocketmq Protocol Identification and Analysis (KindlingProject#328) #2

Merged
merged 10 commits into from
Dec 1, 2022
Merged
2 changes: 1 addition & 1 deletion .github/workflows/kindling-ci.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: KINDLING-CI
name: Kindling-agent-CI

on:
push:
Expand Down
29 changes: 29 additions & 0 deletions .github/workflows/kindling-front-CI.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Builds the camera-front image and pushes it to Docker Hub tagged
# "latesttest" on every push to main (or a manual workflow_dispatch).
name: Kindling-front-CI

on:
  push:
    branches: [main]
  workflow_dispatch:

jobs:
  build-latest-test:
    # Only run in the upstream repository, never in forks
    # (forks lack the Docker Hub secrets).
    if: github.repository == 'CloudDectective-Harmonycloud/kindling'
    runs-on: ubuntu-latest
    permissions:
      contents: write
    steps:
      - name: Set TAG
        run: echo "TAG=latesttest" >> $GITHUB_ENV
      - uses: actions/checkout@v3
      - name: Login to DockerHub
        uses: docker/login-action@v2
        with:
          # Fixed: added missing space before "}}" for consistency with
          # every other expression in this workflow.
          username: ${{ secrets.DOCKER_HUB_USERNAME }}
          password: ${{ secrets.DOCKER_HUB_PASSWORD }}
      - name: Build and push
        uses: docker/build-push-action@v3
        with:
          context: ${{ github.workspace }}/camera-front
          file: ${{ github.workspace }}/camera-front/Dockerfile
          push: true
          tags: kindlingproject/kindling-camera-front:${{ env.TAG }}
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@
1. All notable changes to this project will be documented in this file.
2. Records in this file are not identical to the title of their Pull Requests. A detailed description is necessary for understanding what changes are and why they are made.

## Unreleased
### New features
-
- Support the protocol RocketMQ.([#328](https://github.com/KindlingProject/kindling/pull/328))
- Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/CloudDectective-Harmonycloud/kindling/pull/363))


### Enhancements
-
-
-

### Bug fixes
-
-
- Fix potential deadlock of exited thread delay queue. ([#373](https://github.com/CloudDectective-Harmonycloud/kindling/pull/373))
- Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled.([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362))
- Fix the bug that duplicate CPU events are indexed into Elasticsearch. ([#359](https://github.com/KindlingProject/kindling/pull/359))
- Implement the delay queue for exited thread, so as to avoid losing the data in the period before the thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))

## v0.5.0 - 2022-11-02
### New features
- Add a new feature: Trace Profiling. See more details about it on our [website](http://kindling.harmonycloud.cn). ([#335](https://github.com/CloudDectective-Harmonycloud/kindling/pull/335))
Expand Down
2 changes: 1 addition & 1 deletion camera-front/src/containers/thread/EventDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export default function EventDetail(props: Props) {

const init = (data) => {
let info: any = {};
if (data.type === 'net' && data.traceInfo) {
if (data.type === 'net' && !_.isEmpty(data.traceInfo)) {
let netColumns = _.cloneDeep(netTraceList);
let totalTime = _.find(data.traceInfo?.metrics, {Name: 'request_total_time'});
let traceData: any = {
Expand Down
6 changes: 6 additions & 0 deletions camera-front/src/containers/thread/camera/draw2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ILineType = 'requestTime' | 'cruxTime' | 'trace' | 'rt';
// }
const eventList = (getStore('theme') || 'light') === 'light' ? LEvent : DEvent;
class Camera {
parentRef: any;
theme: 'light' | 'dark' = 'light';
data: IThread[] = [];
lineTimeList: ILineTime[] = [];
Expand Down Expand Up @@ -61,6 +62,7 @@ class Camera {
eventClick: (evt: any) => void;

constructor (option: IOption) {
this.parentRef = option.parentRef;
this.theme = (getStore('theme') as any) || 'light';
this.data = option.data;
this.lineTimeList = option.lineTimeList;
Expand Down Expand Up @@ -1133,6 +1135,7 @@ class Camera {
}
if (data.length === 0) {
data = _.concat([], this.data);
this.parentRef && this.parentRef.closeTraceAnaliysis();
}
} else {
data = _.concat([], this.data);
Expand All @@ -1158,6 +1161,9 @@ class Camera {
d3.select(`#thread_warp_${tid}`).attr('transform', `translate(0, ${this.threadHeight * idx})` );
});
}
    // Turn off the trace-highlight flag. Called from the thread page
    // (containers/thread/index.tsx) when all filter fields are reset,
    // alongside CameraWarp.closeTraceAnaliysis.
    closeShowTrace() {
        this.showTraceFlag = false;
    }

/**
* 底部brush 时间轴相关事件、绘制方法
Expand Down
9 changes: 7 additions & 2 deletions camera-front/src/containers/thread/camera/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CameraWarp extends React.Component<IProps, IState> {
showLog: true
}
}
camera = new Camera(this.props.option);
camera = new Camera({...this.props.option, parentRef: this});
observer: any = null;

componentDidMount() {
Expand All @@ -39,7 +39,7 @@ class CameraWarp extends React.Component<IProps, IState> {
componentDidUpdate(prevProps: Readonly<IProps>): void {
if (!_.isEqual(prevProps.option.data, this.props.option.data)) {
this.clearSupport();
this.camera = new Camera(this.props.option);
this.camera = new Camera({...this.props.option, parentRef: this});
this.print();
}
}
Expand Down Expand Up @@ -94,6 +94,11 @@ class CameraWarp extends React.Component<IProps, IState> {
})
}
}
    // Hide the trace-analysis support panel by clearing `supportTrace`.
    // Invoked from the thread page's filter logic (containers/thread/index.tsx)
    // and by Camera via `parentRef` when a filter leaves no matching data.
    // NOTE(review): the misspelled name ("Analiysis") is kept because external
    // callers reference it; renaming would break them.
    closeTraceAnaliysis = () => {
        this.setState({
            supportTrace: false
        });
    }
toggleChartBrush = () => {
this.camera.addChartBrush();
// this.setState((prevState) => ({
Expand Down
1 change: 1 addition & 0 deletions camera-front/src/containers/thread/camera/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface IOption {
barWidth?: number;
barPadding?: number;
padding?: number;
parentRef?: any;
eventClick: (evt: any) => void;
nameClick: (tid: string) => void;
}
Expand Down
6 changes: 6 additions & 0 deletions camera-front/src/containers/thread/camera/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ export const dataHandle = (data: any, timeRange, trace: any) => {
// if (eventObj.endTime > timeRange[0] && eventObj.startTime < timeRange[1]) {
// threadObj.eventList.push(eventObj);
// }
} else {
if (type === '0') {
onFlag++;
} else {
offFlag++;
}
}
startTime = endTime;
});
Expand Down
2 changes: 2 additions & 0 deletions camera-front/src/containers/thread/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,10 @@ function Thread() {
fileList: selectFileList,
eventList: selectEventList.length > 0 ? selectEventList : [..._.map(eventList, 'value'), 'net', 'file']
};
(cameraRef.current as any).closeTraceAnaliysis();
let camera = (cameraRef.current as any)?.camera;
camera.reprintByFilter(filterParams);
camera.closeShowTrace();
}
// 清空重置所有的筛选条件
const resetAllField = () => {
Expand Down
6 changes: 5 additions & 1 deletion collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ analyzers:
proc_root: /proc
# The protocol parsers which is enabled
# When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content.
protocol_parser: [ http, mysql, dns, redis, kafka ]
protocol_parser: [ http, mysql, dns, redis, kafka, rocketmq ]
# Which URL clustering method should be used to shorten the URL of HTTP request.
# This is useful for decrease the cardinality of URLs.
# Valid values: ["noparam", "alphabet"]
Expand Down Expand Up @@ -88,6 +88,10 @@ analyzers:
- key: "dns"
ports: [ 53 ]
slow_threshold: 100
- key: "rocketmq"
ports: [ 9876, 10911 ]
slow_threshold: 500


processors:
k8smetadataprocessor:
Expand Down
2 changes: 1 addition & 1 deletion collector/pkg/aggregator/label_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *LabelSelectors) AppendSelectors(selectors ...LabelSelector) {
s.selectors = append(s.selectors, selectors...)
}

const maxLabelKeySize = 35
const maxLabelKeySize = 37

type LabelKeys struct {
// LabelKeys will be used as key of map, so it is must be an array instead of a slice.
Expand Down
39 changes: 24 additions & 15 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package cpuanalyzer

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"go.uber.org/zap/zapcore"
"strconv"
"sync"
)

const (
Expand All @@ -23,8 +25,8 @@ type CpuAnalyzer struct {
sendEventsRoutineMap sync.Map
lock sync.Mutex
telemetry *component.TelemetryTools

nextConsumers []consumer.Consumer
tidExpiredQueue *tidDeleteQueue
nextConsumers []consumer.Consumer
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -43,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
nextConsumers: consumers,
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
go ca.TidDelete(30*time.Second, 10*time.Second)
return ca
}

Expand Down Expand Up @@ -221,20 +225,25 @@ func (ca *CpuAnalyzer) PutEventToSegments(pid uint32, tid uint32, threadName str
(newTimeSegments.BaseTime+uint64(i+1))*nanoToSeconds)
newTimeSegments.Segments.UpdateByIndex(i, segment)
}
val := newTimeSegments.Segments.GetByIndex(0)
segment := val.(*Segment)
segment.putTimedEvent(event)

endOffset := int(event.EndTimestamp()/nanoToSeconds - newTimeSegments.BaseTime)

for i := 0; i <= endOffset && i < maxSegmentSize; i++ {
val := newTimeSegments.Segments.GetByIndex(i)
segment := val.(*Segment)
segment.putTimedEvent(event)
segment.IsSend = 0
}

tidCpuEvents[tid] = newTimeSegments
}
}

func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
ca.lock.Lock()
defer ca.lock.Unlock()
tidEventsMap := ca.cpuPidEvents[pid]
if tidEventsMap == nil {
return
}
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid)
delete(tidEventsMap, tid)
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map after 10 seconds. ", pid, tid)

cacheElem := deleteTid{pid: pid, tid: tid, exitTime: time.Now()}
ca.tidExpiredQueue.Push(cacheElem)
}
74 changes: 74 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cpuanalyzer

import (
"sync"
"time"
)

// tidDeleteQueue is a FIFO queue of threads that have exited and are
// awaiting delayed removal from CpuAnalyzer.cpuPidEvents.
// queueMutex must be held for every access to queue.
type tidDeleteQueue struct {
	queueMutex sync.Mutex
	queue      []deleteTid
}

// deleteTid records an exited thread (pid/tid) together with the time it
// exited, so its data can be kept for a grace period before deletion.
type deleteTid struct {
	pid      uint32
	tid      uint32
	exitTime time.Time
}

// newTidDeleteQueue returns an empty delete queue ready for use.
func newTidDeleteQueue() *tidDeleteQueue {
	q := tidDeleteQueue{queue: make([]deleteTid, 0)}
	return &q
}

// GetFront returns a pointer to the oldest queued element, or nil when the
// queue is empty. The caller must hold queueMutex.
func (dq *tidDeleteQueue) GetFront() *deleteTid {
	if len(dq.queue) == 0 {
		return nil
	}
	return &dq.queue[0]
}

// Push appends elem to the tail of the queue.
// The caller must hold queueMutex.
func (dq *tidDeleteQueue) Push(elem deleteTid) {
	dq.queue = append(dq.queue, elem)
}

// Pop discards the oldest element; it is a no-op on an empty queue.
// The caller must hold queueMutex.
func (dq *tidDeleteQueue) Pop() {
	if len(dq.queue) == 0 {
		return
	}
	dq.queue = dq.queue[1:]
}

// TidDelete loops forever, waking every interval to purge threads whose
// exit happened at least expiredDuration ago. Delaying deletion keeps a
// thread's events available for a short window after procexit is received.
// Started as a goroutine by NewCpuAnalyzer.
func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) {
	for {
		select {
		case <-time.After(interval):
			now := time.Now()
			// Anonymous func so the deferred unlock releases queueMutex
			// exactly once per tick, however the inner loop exits.
			func() {
				ca.tidExpiredQueue.queueMutex.Lock()
				defer ca.tidExpiredQueue.queueMutex.Unlock()
				for {
					elem := ca.tidExpiredQueue.GetFront()
					if elem == nil {
						break
					}
					// The queue is FIFO in exit order: once the front entry
					// is still within the grace period, all later ones are too.
					if elem.exitTime.Add(expiredDuration).After(now) {
						break
					}
					//Delete expired threads (current_time >= thread_exit_time + interval_time).
					func() {
						// Lock order here is queueMutex -> ca.lock.
						// NOTE(review): other code paths must never acquire
						// these two locks in the opposite order, or the
						// analyzer can deadlock — confirm when touching
						// trimExitedThread/PutEventToSegments.
						ca.lock.Lock()
						defer ca.lock.Unlock()
						tidEventsMap := ca.cpuPidEvents[elem.pid]
						if tidEventsMap == nil {
							ca.tidExpiredQueue.Pop()
						} else {
							ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid)
							//fmt.Printf("Go Test: Delete expired thread... pid=%d, tid=%d\n", elem.pid, elem.tid)
							delete(tidEventsMap, elem.tid)
							ca.tidExpiredQueue.Pop()
						}
					}()
				}
			}()
		}
	}
}
Loading