Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a config to cgoreceiver for suppressing events according to processes' comm #495

Merged
merged 9 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

## Unreleased
### Enhancements
- Make suppress processes configurable. ([#495](https://github.com/KindlingProject/kindling/pull/495))
dxsup marked this conversation as resolved.
Show resolved Hide resolved
- Add bind support to get the listening ip and port of a server. ([#493](https://github.com/KindlingProject/kindling/pull/493))
- Add an option `enable_fetch_replicaset` to control whether to fetch ReplicaSet metadata. The default value is false which aims to release pressure on Kubernetes API server. ([#492](https://github.com/KindlingProject/kindling/pull/492))

Expand Down
14 changes: 12 additions & 2 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ receivers:
- name: kretprobe-tcp_connect
- name: kprobe-tcp_set_state
- name: tracepoint-procexit
process_filter:
# the length of a comm should be no more than 16
comms:
- "kindling-collec"
- "containerd"
- "dockerd"
- "containerd-shim"

analyzers:
cpuanalyzer:
# sampling_interval is the sampling interval for the same url. The unit is second.
Expand Down Expand Up @@ -104,13 +112,12 @@ analyzers:
ports: [ 9876, 10911 ]
slow_threshold: 500


processors:
k8smetadataprocessor:
# Set "enable" false if you want to run the agent in the non-Kubernetes environment.
# Otherwise, the agent will panic if it can't connect to the API-server.
enable: true
kube_auth_type: kubeConfig
kube_auth_type: serviceAccount
kube_config_dir: /root/.kube/config
# GraceDeletePeriod controls the delay interval after receiving delete event.
# The unit is seconds, and the default value is 60 seconds.
Expand Down Expand Up @@ -222,6 +229,9 @@ observability:
export_kind: stdout
prometheus:
port: :9501
# Self-metrics for special purpose
# "resource" for agent CPU and memory usage metricss
# extra_metrics: ["resource"]
otlp:
collect_period: 15s
# Note: DO NOT add the prefix "http://"
Expand Down
2 changes: 1 addition & 1 deletion collector/internal/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (a *Application) Shutdown() error {
}

func (a *Application) registerFactory() {
a.componentsFactory.RegisterReceiver(cgoreceiver.Cgo, cgoreceiver.NewCgoReceiver, &cgoreceiver.Config{})
a.componentsFactory.RegisterReceiver(cgoreceiver.Cgo, cgoreceiver.NewCgoReceiver, cgoreceiver.NewDefaultConfig())
a.componentsFactory.RegisterAnalyzer(network.Network.String(), network.NewNetworkAnalyzer, network.NewDefaultConfig())
a.componentsFactory.RegisterAnalyzer(cpuanalyzer.CpuProfile.String(), cpuanalyzer.NewCpuAnalyzer, cpuanalyzer.NewDefaultConfig())
a.componentsFactory.RegisterProcessor(k8sprocessor.K8sMetadata, k8sprocessor.NewKubernetesProcessor, &k8sprocessor.DefaultConfig)
Expand Down
3 changes: 2 additions & 1 deletion collector/pkg/component/receiver/cgoreceiver/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ extern "C" {
#endif
int runForGo();
int getKindlingEvent(void **kindlingEvent);
int subEventForGo(char* eventName, char* category, void *params);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void *params);
int startProfile();
int stopProfile();
char* startAttachAgent(int pid);
Expand Down
11 changes: 11 additions & 0 deletions collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (r *CgoReceiver) Start() error {
go r.getCaptureStatistics()
go r.catchSignalUp()
time.Sleep(2 * time.Second)
r.suppressEventsComm()
_ = r.subEvent()
// Wait for the C routine running
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -182,6 +183,16 @@ func (r *CgoReceiver) sendToNextConsumer(evt *model.KindlingEvent) error {
return nil
}

func (r *CgoReceiver) suppressEventsComm() {
comms := r.cfg.ProcessFilterInfo.Comms
if len(comms) > 0 {
r.telemetry.Logger.Infof("Filter out process with command: %v", comms)
}
for _, comm := range comms {
C.suppressEventsCommForGo(C.CString(comm))
dxsup marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (r *CgoReceiver) subEvent() error {
if len(r.cfg.SubscribeInfo) == 0 {
r.telemetry.Logger.Warn("No events are subscribed by cgoreceiver. Please check your configuration.")
Expand Down
15 changes: 14 additions & 1 deletion collector/pkg/component/receiver/cgoreceiver/config.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
package cgoreceiver

type Config struct {
SubscribeInfo []SubEvent `mapstructure:"subscribe"`
SubscribeInfo []SubEvent `mapstructure:"subscribe"`
ProcessFilterInfo ProcessFilter `mapstructure:"process_filter"`
}

type SubEvent struct {
Category string `mapstructure:"category"`
Name string `mapstructure:"name"`
Params map[string]string `mapstructure:"params"`
}

type ProcessFilter struct {
Comms []string `mapstructure:"comms"`
}

func NewDefaultConfig() *Config {
dxsup marked this conversation as resolved.
Show resolved Hide resolved
return &Config{
ProcessFilterInfo: ProcessFilter{
Comms: []string{"kindling-collec", "containerd", "dockerd", "containerd-shim"},
},
}
}
12 changes: 9 additions & 3 deletions deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ receivers:
category: net
- name: syscall_exit-sendmmsg
category: net
- name: syscall_exit-recvmmsg
category: net
- name: kprobe-tcp_close
- name: kprobe-tcp_rcv_established
- name: kprobe-tcp_drop
Expand All @@ -35,6 +33,14 @@ receivers:
- name: kretprobe-tcp_connect
- name: kprobe-tcp_set_state
- name: tracepoint-procexit
process_filter:
# the length of a comm should be no more than 16
comms:
- "kindling-collec"
- "containerd"
- "dockerd"
- "containerd-shim"

analyzers:
cpuanalyzer:
# sampling_interval is the sampling interval for the same url. The unit is second.
Expand Down Expand Up @@ -231,4 +237,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s
4 changes: 2 additions & 2 deletions probe/src/cgo/cgo_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ int runForGo() { return init_probe(); }

int getKindlingEvent(void** kindlingEvent) { return getEvent(kindlingEvent); }


int startProfile() { return start_profile(); }
int stopProfile() { return stop_profile(); }

char* startAttachAgent(int pid) { return start_attach_agent(pid); }
char* stopAttachAgent(int pid) { return stop_attach_agent(pid); }

void suppressEventsCommForGo(char *comm) { suppress_events_comm(string(comm)); }
void subEventForGo(char* eventName, char* category, void *params) { sub_event(eventName, category, (event_params_for_subscribe *)params); }
void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); }

void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); }
void stopProfileDebug() { stop_profile_debug(); }

void getCaptureStatistics() { get_capture_statistics(); }
Expand Down
1 change: 1 addition & 0 deletions probe/src/cgo/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern "C" {
#endif
int runForGo();
int getKindlingEvent(void** kindlingEvent);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void* params);
int startProfile();
int stopProfile();
Expand Down
13 changes: 4 additions & 9 deletions probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,10 @@ void sub_event(char* eventName, char* category, event_params_for_subscribe param
}
}

void suppress_events_comm(sinsp* inspector) {
const string comms[] = {"kindling-collec", "sshd", "containerd", "dockerd",
"containerd-shim", "kubelet", "kube-apiserver", "etcd",
"kube-controller", "kube-scheduler", "kube-rbac-proxy", "prometheus",
"node_exporter", "alertmanager", "adapter"};
for (auto& comm : comms) {
inspector->suppress_events_comm(comm);
}
void suppress_events_comm(string comm) {
printCurrentTime();
cout << "suppress_events for process " << comm << endl;
inspector->suppress_events_comm(comm);
}

void set_eventmask(sinsp* inspector) {
Expand Down Expand Up @@ -154,7 +150,6 @@ int init_probe() {
formatter = new sinsp_evt_formatter(inspector, output_format);
inspector->set_hostname_and_port_resolution_mode(false);
set_snaplen(inspector);
suppress_events_comm(inspector);
inspector->open("");
set_eventmask(inspector);

Expand Down
3 changes: 3 additions & 0 deletions probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ void get_capture_statistics();
uint16_t get_protocol(scap_l4_proto proto);
uint16_t get_type(ppm_param_type type);
uint16_t get_kindling_source(uint16_t etype);

void suppress_events_comm(string comm);

struct event {
string event_name;
ppm_event_type event_type;
Expand Down