diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 2675a735e..0be79b7f5 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -53,10 +53,10 @@ analyzers: edge_events_window_size: 2 # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. # The unit is seconds. - java_trace_delete_interval: 10 + java_trace_delete_interval: 20 # java_trace_expiration_time is the expiration time for data in javatraces. # The unit is seconds. - java_trace_expiration_time: 30 + java_trace_expiration_time: 120 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10 diff --git a/collector/pkg/component/analyzer/cpuanalyzer/config.go b/collector/pkg/component/analyzer/cpuanalyzer/config.go index 1a1da3329..64e395d42 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/config.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/config.go @@ -26,17 +26,16 @@ type Config struct { // JavaTraceExpirationTime is the expiration time for data in javatraces. // The unit is seconds. JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"` - } func NewDefaultConfig() *Config { return &Config{ - SamplingInterval: 5, - OpenJavaTraceSampling: false, - JavaTraceSlowTime: 500, - SegmentSize: 40, - EdgeEventsWindowSize: 2, - JavaTraceDeleteInterval: 10, - JavaTraceExpirationTime: 30, + SamplingInterval: 5, + OpenJavaTraceSampling: false, + JavaTraceSlowTime: 500, + SegmentSize: 40, + EdgeEventsWindowSize: 2, + JavaTraceDeleteInterval: 20, + JavaTraceExpirationTime: 120, } } diff --git a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go index b7fc9ed55..f37ff65ac 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go @@ -27,10 +27,11 @@ const ( ) type CpuAnalyzer struct { - cfg *Config + cfg *Config cpuPidEvents map[uint32]map[uint32]*TimeSegments routineSize *atomic.Int32 lock sync.RWMutex + jtlock sync.RWMutex telemetry *component.TelemetryTools tidExpiredQueue *tidDeleteQueue javaTraces map[string]*TransactionIdEvent @@ -67,8 +68,8 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum func (ca *CpuAnalyzer) Start() error { interval := time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second - expiredDuration :=time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second - go ca.JavaTraceDelete(interval,expiredDuration) + expiredDuration := time.Duration(ca.cfg.JavaTraceExpirationTime) * time.Second + go ca.JavaTraceDelete(interval, expiredDuration) return nil } @@ -119,14 +120,16 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) { } func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) { - key := ev.TraceId+ev.PidString - ca.javaTraceExpiredQueue.Push(deleteVal{key: key,enterTime: time.Now()}) + ca.jtlock.Lock() + defer ca.jtlock.Unlock() + key := ev.TraceId + ev.PidString + ca.javaTraceExpiredQueue.Push(deleteVal{key: key, enterTime: time.Now()}) if ev.IsEntry == 1 { ca.javaTraces[key] = ev } else { - oldEvent,ok := ca.javaTraces[key] - if(!ok){ - ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId,ev.PidString) + oldEvent, ok := ca.javaTraces[key] + if !ok { + ca.telemetry.Logger.Warnf("No javaTraces traceid=%s, pid=%s", ev.TraceId, ev.PidString) return } pid, _ := strconv.ParseInt(ev.PidString, 10, 64) diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go index 636f9ef8e..8028d26b3 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace.go @@ -16,7 +16,7 @@ type deleteVal struct { } func newJavaTraceDeleteQueue() *javaTraceDeleteQueue { - return &javaTraceDeleteQueue{queue: make([]deleteVal,0)} + return &javaTraceDeleteQueue{queue: make([]deleteVal, 0)} } func (dq *javaTraceDeleteQueue) GetFront() *deleteVal { @@ -57,8 +57,8 @@ func (ca *CpuAnalyzer) JavaTraceDelete(interval time.Duration, expiredDuration t } func() { - ca.lock.Lock() - defer ca.lock.Unlock() + ca.jtlock.Lock() + defer ca.jtlock.Unlock() event := ca.javaTraces[val.key] if event == nil { ca.javaTraceExpiredQueue.Pop() diff --git a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go index 311434213..9571bc1ce 100644 --- a/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go +++ b/collector/pkg/component/analyzer/cpuanalyzer/delete_javatrace_test.go @@ -10,16 +10,15 @@ import ( ) var ( - cnt int - quitCnt int + cnt int + quitCnt int ) - func TestJavaTraceDeleteQueue(t *testing.T) { jt := make(map[string]*TransactionIdEvent, 100000) testTelemetry := component.NewTelemetryManager().GetGlobalTelemetryTools() - mycfg := &Config{SegmentSize: 40,JavaTraceDeleteInterval:15,JavaTraceExpirationTime: 10} + mycfg := &Config{SegmentSize: 40, JavaTraceDeleteInterval: 15, JavaTraceExpirationTime: 10} ca = &CpuAnalyzer{javaTraces: jt, telemetry: testTelemetry, cfg: mycfg} ca.javaTraceExpiredQueue = newJavaTraceDeleteQueue() ca.Start() @@ -29,13 +28,13 @@ func TestJavaTraceDeleteQueue(t *testing.T) { ev.TraceId = strconv.Itoa(rand.Intn(10000)) ev.PidString = strconv.Itoa(rand.Intn(10000)) ev.IsEntry = 1 - key:= ev.TraceId + ev.PidString + key := ev.TraceId + ev.PidString ca.javaTraces[key] = ev val := new(deleteVal) - val.key = ev.TraceId+ev.PidString + val.key = ev.TraceId + ev.PidString val.enterTime = time.Now() ca.javaTraceExpiredQueue.Push(*val) - t.Logf("pid=%s, tid=%s enter time=%s\n",ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) + t.Logf("pid=%s, tid=%s enter time=%s\n", ev.PidString, ev.TraceId, val.enterTime.Format("2006-01-02 15:04:05.000")) cnt++ time.Sleep(3 * time.Second) } @@ -44,8 +43,8 @@ func TestJavaTraceDeleteQueue(t *testing.T) { if cnt != quitCnt { t.Fatalf("The number of javatraces that entering and exiting the queue is not equal! enterCount=%d, exitCount=%d\n", cnt, quitCnt) } else { - t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) + t.Logf("All javatraces have cleaned normally. enterCount=%d, exitCount=%d\n", cnt, quitCnt) } - time.Sleep(10*time.Minute) + time.Sleep(10 * time.Minute) } diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index 2675a735e..0be79b7f5 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -53,10 +53,10 @@ analyzers: edge_events_window_size: 2 # java_trace_delete_interval is the interval for cleaning up expired data in javatraces. # The unit is seconds. - java_trace_delete_interval: 10 + java_trace_delete_interval: 20 # java_trace_expiration_time is the expiration time for data in javatraces. # The unit is seconds. - java_trace_expiration_time: 30 + java_trace_expiration_time: 120 tcpconnectanalyzer: channel_size: 10000 wait_event_second: 10