Skip to content

Commit

Permalink
Merge pull request #2 from KindlingProject/main
Browse files Browse the repository at this point in the history
Rocketmq Protocol Identification and Analysis (KindlingProject#328)
  • Loading branch information
fengjixuchui authored Dec 1, 2022
2 parents 8751bab + 0f58b0d commit acd453d
Show file tree
Hide file tree
Showing 52 changed files with 3,089 additions and 1,736 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kindling-ci.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: KINDLING-CI
name: Kindling-agent-CI

on:
push:
Expand Down
29 changes: 29 additions & 0 deletions .github/workflows/kindling-front-CI.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Kindling-front-CI

on:
push:
branches: [main]
workflow_dispatch:

jobs:
build-latest-test:
if: github.repository == 'CloudDectective-Harmonycloud/kindling'
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Set TAG
run: echo "TAG=latesttest" >> $GITHUB_ENV
- uses: actions/checkout@v3
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKER_HUB_USERNAME}}
password: ${{ secrets.DOCKER_HUB_PASSWORD }}
- name: Build and push
uses: docker/build-push-action@v3
with:
context: ${{ github.workspace }}/camera-front
file: ${{ github.workspace }}/camera-front/Dockerfile
push: true
tags: kindlingproject/kindling-camera-front:${{ env.TAG }}
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@
1. All notable changes to this project will be documented in this file.
2. Records in this file are not identical to the title of their Pull Requests. A detailed description is necessary for understanding what changes are and why they are made.

## Unreleased
### New features
-
- Support the protocol RocketMQ.([#328](https://github.com/KindlingProject/kindling/pull/328))
- Add a new tool: A debug tool for Trace Profiling is provided for developers to troubleshoot problems.([#363](https://github.com/CloudDectective-Harmonycloud/kindling/pull/363))


### Enhancements
-
-
-

### Bug fixes
-
-
- Fix potential deadlock of exited thread delay queue. ([#373](https://github.com/CloudDectective-Harmonycloud/kindling/pull/373))
- Fix the bug that cpuEvent cache size continuously increases even if trace profiling is not enabled.([#362](https://github.com/CloudDectective-Harmonycloud/kindling/pull/362))
- Fix the bug that duplicate CPU events are indexed into Elasticsearch. ([#359](https://github.com/KindlingProject/kindling/pull/359))
- Implement the delay queue for exited thread, so as to avoid losing the data in the period before the thread exits. ([#365](https://github.com/CloudDectective-Harmonycloud/kindling/pull/365))
- Fix the bug of incomplete records when threads arrive at the cpu analyzer for the first time. ([#364](https://github.com/CloudDectective-Harmonycloud/kindling/pull/364))

## v0.5.0 - 2022-11-02
### New features
- Add a new feature: Trace Profiling. See more details about it on our [website](http://kindling.harmonycloud.cn). ([#335](https://github.com/CloudDectective-Harmonycloud/kindling/pull/335))
Expand Down
2 changes: 1 addition & 1 deletion camera-front/src/containers/thread/EventDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export default function EventDetail(props: Props) {

const init = (data) => {
let info: any = {};
if (data.type === 'net' && data.traceInfo) {
if (data.type === 'net' && !_.isEmpty(data.traceInfo)) {
let netColumns = _.cloneDeep(netTraceList);
let totalTime = _.find(data.traceInfo?.metrics, {Name: 'request_total_time'});
let traceData: any = {
Expand Down
6 changes: 6 additions & 0 deletions camera-front/src/containers/thread/camera/draw2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ILineType = 'requestTime' | 'cruxTime' | 'trace' | 'rt';
// }
const eventList = (getStore('theme') || 'light') === 'light' ? LEvent : DEvent;
class Camera {
parentRef: any;
theme: 'light' | 'dark' = 'light';
data: IThread[] = [];
lineTimeList: ILineTime[] = [];
Expand Down Expand Up @@ -61,6 +62,7 @@ class Camera {
eventClick: (evt: any) => void;

constructor (option: IOption) {
this.parentRef = option.parentRef;
this.theme = (getStore('theme') as any) || 'light';
this.data = option.data;
this.lineTimeList = option.lineTimeList;
Expand Down Expand Up @@ -1133,6 +1135,7 @@ class Camera {
}
if (data.length === 0) {
data = _.concat([], this.data);
this.parentRef && this.parentRef.closeTraceAnaliysis();
}
} else {
data = _.concat([], this.data);
Expand All @@ -1158,6 +1161,9 @@ class Camera {
d3.select(`#thread_warp_${tid}`).attr('transform', `translate(0, ${this.threadHeight * idx})` );
});
}
closeShowTrace() {
this.showTraceFlag = false;
}

/**
* 底部brush 时间轴相关事件、绘制方法
Expand Down
9 changes: 7 additions & 2 deletions camera-front/src/containers/thread/camera/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CameraWarp extends React.Component<IProps, IState> {
showLog: true
}
}
camera = new Camera(this.props.option);
camera = new Camera({...this.props.option, parentRef: this});
observer: any = null;

componentDidMount() {
Expand All @@ -39,7 +39,7 @@ class CameraWarp extends React.Component<IProps, IState> {
componentDidUpdate(prevProps: Readonly<IProps>): void {
if (!_.isEqual(prevProps.option.data, this.props.option.data)) {
this.clearSupport();
this.camera = new Camera(this.props.option);
this.camera = new Camera({...this.props.option, parentRef: this});
this.print();
}
}
Expand Down Expand Up @@ -94,6 +94,11 @@ class CameraWarp extends React.Component<IProps, IState> {
})
}
}
closeTraceAnaliysis = () => {
this.setState({
supportTrace: false
});
}
toggleChartBrush = () => {
this.camera.addChartBrush();
// this.setState((prevState) => ({
Expand Down
1 change: 1 addition & 0 deletions camera-front/src/containers/thread/camera/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface IOption {
barWidth?: number;
barPadding?: number;
padding?: number;
parentRef?: any;
eventClick: (evt: any) => void;
nameClick: (tid: string) => void;
}
Expand Down
6 changes: 6 additions & 0 deletions camera-front/src/containers/thread/camera/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ export const dataHandle = (data: any, timeRange, trace: any) => {
// if (eventObj.endTime > timeRange[0] && eventObj.startTime < timeRange[1]) {
// threadObj.eventList.push(eventObj);
// }
} else {
if (type === '0') {
onFlag++;
} else {
offFlag++;
}
}
startTime = endTime;
});
Expand Down
2 changes: 2 additions & 0 deletions camera-front/src/containers/thread/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,10 @@ function Thread() {
fileList: selectFileList,
eventList: selectEventList.length > 0 ? selectEventList : [..._.map(eventList, 'value'), 'net', 'file']
};
(cameraRef.current as any).closeTraceAnaliysis();
let camera = (cameraRef.current as any)?.camera;
camera.reprintByFilter(filterParams);
camera.closeShowTrace();
}
// 清空重置所有的筛选条件
const resetAllField = () => {
Expand Down
6 changes: 5 additions & 1 deletion collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ analyzers:
proc_root: /proc
# The protocol parsers which is enabled
# When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content.
protocol_parser: [ http, mysql, dns, redis, kafka ]
protocol_parser: [ http, mysql, dns, redis, kafka, rocketmq ]
# Which URL clustering method should be used to shorten the URL of HTTP request.
# This is useful for decrease the cardinality of URLs.
# Valid values: ["noparam", "alphabet"]
Expand Down Expand Up @@ -88,6 +88,10 @@ analyzers:
- key: "dns"
ports: [ 53 ]
slow_threshold: 100
- key: "rocketmq"
ports: [ 9876, 10911 ]
slow_threshold: 500


processors:
k8smetadataprocessor:
Expand Down
2 changes: 1 addition & 1 deletion collector/pkg/aggregator/label_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *LabelSelectors) AppendSelectors(selectors ...LabelSelector) {
s.selectors = append(s.selectors, selectors...)
}

const maxLabelKeySize = 35
const maxLabelKeySize = 37

type LabelKeys struct {
// LabelKeys will be used as key of map, so it is must be an array instead of a slice.
Expand Down
39 changes: 24 additions & 15 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package cpuanalyzer

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"go.uber.org/zap/zapcore"
"strconv"
"sync"
)

const (
Expand All @@ -23,8 +25,8 @@ type CpuAnalyzer struct {
sendEventsRoutineMap sync.Map
lock sync.Mutex
telemetry *component.TelemetryTools

nextConsumers []consumer.Consumer
tidExpiredQueue *tidDeleteQueue
nextConsumers []consumer.Consumer
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -43,6 +45,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
nextConsumers: consumers,
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
go ca.TidDelete(30*time.Second, 10*time.Second)
return ca
}

Expand Down Expand Up @@ -221,20 +225,25 @@ func (ca *CpuAnalyzer) PutEventToSegments(pid uint32, tid uint32, threadName str
(newTimeSegments.BaseTime+uint64(i+1))*nanoToSeconds)
newTimeSegments.Segments.UpdateByIndex(i, segment)
}
val := newTimeSegments.Segments.GetByIndex(0)
segment := val.(*Segment)
segment.putTimedEvent(event)

endOffset := int(event.EndTimestamp()/nanoToSeconds - newTimeSegments.BaseTime)

for i := 0; i <= endOffset && i < maxSegmentSize; i++ {
val := newTimeSegments.Segments.GetByIndex(i)
segment := val.(*Segment)
segment.putTimedEvent(event)
segment.IsSend = 0
}

tidCpuEvents[tid] = newTimeSegments
}
}

func (ca *CpuAnalyzer) trimExitedThread(pid uint32, tid uint32) {
ca.lock.Lock()
defer ca.lock.Unlock()
tidEventsMap := ca.cpuPidEvents[pid]
if tidEventsMap == nil {
return
}
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map", pid, tid)
delete(tidEventsMap, tid)
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
ca.telemetry.Logger.Debugf("Receive a procexit pid=%d, tid=%d, which will be deleted from map after 10 seconds. ", pid, tid)

cacheElem := deleteTid{pid: pid, tid: tid, exitTime: time.Now()}
ca.tidExpiredQueue.Push(cacheElem)
}
74 changes: 74 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/delete_tid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cpuanalyzer

import (
"sync"
"time"
)

type tidDeleteQueue struct {
queueMutex sync.Mutex
queue []deleteTid
}

type deleteTid struct {
pid uint32
tid uint32
exitTime time.Time
}

func newTidDeleteQueue() *tidDeleteQueue {
return &tidDeleteQueue{queue: make([]deleteTid, 0)}
}

func (dq *tidDeleteQueue) GetFront() *deleteTid {
if len(dq.queue) > 0 {
return &dq.queue[0]
}
return nil
}

func (dq *tidDeleteQueue) Push(elem deleteTid) {
dq.queue = append(dq.queue, elem)
}

func (dq *tidDeleteQueue) Pop() {
if len(dq.queue) > 0 {
dq.queue = dq.queue[1:len(dq.queue)]
}
}

func (ca *CpuAnalyzer) TidDelete(interval time.Duration, expiredDuration time.Duration) {
for {
select {
case <-time.After(interval):
now := time.Now()
func() {
ca.tidExpiredQueue.queueMutex.Lock()
defer ca.tidExpiredQueue.queueMutex.Unlock()
for {
elem := ca.tidExpiredQueue.GetFront()
if elem == nil {
break
}
if elem.exitTime.Add(expiredDuration).After(now) {
break
}
//Delete expired threads (current_time >= thread_exit_time + interval_time).
func() {
ca.lock.Lock()
defer ca.lock.Unlock()
tidEventsMap := ca.cpuPidEvents[elem.pid]
if tidEventsMap == nil {
ca.tidExpiredQueue.Pop()
} else {
ca.telemetry.Logger.Debugf("Delete expired thread... pid=%d, tid=%d", elem.pid, elem.tid)
//fmt.Printf("Go Test: Delete expired thread... pid=%d, tid=%d\n", elem.pid, elem.tid)
delete(tidEventsMap, elem.tid)
ca.tidExpiredQueue.Pop()
}
}()
}
}()
}
}
}
Loading

0 comments on commit acd453d

Please sign in to comment.