Skip to content

Commit

Permalink
Add a config to cgoreceiver for suppressing events according to proce…
Browse files Browse the repository at this point in the history
…sses' comm (#495)

Signed-off-by: sanyangji <songyujie@zju.edu.cn>
  • Loading branch information
sanyangji authored Apr 4, 2023
1 parent 9082a6e commit fe1a52e
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

## Unreleased
### Enhancements
- Add a config to cgoreceiver for suppressing events according to processes' comm ([#495](https://github.com/KindlingProject/kindling/pull/495))
- Add bind support to get the listening ip and port of a server. ([#493](https://github.com/KindlingProject/kindling/pull/493))
- Add an option `enable_fetch_replicaset` to control whether to fetch ReplicaSet metadata. The default value is false which aims to release pressure on Kubernetes API server. ([#492](https://github.com/KindlingProject/kindling/pull/492))

Expand Down
14 changes: 12 additions & 2 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ receivers:
- name: kretprobe-tcp_connect
- name: kprobe-tcp_set_state
- name: tracepoint-procexit
process_filter:
# the length of a comm should be no more than 16
comms:
- "kindling-collec"
- "containerd"
- "dockerd"
- "containerd-shim"

analyzers:
cpuanalyzer:
# sampling_interval is the sampling interval for the same url. The unit is second.
Expand Down Expand Up @@ -104,13 +112,12 @@ analyzers:
ports: [ 9876, 10911 ]
slow_threshold: 500


processors:
k8smetadataprocessor:
# Set "enable" false if you want to run the agent in the non-Kubernetes environment.
# Otherwise, the agent will panic if it can't connect to the API-server.
enable: true
kube_auth_type: kubeConfig
kube_auth_type: serviceAccount
kube_config_dir: /root/.kube/config
# GraceDeletePeriod controls the delay interval after receiving delete event.
# The unit is seconds, and the default value is 60 seconds.
Expand Down Expand Up @@ -222,6 +229,9 @@ observability:
export_kind: stdout
prometheus:
port: :9501
# Self-metrics for special purpose
# "resource" for agent CPU and memory usage metricss
# extra_metrics: ["resource"]
otlp:
collect_period: 15s
# Note: DO NOT add the prefix "http://"
Expand Down
2 changes: 1 addition & 1 deletion collector/internal/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (a *Application) Shutdown() error {
}

func (a *Application) registerFactory() {
a.componentsFactory.RegisterReceiver(cgoreceiver.Cgo, cgoreceiver.NewCgoReceiver, &cgoreceiver.Config{})
a.componentsFactory.RegisterReceiver(cgoreceiver.Cgo, cgoreceiver.NewCgoReceiver, cgoreceiver.NewDefaultConfig())
a.componentsFactory.RegisterAnalyzer(network.Network.String(), network.NewNetworkAnalyzer, network.NewDefaultConfig())
a.componentsFactory.RegisterAnalyzer(cpuanalyzer.CpuProfile.String(), cpuanalyzer.NewCpuAnalyzer, cpuanalyzer.NewDefaultConfig())
a.componentsFactory.RegisterProcessor(k8sprocessor.K8sMetadata, k8sprocessor.NewKubernetesProcessor, &k8sprocessor.DefaultConfig)
Expand Down
3 changes: 2 additions & 1 deletion collector/pkg/component/receiver/cgoreceiver/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ extern "C" {
#endif
int runForGo();
int getKindlingEvent(void **kindlingEvent);
int subEventForGo(char* eventName, char* category, void *params);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void *params);
int startProfile();
int stopProfile();
char* startAttachAgent(int pid);
Expand Down
19 changes: 18 additions & 1 deletion collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (r *CgoReceiver) Start() error {
go r.getCaptureStatistics()
go r.catchSignalUp()
time.Sleep(2 * time.Second)
r.suppressEventsComm()
_ = r.subEvent()
// Wait for the C routine running
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -182,6 +183,18 @@ func (r *CgoReceiver) sendToNextConsumer(evt *model.KindlingEvent) error {
return nil
}

func (r *CgoReceiver) suppressEventsComm() {
comms := r.cfg.ProcessFilterInfo.Comms
if len(comms) > 0 {
r.telemetry.Logger.Infof("Filter out process with command: %v", comms)
}
for _, comm := range comms {
csComm := C.CString(comm)
C.suppressEventsCommForGo(csComm)
C.free(unsafe.Pointer(csComm))
}
}

func (r *CgoReceiver) subEvent() error {
if len(r.cfg.SubscribeInfo) == 0 {
r.telemetry.Logger.Warn("No events are subscribed by cgoreceiver. Please check your configuration.")
Expand All @@ -194,7 +207,11 @@ func (r *CgoReceiver) subEvent() error {
var temp CEventParamsForSubscribe
temp.name = C.CString("terminator")
paramsList = append(paramsList, temp)
C.subEventForGo(C.CString(value.Name), C.CString(value.Category), (unsafe.Pointer)(&paramsList[0]))
csName := C.CString(value.Name)
csCategory := C.CString(value.Category)
C.subEventForGo(csName, csCategory, (unsafe.Pointer)(&paramsList[0]))
C.free(unsafe.Pointer(csName))
C.free(unsafe.Pointer(csCategory))
}
return nil
}
Expand Down
77 changes: 76 additions & 1 deletion collector/pkg/component/receiver/cgoreceiver/config.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,86 @@
package cgoreceiver

type Config struct {
SubscribeInfo []SubEvent `mapstructure:"subscribe"`
SubscribeInfo []SubEvent `mapstructure:"subscribe"`
ProcessFilterInfo ProcessFilter `mapstructure:"process_filter"`
}

type SubEvent struct {
Category string `mapstructure:"category"`
Name string `mapstructure:"name"`
Params map[string]string `mapstructure:"params"`
}

type ProcessFilter struct {
Comms []string `mapstructure:"comms"`
}

func NewDefaultConfig() *Config {
return &Config{
SubscribeInfo: []SubEvent{
{
Name: "syscall_exit-writev",
Category: "net",
},
{
Name: "syscall_exit-readv",
Category: "net",
},
{
Name: "syscall_exit-write",
Category: "net",
},
{
Name: "syscall_exit-read",
Category: "net",
},
{
Name: "syscall_exit-sendto",
Category: "net",
},
{
Name: "syscall_exit-recvfrom",
Category: "net",
},
{
Name: "syscall_exit-sendmsg",
Category: "net",
},
{
Name: "syscall_exit-recvmsg",
Category: "net",
},
{
Name: "syscall_exit-sendmmsg",
Category: "net",
},
{
Name: "kprobe-tcp_close",
},
{
Name: "kprobe-tcp_rcv_established",
},
{
Name: "kprobe-tcp_drop",
},
{
Name: "kprobe-tcp_retransmit_skb",
},
{
Name: "syscall_exit-connect",
},
{
Name: "kretprobe-tcp_connect",
},
{
Name: "kprobe-tcp_set_state",
},
{
Name: "tracepoint-procexit",
},
},
ProcessFilterInfo: ProcessFilter{
Comms: []string{"kindling-collec", "containerd", "dockerd", "containerd-shim"},
},
}
}
12 changes: 9 additions & 3 deletions deploy/agent/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ receivers:
category: net
- name: syscall_exit-sendmmsg
category: net
- name: syscall_exit-recvmmsg
category: net
- name: kprobe-tcp_close
- name: kprobe-tcp_rcv_established
- name: kprobe-tcp_drop
Expand All @@ -35,6 +33,14 @@ receivers:
- name: kretprobe-tcp_connect
- name: kprobe-tcp_set_state
- name: tracepoint-procexit
process_filter:
# the length of a comm should be no more than 16
comms:
- "kindling-collec"
- "containerd"
- "dockerd"
- "containerd-shim"

analyzers:
cpuanalyzer:
# sampling_interval is the sampling interval for the same url. The unit is second.
Expand Down Expand Up @@ -231,4 +237,4 @@ observability:
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
collect_period: 15s
4 changes: 2 additions & 2 deletions probe/src/cgo/cgo_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ int runForGo() { return init_probe(); }

int getKindlingEvent(void** kindlingEvent) { return getEvent(kindlingEvent); }


int startProfile() { return start_profile(); }
int stopProfile() { return stop_profile(); }

char* startAttachAgent(int pid) { return start_attach_agent(pid); }
char* stopAttachAgent(int pid) { return stop_attach_agent(pid); }

void suppressEventsCommForGo(char *comm) { suppress_events_comm(string(comm)); }
void subEventForGo(char* eventName, char* category, void *params) { sub_event(eventName, category, (event_params_for_subscribe *)params); }
void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); }

void startProfileDebug(int pid, int tid) { start_profile_debug(pid, tid); }
void stopProfileDebug() { stop_profile_debug(); }

void getCaptureStatistics() { get_capture_statistics(); }
Expand Down
1 change: 1 addition & 0 deletions probe/src/cgo/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ extern "C" {
#endif
int runForGo();
int getKindlingEvent(void** kindlingEvent);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void* params);
int startProfile();
int stopProfile();
Expand Down
13 changes: 4 additions & 9 deletions probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,10 @@ void sub_event(char* eventName, char* category, event_params_for_subscribe param
}
}

void suppress_events_comm(sinsp* inspector) {
const string comms[] = {"kindling-collec", "sshd", "containerd", "dockerd",
"containerd-shim", "kubelet", "kube-apiserver", "etcd",
"kube-controller", "kube-scheduler", "kube-rbac-proxy", "prometheus",
"node_exporter", "alertmanager", "adapter"};
for (auto& comm : comms) {
inspector->suppress_events_comm(comm);
}
void suppress_events_comm(string comm) {
printCurrentTime();
cout << "suppress_events for process " << comm << endl;
inspector->suppress_events_comm(comm);
}

void set_eventmask(sinsp* inspector) {
Expand Down Expand Up @@ -154,7 +150,6 @@ int init_probe() {
formatter = new sinsp_evt_formatter(inspector, output_format);
inspector->set_hostname_and_port_resolution_mode(false);
set_snaplen(inspector);
suppress_events_comm(inspector);
inspector->open("");
set_eventmask(inspector);

Expand Down
3 changes: 3 additions & 0 deletions probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ void get_capture_statistics();
uint16_t get_protocol(scap_l4_proto proto);
uint16_t get_type(ppm_param_type type);
uint16_t get_kindling_source(uint16_t etype);

void suppress_events_comm(string comm);

struct event {
string event_name;
ppm_event_type event_type;
Expand Down

0 comments on commit fe1a52e

Please sign in to comment.