Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduled cleaning in javatraces , and add null pointer checking #514

Merged
merged 23 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions collector/docker/build-asyncprofiler.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ASYNC_PROFILER=async-profiler-1.0.3-linux-x64.tar.gz
KINDLING_JAVA=kindling-java-1.0.3.tar.gz
dxsup marked this conversation as resolved.
Show resolved Hide resolved
ASYNC_PROFILER=async-profiler-1.0.4-linux-x64.tar.gz
KINDLING_JAVA=kindling-java-1.0.4.tar.gz
APM_ALL=apm-all-3.1.0.jar

SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_BIN")" > /dev/null 2>&1; pwd -P)"
Expand Down
8 changes: 7 additions & 1 deletion collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ analyzers:
# edge_events_window_size is the size of the duration window that seats the edge events.
# The unit is second. The greater it is, the more data will be stored.
edge_events_window_size: 2
#java_trace_delete_interval is the interval for cleaning up expired data in javatraces.
#The unit is seconds.
java_trace_delete_interval: 10
#java_trace_expiration_time is the expiration time for data in javatraces.
#The unit is seconds.
java_trace_expiration_time: 30
dxsup marked this conversation as resolved.
Show resolved Hide resolved
tcpconnectanalyzer:
channel_size: 10000
wait_event_second: 10
Expand Down Expand Up @@ -242,4 +248,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s
9 changes: 9 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ type Config struct {
// EdgeEventsWindowSize is the size of the duration window that seats the edge events.
// The unit is seconds. The greater it is, the more data will be stored.
EdgeEventsWindowSize int `mapstructure:"edge_events_window_size"`
//JavaTraceDeleteInterval is the interval for cleaning up expired data in javatraces.
//The unit is seconds.
JavaTraceDeleteInterval int `mapstructure:"java_trace_delete_interval"`
//JavaTraceExpirationTime is the expiration time for data in javatraces.
//The unit is seconds.
JavaTraceExpirationTime int `mapstructure:"java_trace_expiration_time"`

}

func NewDefaultConfig() *Config {
Expand All @@ -29,5 +36,7 @@ func NewDefaultConfig() *Config {
JavaTraceSlowTime: 500,
SegmentSize: 40,
EdgeEventsWindowSize: 2,
JavaTraceDeleteInterval: 10,
JavaTraceExpirationTime: 30,
dxsup marked this conversation as resolved.
Show resolved Hide resolved
}
}
57 changes: 43 additions & 14 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,23 @@ const (
)

type CpuAnalyzer struct {
cfg *Config
cpuPidEvents map[uint32]map[uint32]*TimeSegments
routineSize *atomic.Int32
lock sync.RWMutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
javaTraces map[string]*TransactionIdEvent
nextConsumers []consumer.Consumer
metadata *kubernetes.K8sMetaDataCache
cfg *Config
cpuPidEvents map[uint32]map[uint32]*TimeSegments
routineSize *atomic.Int32
lock sync.RWMutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
javaTraces map[JavaTracesKey]*TransactionIdEvent
nextConsumers []consumer.Consumer
metadata *kubernetes.K8sMetaDataCache
cleanerTicker *time.Ticker
stopProfileChan chan struct{}
}

stopProfileChan chan struct{}
type JavaTracesKey struct{
TraceId string
PidString string
StartTime time.Time
}

func (ca *CpuAnalyzer) Type() analyzer.Type {
Expand All @@ -59,20 +65,34 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
ca.javaTraces = make(map[string]*TransactionIdEvent, 100000)
ca.javaTraces = make(map[JavaTracesKey]*TransactionIdEvent, 100000)
newSelfMetrics(telemetry.MeterProvider, ca)
return ca
}

func (ca *CpuAnalyzer) Start() error {
// Disable receiving and sending the profiling data by default.
ca.cleanerTicker = time.NewTicker(time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second)
go func() {
for range ca.cleanerTicker.C {
dxsup marked this conversation as resolved.
Show resolved Hide resolved
ca.lock.Lock()
now := time.Now()
for key:= range ca.javaTraces {
if now.Sub(key.StartTime) > time.Duration(ca.cfg.JavaTraceExpirationTime)*time.Second {
delete(ca.javaTraces, key)
fmt.Print("Expired data has been released,pid = " + key.PidString)
dxsup marked this conversation as resolved.
Show resolved Hide resolved
}
}
ca.lock.Unlock()
}
}()
return nil
}

func (ca *CpuAnalyzer) Shutdown() error {
if enableProfile {
_ = ca.StopProfile()
}
ca.cleanerTicker.Stop()
return nil
}

Expand Down Expand Up @@ -116,10 +136,19 @@ func (ca *CpuAnalyzer) ConsumeTransactionIdEvent(event *model.KindlingEvent) {
}

func (ca *CpuAnalyzer) analyzerJavaTraceTime(ev *TransactionIdEvent) {
javatracekey := &JavaTracesKey{
TraceId: ev.TraceId,
PidString: ev.PidString,
StartTime: time.Now(),
dxsup marked this conversation as resolved.
Show resolved Hide resolved
}
if ev.IsEntry == 1 {
ca.javaTraces[ev.TraceId+ev.PidString] = ev
ca.javaTraces[*javatracekey] = ev
} else {
oldEvent := ca.javaTraces[ev.TraceId+ev.PidString]
oldEvent,ok := ca.javaTraces[*javatracekey]
if(!ok){
ca.telemetry.Logger.Warnf("No javaTraces traceid=%d, pid=%s", javatracekey.TraceId,javatracekey.PidString)
return
}
pid, _ := strconv.ParseInt(ev.PidString, 10, 64)
spendTime := ev.Timestamp - oldEvent.Timestamp
contentKey := oldEvent.Url
Expand Down
55 changes: 55 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cpuanalyzer

import (
"math/rand"
"strconv"
"testing"
"time"
)

func TestStart(t *testing.T) {
dxsup marked this conversation as resolved.
Show resolved Hide resolved
keys := make([]JavaTracesKey, 10)
for i := 0; i < 10; i++ {
offset := time.Duration(rand.Int63n(int64(10 * time.Second)))
keys[i] = JavaTracesKey{
TraceId: strconv.Itoa(rand.Intn(1000000)),
PidString: strconv.Itoa(rand.Intn(1000000)),
StartTime: time.Now().Add(offset - 5*time.Second),
}
}
tevent := &TransactionIdEvent{
TraceId: "0",
PidString: "1",
}
javaTraces := make(map[JavaTracesKey]*TransactionIdEvent)
for _, key := range keys {
javaTraces[key] = tevent
}
config:= NewDefaultConfig()
ca := &CpuAnalyzer{
javaTraces: javaTraces,
cfg: config,
}
ca.cleanerTicker = time.NewTicker(time.Duration(ca.cfg.JavaTraceDeleteInterval) * time.Second)
go func() {
for range ca.cleanerTicker.C {
ca.lock.Lock()
now := time.Now()
for key:= range ca.javaTraces {
if now.Sub(key.StartTime) > time.Duration(ca.cfg.JavaTraceExpirationTime)*time.Second {
delete(ca.javaTraces, key)
t.Log("已删除pid="+
key.PidString+ ",当前时间:"+
time.Now().Truncate(time.Second).Format("15:04:05")+
",map剩余数量:"+strconv.Itoa(len(javaTraces)))
dxsup marked this conversation as resolved.
Show resolved Hide resolved
}
}
ca.lock.Unlock()
}
}()
time.Sleep(10*time.Minute)
}

func Test(t *testing.T){

}
dxsup marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 7 additions & 1 deletion deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ analyzers:
# edge_events_window_size is the size of the duration window that seats the edge events.
# The unit is second. The greater it is, the more data will be stored.
edge_events_window_size: 2
#java_trace_delete_interval is the interval for cleaning up expired data in javatraces.
#The unit is seconds.
java_trace_delete_interval: 10
#java_trace_expiration_time is the expiration time for data in javatraces.
#The unit is seconds.
java_trace_expiration_time: 30
dxsup marked this conversation as resolved.
Show resolved Hide resolved
tcpconnectanalyzer:
channel_size: 10000
wait_event_second: 10
Expand Down Expand Up @@ -242,4 +248,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s