Skip to content

Commit

Permalink
Batch event retrieval in cgo for performance enhancement (#560)
Browse files Browse the repository at this point in the history
Signed-off-by: YDMsama <ydmsama@gmail.com>
Signed-off-by: YDMsama <127646431+YDMsama@users.noreply.github.com>
  • Loading branch information
YDMsama authored Aug 28, 2023
1 parent 9da153e commit d937235
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Expand the histogram bucket of otelexpoerter (Add 1500ms). ([#563](https://github.com/KindlingProject/kindling/pull/563))
- Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562))
- Optimized the `networkanalyzer` component of the probe analyzer by utilizing Go's goroutines, enabling concurrent execution. ([#558](https://github.com/KindlingProject/kindling/pull/558))
- Improved event processing efficiency with batch event retrieval in cgo. ([#560](https://github.com/KindlingProject/kindling/pull/560))

### Bug fixes
- Enhance dns with udp which is out of order.([#565](https://github.com/KindlingProject/kindling/pull/565))
Expand Down
2 changes: 1 addition & 1 deletion collector/pkg/component/receiver/cgoreceiver/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
extern "C" {
#endif
int runForGo();
int getKindlingEvent(void **kindlingEvent);
int getEventsByInterval(int interval, void** kindlingEvent, void* count);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void *params);
int startProfile();
Expand Down
40 changes: 22 additions & 18 deletions collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,34 @@ func (r *CgoReceiver) Start() error {
// Wait for the C routine running
time.Sleep(2 * time.Second)
go r.consumeEvents()
go r.startGetEvent()
go r.startGetEvents()
go r.getCaptureStatisticsByInterval(15 * time.Second)

return nil
}

func (r *CgoReceiver) startGetEvent() {
var pKindlingEvent unsafe.Pointer
r.shutdownWG.Add(1)
for {
select {
case <-r.stopCh:
r.shutdownWG.Done()
return
default:
res := int(C.getKindlingEvent(&pKindlingEvent))
if res == 1 {
event := convertEvent((*CKindlingEventForGo)(pKindlingEvent))
r.eventChannel <- event
r.stats.add(event.Name, 1)
}
}
}
func (r *CgoReceiver) startGetEvents() {
var count = 0
var pKindlingEvents [1000]unsafe.Pointer
r.shutdownWG.Add(1)

for {
select {
case <-r.stopCh:
r.shutdownWG.Done()
return
default:
evt_count := int(C.getEventsByInterval(C.int(100000000), &pKindlingEvents[0], (unsafe.Pointer)(&count)))
for i := 0; i < evt_count; i++ {
event := convertEvent((*CKindlingEventForGo)(pKindlingEvents[i]))
r.eventChannel <- event
r.stats.add(event.Name, 1)
}
}
}
}


func (r *CgoReceiver) consumeEvents() {
r.shutdownWG.Add(1)
for {
Expand Down
4 changes: 3 additions & 1 deletion probe/src/cgo/cgo_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

int runForGo() { return init_probe(); }

int getKindlingEvent(void** kindlingEvent) { return getEvent(kindlingEvent); }
int getEventsByInterval(int interval, void** kindlingEvent, void* count) {
return get_events_by_interval((uint64_t)interval, kindlingEvent, count);
}

int startProfile() { return start_profile(); }
int stopProfile() { return stop_profile(); }
Expand Down
2 changes: 1 addition & 1 deletion probe/src/cgo/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
extern "C" {
#endif
int runForGo();
int getKindlingEvent(void** kindlingEvent);
int getEventsByInterval(int interval, void** kindlingEvent, void* count);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void* params);
int startProfile();
Expand Down
27 changes: 26 additions & 1 deletion probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ char* duration_char = new char[32];
char* span_char = new char[1024];

int16_t event_filters[1024][16];
uint64_t receiver_ts = 0;

void init_sub_label() {
for (auto e : kindling_to_sysdig) {
Expand Down Expand Up @@ -89,6 +90,24 @@ void sub_event(char* eventName, char* category, event_params_for_subscribe param
}
}

#define MAX_EVENTS 998
int get_events_by_interval(uint64_t interval, void** kindlingEvents, void* count) {
uint64_t tmp_ts = receiver_ts;
int i = 0;
while (true) {
if (getEvent(interval, &kindlingEvents[i], (int*)count) == 1){
if(i >= MAX_EVENTS) {
break;
}
i++;
}
if (tmp_ts != receiver_ts) {
break;
}
}
return i;
}

void suppress_events_comm(string comm) {
printCurrentTime();
cout << "suppress_events for process " << comm << endl;
Expand Down Expand Up @@ -161,7 +180,7 @@ int init_probe() {
return 0;
}

int getEvent(void** pp_kindling_event) {
int getEvent(uint64_t interval, void** pp_kindling_event, int* event_count) {
int32_t res;
sinsp_evt* ev;
res = inspector->next(&ev);
Expand Down Expand Up @@ -192,6 +211,7 @@ int getEvent(void** pp_kindling_event) {
}
uint16_t kindling_category = get_kindling_category(ev);
uint16_t ev_type = ev->get_type();
int evtcnt = *event_count;

print_event(ev);
if (ev_type != PPME_CPU_ANALYSIS_E && is_profile_debug && threadInfo->m_tid == debug_tid &&
Expand Down Expand Up @@ -438,6 +458,11 @@ int getEvent(void** pp_kindling_event) {
}
strcpy(p_kindling_event->context.tinfo.comm, tmp_comm);
strcpy(p_kindling_event->context.tinfo.containerId, (char*)threadInfo->m_container_id.data());
evtcnt++;
*event_count = evtcnt;
if (ev->get_ts() - receiver_ts >= interval || *event_count > 998) {
receiver_ts = ev->get_ts();
}
return 1;
}

Expand Down
4 changes: 3 additions & 1 deletion probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ void stop_perf();

void exipre_window_cache();

int getEvent(void** kindlingEvent);
int getEvent(uint64_t interval, void** kindlingEvent, int* event_count);

int get_events_by_interval(uint64_t interval, void** kindlingEvent, void* count);

uint16_t get_kindling_category(sinsp_evt* sEvt);

Expand Down

0 comments on commit d937235

Please sign in to comment.