From fe1a52e70ed4864d9ed709d13fa3d6412f5e221d Mon Sep 17 00:00:00 2001 From: sanyangji Date: Tue, 4 Apr 2023 17:15:06 +0800 Subject: [PATCH] Add a config to cgoreceiver for suppressing events according to processes' comm (#495) Signed-off-by: sanyangji --- CHANGELOG.md | 1 + .../docker/kindling-collector-config.yml | 14 +++- collector/internal/application/application.go | 2 +- .../component/receiver/cgoreceiver/cgo_func.h | 3 +- .../receiver/cgoreceiver/cgoreceiver.go | 19 ++++- .../component/receiver/cgoreceiver/config.go | 77 ++++++++++++++++++- deploy/agent/kindling-collector-config.yml | 12 ++- probe/src/cgo/cgo_func.cpp | 4 +- probe/src/cgo/cgo_func.h | 1 + probe/src/cgo/kindling.cpp | 13 +--- probe/src/cgo/kindling.h | 3 + 11 files changed, 129 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e370a720..59ee8ffae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## Unreleased ### Enhancements +- Add a config to cgoreceiver for suppressing events according to processes' comm ([#495](https://github.com/KindlingProject/kindling/pull/495)) - Add bind support to get the listening ip and port of a server. ([#493](https://github.com/KindlingProject/kindling/pull/493)) - Add an option `enable_fetch_replicaset` to control whether to fetch ReplicaSet metadata. The default value is false which aims to release pressure on Kubernetes API server. 
([#492](https://github.com/KindlingProject/kindling/pull/492)) diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index d343e69df..7c630dd14 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -33,6 +33,14 @@ receivers: - name: kretprobe-tcp_connect - name: kprobe-tcp_set_state - name: tracepoint-procexit + process_filter: + # the length of a comm should be no more than 16 + comms: + - "kindling-collec" + - "containerd" + - "dockerd" + - "containerd-shim" + analyzers: cpuanalyzer: # sampling_interval is the sampling interval for the same url. The unit is second. @@ -104,13 +112,12 @@ analyzers: ports: [ 9876, 10911 ] slow_threshold: 500 - processors: k8smetadataprocessor: # Set "enable" false if you want to run the agent in the non-Kubernetes environment. # Otherwise, the agent will panic if it can't connect to the API-server. enable: true - kube_auth_type: kubeConfig + kube_auth_type: serviceAccount kube_config_dir: /root/.kube/config # GraceDeletePeriod controls the delay interval after receiving delete event. # The unit is seconds, and the default value is 60 seconds. 
@@ -222,6 +229,9 @@ observability: export_kind: stdout prometheus: port: :9501 + # Self-metrics for special purpose + # "resource" for agent CPU and memory usage metrics + # extra_metrics: ["resource"] otlp: collect_period: 15s # Note: DO NOT add the prefix "http://" diff --git a/collector/internal/application/application.go b/collector/internal/application/application.go index e481c6f3a..5dd1e19c4 100644 --- a/collector/internal/application/application.go +++ b/collector/internal/application/application.go @@ -75,7 +75,7 @@ func (a *Application) Shutdown() error { } func (a *Application) registerFactory() { - a.componentsFactory.RegisterReceiver(cgoreceiver.Cgo, cgoreceiver.NewCgoReceiver, &cgoreceiver.Config{}) + a.componentsFactory.RegisterReceiver(cgoreceiver.Cgo, cgoreceiver.NewCgoReceiver, cgoreceiver.NewDefaultConfig()) a.componentsFactory.RegisterAnalyzer(network.Network.String(), network.NewNetworkAnalyzer, network.NewDefaultConfig()) a.componentsFactory.RegisterAnalyzer(cpuanalyzer.CpuProfile.String(), cpuanalyzer.NewCpuAnalyzer, cpuanalyzer.NewDefaultConfig()) a.componentsFactory.RegisterProcessor(k8sprocessor.K8sMetadata, k8sprocessor.NewKubernetesProcessor, &k8sprocessor.DefaultConfig) diff --git a/collector/pkg/component/receiver/cgoreceiver/cgo_func.h b/collector/pkg/component/receiver/cgoreceiver/cgo_func.h index 19a7077c3..fa49a5615 100644 --- a/collector/pkg/component/receiver/cgoreceiver/cgo_func.h +++ b/collector/pkg/component/receiver/cgoreceiver/cgo_func.h @@ -10,7 +10,8 @@ extern "C" { #endif int runForGo(); int getKindlingEvent(void **kindlingEvent); -int subEventForGo(char* eventName, char* category, void *params); +void suppressEventsCommForGo(char *comm); +void subEventForGo(char* eventName, char* category, void *params); int startProfile(); int stopProfile(); char* startAttachAgent(int pid); diff --git a/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go b/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go index 
a8ca8f686..e9cb4f860 100644 --- a/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go +++ b/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go @@ -68,6 +68,7 @@ func (r *CgoReceiver) Start() error { go r.getCaptureStatistics() go r.catchSignalUp() time.Sleep(2 * time.Second) + r.suppressEventsComm() _ = r.subEvent() // Wait for the C routine running time.Sleep(2 * time.Second) @@ -182,6 +183,18 @@ func (r *CgoReceiver) sendToNextConsumer(evt *model.KindlingEvent) error { return nil } +func (r *CgoReceiver) suppressEventsComm() { + comms := r.cfg.ProcessFilterInfo.Comms + if len(comms) > 0 { + r.telemetry.Logger.Infof("Filter out process with command: %v", comms) + } + for _, comm := range comms { + csComm := C.CString(comm) + C.suppressEventsCommForGo(csComm) + C.free(unsafe.Pointer(csComm)) + } +} + func (r *CgoReceiver) subEvent() error { if len(r.cfg.SubscribeInfo) == 0 { r.telemetry.Logger.Warn("No events are subscribed by cgoreceiver. Please check your configuration.") @@ -194,7 +207,11 @@ func (r *CgoReceiver) subEvent() error { var temp CEventParamsForSubscribe temp.name = C.CString("terminator") paramsList = append(paramsList, temp) - C.subEventForGo(C.CString(value.Name), C.CString(value.Category), (unsafe.Pointer)(&paramsList[0])) + csName := C.CString(value.Name) + csCategory := C.CString(value.Category) + C.subEventForGo(csName, csCategory, (unsafe.Pointer)(&paramsList[0])) + C.free(unsafe.Pointer(csName)) + C.free(unsafe.Pointer(csCategory)) } return nil } diff --git a/collector/pkg/component/receiver/cgoreceiver/config.go b/collector/pkg/component/receiver/cgoreceiver/config.go index afc4c4692..4dbfcf4f8 100644 --- a/collector/pkg/component/receiver/cgoreceiver/config.go +++ b/collector/pkg/component/receiver/cgoreceiver/config.go @@ -1,7 +1,8 @@ package cgoreceiver type Config struct { - SubscribeInfo []SubEvent `mapstructure:"subscribe"` + SubscribeInfo []SubEvent `mapstructure:"subscribe"` + ProcessFilterInfo ProcessFilter 
`mapstructure:"process_filter"` } type SubEvent struct { @@ -9,3 +10,77 @@ type SubEvent struct { Name string `mapstructure:"name"` Params map[string]string `mapstructure:"params"` } + +type ProcessFilter struct { + Comms []string `mapstructure:"comms"` +} + +func NewDefaultConfig() *Config { + return &Config{ + SubscribeInfo: []SubEvent{ + { + Name: "syscall_exit-writev", + Category: "net", + }, + { + Name: "syscall_exit-readv", + Category: "net", + }, + { + Name: "syscall_exit-write", + Category: "net", + }, + { + Name: "syscall_exit-read", + Category: "net", + }, + { + Name: "syscall_exit-sendto", + Category: "net", + }, + { + Name: "syscall_exit-recvfrom", + Category: "net", + }, + { + Name: "syscall_exit-sendmsg", + Category: "net", + }, + { + Name: "syscall_exit-recvmsg", + Category: "net", + }, + { + Name: "syscall_exit-sendmmsg", + Category: "net", + }, + { + Name: "kprobe-tcp_close", + }, + { + Name: "kprobe-tcp_rcv_established", + }, + { + Name: "kprobe-tcp_drop", + }, + { + Name: "kprobe-tcp_retransmit_skb", + }, + { + Name: "syscall_exit-connect", + }, + { + Name: "kretprobe-tcp_connect", + }, + { + Name: "kprobe-tcp_set_state", + }, + { + Name: "tracepoint-procexit", + }, + }, + ProcessFilterInfo: ProcessFilter{ + Comms: []string{"kindling-collec", "containerd", "dockerd", "containerd-shim"}, + }, + } +} diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index e8249a502..7c630dd14 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -25,8 +25,6 @@ receivers: category: net - name: syscall_exit-sendmmsg category: net - - name: syscall_exit-recvmmsg - category: net - name: kprobe-tcp_close - name: kprobe-tcp_rcv_established - name: kprobe-tcp_drop @@ -35,6 +33,14 @@ receivers: - name: kretprobe-tcp_connect - name: kprobe-tcp_set_state - name: tracepoint-procexit + process_filter: + # the length of a comm should be no more than 16 + comms: + - 
"kindling-collec" + - "containerd" + - "dockerd" + - "containerd-shim" + analyzers: cpuanalyzer: # sampling_interval is the sampling interval for the same url. The unit is second. @@ -231,4 +237,4 @@ observability: # Note: DO NOT add the prefix "http://" endpoint: 10.10.10.10:8080 stdout: - collect_period: 15s + collect_period: 15s \ No newline at end of file diff --git a/probe/src/cgo/cgo_func.cpp b/probe/src/cgo/cgo_func.cpp index 69f7f1123..66e45640a 100644 --- a/probe/src/cgo/cgo_func.cpp +++ b/probe/src/cgo/cgo_func.cpp @@ -10,16 +10,16 @@ int runForGo() { return init_probe(); } int getKindlingEvent(void** kindlingEvent) { return getEvent(kindlingEvent); } - int startProfile() { return start_profile(); } int stopProfile() { return stop_profile(); } char* startAttachAgent(int pid) { return start_attach_agent(pid); } char* stopAttachAgent(int pid) { return stop_attach_agent(pid); } +void suppressEventsCommForGo(char *comm) { suppress_events_comm(string(comm)); } void subEventForGo(char* eventName, char* category, void *params) { sub_event(eventName, category, (event_params_for_subscribe *)params); } -void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); } +void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); } void stopProfileDebug() { stop_profile_debug(); } void getCaptureStatistics() { get_capture_statistics(); } diff --git a/probe/src/cgo/cgo_func.h b/probe/src/cgo/cgo_func.h index dfb6ded65..971291395 100644 --- a/probe/src/cgo/cgo_func.h +++ b/probe/src/cgo/cgo_func.h @@ -10,6 +10,7 @@ extern "C" { #endif int runForGo(); int getKindlingEvent(void** kindlingEvent); +void suppressEventsCommForGo(char *comm); void subEventForGo(char* eventName, char* category, void* params); int startProfile(); int stopProfile(); diff --git a/probe/src/cgo/kindling.cpp b/probe/src/cgo/kindling.cpp index 6890f97f5..5c382b412 100644 --- a/probe/src/cgo/kindling.cpp +++ b/probe/src/cgo/kindling.cpp @@ -89,14 +89,10 @@ void sub_event(char* 
eventName, char* category, event_params_for_subscribe param } } -void suppress_events_comm(sinsp* inspector) { - const string comms[] = {"kindling-collec", "sshd", "containerd", "dockerd", - "containerd-shim", "kubelet", "kube-apiserver", "etcd", - "kube-controller", "kube-scheduler", "kube-rbac-proxy", "prometheus", - "node_exporter", "alertmanager", "adapter"}; - for (auto& comm : comms) { - inspector->suppress_events_comm(comm); - } +void suppress_events_comm(string comm) { + printCurrentTime(); + cout << "suppress_events for process " << comm << endl; + inspector->suppress_events_comm(comm); } void set_eventmask(sinsp* inspector) { @@ -154,7 +150,6 @@ int init_probe() { formatter = new sinsp_evt_formatter(inspector, output_format); inspector->set_hostname_and_port_resolution_mode(false); set_snaplen(inspector); - suppress_events_comm(inspector); inspector->open(""); set_eventmask(inspector); diff --git a/probe/src/cgo/kindling.h b/probe/src/cgo/kindling.h index 353cce0c3..e3dc66447 100644 --- a/probe/src/cgo/kindling.h +++ b/probe/src/cgo/kindling.h @@ -53,6 +53,9 @@ void get_capture_statistics(); uint16_t get_protocol(scap_l4_proto proto); uint16_t get_type(ppm_param_type type); uint16_t get_kindling_source(uint16_t etype); + +void suppress_events_comm(string comm); + struct event { string event_name; ppm_event_type event_type;