Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch event retrieval in cgo for performance enhancement #560

Merged
merged 6 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Expand the histogram bucket of otelexpoerter (Add 1500ms). ([#563](https://github.com/KindlingProject/kindling/pull/563))
- Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562))
- Optimized the `networkanalyzer` component of the probe analyzer by utilizing Go's goroutines, enabling concurrent execution. ([#558](https://github.com/KindlingProject/kindling/pull/558))
- Improved event processing efficiency with batch event retrieval in cgo. ([#560](https://github.com/KindlingProject/kindling/pull/560))

### Bug fixes
- Enhance dns with udp which is out of order.([#565](https://github.com/KindlingProject/kindling/pull/565))
Expand Down
2 changes: 1 addition & 1 deletion collector/pkg/component/receiver/cgoreceiver/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
extern "C" {
#endif
int runForGo();
int getKindlingEvent(void **kindlingEvent);
int getEventsByInterval(int interval, void** kindlingEvent, void* count);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void *params);
int startProfile();
Expand Down
40 changes: 22 additions & 18 deletions collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,34 @@ func (r *CgoReceiver) Start() error {
// Wait for the C routine running
time.Sleep(2 * time.Second)
go r.consumeEvents()
go r.startGetEvent()
go r.startGetEvents()
go r.getCaptureStatisticsByInterval(15 * time.Second)

return nil
}

func (r *CgoReceiver) startGetEvent() {
var pKindlingEvent unsafe.Pointer
r.shutdownWG.Add(1)
for {
select {
case <-r.stopCh:
r.shutdownWG.Done()
return
default:
res := int(C.getKindlingEvent(&pKindlingEvent))
if res == 1 {
event := convertEvent((*CKindlingEventForGo)(pKindlingEvent))
r.eventChannel <- event
r.stats.add(event.Name, 1)
}
}
}
func (r *CgoReceiver) startGetEvents() {
var count = 0
var pKindlingEvents [1000]unsafe.Pointer
r.shutdownWG.Add(1)

for {
select {
case <-r.stopCh:
r.shutdownWG.Done()
return
default:
evt_count := int(C.getEventsByInterval(C.int(100000000), &pKindlingEvents[0], (unsafe.Pointer)(&count)))
for i := 0; i < evt_count; i++ {
event := convertEvent((*CKindlingEventForGo)(pKindlingEvents[i]))
r.eventChannel <- event
r.stats.add(event.Name, 1)
}
}
}
}


func (r *CgoReceiver) consumeEvents() {
r.shutdownWG.Add(1)
for {
Expand Down
4 changes: 3 additions & 1 deletion probe/src/cgo/cgo_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

int runForGo() { return init_probe(); }

int getKindlingEvent(void** kindlingEvent) { return getEvent(kindlingEvent); }
int getEventsByInterval(int interval, void** kindlingEvent, void* count) {
return get_events_by_interval((uint64_t)interval, kindlingEvent, count);
}

int startProfile() { return start_profile(); }
int stopProfile() { return stop_profile(); }
Expand Down
2 changes: 1 addition & 1 deletion probe/src/cgo/cgo_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
extern "C" {
#endif
int runForGo();
int getKindlingEvent(void** kindlingEvent);
int getEventsByInterval(int interval, void** kindlingEvent, void* count);
void suppressEventsCommForGo(char *comm);
void subEventForGo(char* eventName, char* category, void* params);
int startProfile();
Expand Down
27 changes: 26 additions & 1 deletion probe/src/cgo/kindling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ char* duration_char = new char[32];
char* span_char = new char[1024];

int16_t event_filters[1024][16];
uint64_t receiver_ts = 0;

void init_sub_label() {
for (auto e : kindling_to_sysdig) {
Expand Down Expand Up @@ -89,6 +90,24 @@ void sub_event(char* eventName, char* category, event_params_for_subscribe param
}
}

#define MAX_EVENTS 998
int get_events_by_interval(uint64_t interval, void** kindlingEvents, void* count) {
uint64_t tmp_ts = receiver_ts;
int i = 0;
while (true) {
if (getEvent(interval, &kindlingEvents[i], (int*)count) == 1){
if(i >= MAX_EVENTS) {
break;
}
i++;
}
if (tmp_ts != receiver_ts) {
break;
}
}
return i;
}

void suppress_events_comm(string comm) {
printCurrentTime();
cout << "suppress_events for process " << comm << endl;
Expand Down Expand Up @@ -161,7 +180,7 @@ int init_probe() {
return 0;
}

int getEvent(void** pp_kindling_event) {
int getEvent(uint64_t interval, void** pp_kindling_event, int* event_count) {
int32_t res;
sinsp_evt* ev;
res = inspector->next(&ev);
Expand Down Expand Up @@ -192,6 +211,7 @@ int getEvent(void** pp_kindling_event) {
}
uint16_t kindling_category = get_kindling_category(ev);
uint16_t ev_type = ev->get_type();
int evtcnt = *event_count;

print_event(ev);
if (ev_type != PPME_CPU_ANALYSIS_E && is_profile_debug && threadInfo->m_tid == debug_tid &&
Expand Down Expand Up @@ -438,6 +458,11 @@ int getEvent(void** pp_kindling_event) {
}
strcpy(p_kindling_event->context.tinfo.comm, tmp_comm);
strcpy(p_kindling_event->context.tinfo.containerId, (char*)threadInfo->m_container_id.data());
evtcnt++;
*event_count = evtcnt;
if (ev->get_ts() - receiver_ts >= interval || *event_count > 998) {
receiver_ts = ev->get_ts();
}
return 1;
}

Expand Down
4 changes: 3 additions & 1 deletion probe/src/cgo/kindling.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ void stop_perf();

void exipre_window_cache();

int getEvent(void** kindlingEvent);
int getEvent(uint64_t interval, void** kindlingEvent, int* event_count);

int get_events_by_interval(uint64_t interval, void** kindlingEvent, void* count);

uint16_t get_kindling_category(sinsp_evt* sEvt);

Expand Down