diff --git a/CHANGELOG.md b/CHANGELOG.md index 96fe38e04..48e618d12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Expand the histogram bucket of otelexpoerter (Add 1500ms). ([#563](https://github.com/KindlingProject/kindling/pull/563)) - Set default values of `store_external_src_ip` and `StoreExternalSrcIP` to false to reduce occurrences of unexpected src IP data. ([#562](https://github.com/KindlingProject/kindling/pull/562)) - Optimized the `networkanalyzer` component of the probe analyzer by utilizing Go's goroutines, enabling concurrent execution. ([#558](https://github.com/KindlingProject/kindling/pull/558)) +- Improved event processing efficiency with batch event retrieval in cgo. ([#560](https://github.com/KindlingProject/kindling/pull/560)) ### Bug fixes - Enhance dns with udp which is out of order.([#565](https://github.com/KindlingProject/kindling/pull/565)) diff --git a/collector/pkg/component/receiver/cgoreceiver/cgo_func.h b/collector/pkg/component/receiver/cgoreceiver/cgo_func.h index 383762907..b32bcdaed 100644 --- a/collector/pkg/component/receiver/cgoreceiver/cgo_func.h +++ b/collector/pkg/component/receiver/cgoreceiver/cgo_func.h @@ -9,7 +9,7 @@ extern "C" { #endif int runForGo(); -int getKindlingEvent(void **kindlingEvent); +int getEventsByInterval(int interval, void** kindlingEvent, void* count); void suppressEventsCommForGo(char *comm); void subEventForGo(char* eventName, char* category, void *params); int startProfile(); diff --git a/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go b/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go index 3ae931aed..7f84e4325 100644 --- a/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go +++ b/collector/pkg/component/receiver/cgoreceiver/cgoreceiver.go @@ -75,30 +75,34 @@ func (r *CgoReceiver) Start() error { // Wait for the C routine running time.Sleep(2 * time.Second) go r.consumeEvents() - go r.startGetEvent() + go r.startGetEvents() go r.getCaptureStatisticsByInterval(15 * time.Second) + return nil } -func (r *CgoReceiver) startGetEvent() { - var pKindlingEvent unsafe.Pointer - r.shutdownWG.Add(1) - for { - select { - case <-r.stopCh: - r.shutdownWG.Done() - return - default: - res := int(C.getKindlingEvent(&pKindlingEvent)) - if res == 1 { - event := convertEvent((*CKindlingEventForGo)(pKindlingEvent)) - r.eventChannel <- event - r.stats.add(event.Name, 1) - } - } - } +func (r *CgoReceiver) startGetEvents() { + var count = 0 + var pKindlingEvents [1000]unsafe.Pointer + r.shutdownWG.Add(1) + + for { + select { + case <-r.stopCh: + r.shutdownWG.Done() + return + default: + evt_count := int(C.getEventsByInterval(C.int(100000000), &pKindlingEvents[0], (unsafe.Pointer)(&count))) + for i := 0; i < evt_count; i++ { + event := convertEvent((*CKindlingEventForGo)(pKindlingEvents[i])) + r.eventChannel <- event + r.stats.add(event.Name, 1) + } + } + } } + func (r *CgoReceiver) consumeEvents() { r.shutdownWG.Add(1) for { diff --git a/probe/src/cgo/cgo_func.cpp b/probe/src/cgo/cgo_func.cpp index b5f1480e1..de9e8ed45 100644 --- a/probe/src/cgo/cgo_func.cpp +++ b/probe/src/cgo/cgo_func.cpp @@ -8,7 +8,9 @@ int runForGo() { return init_probe(); } -int getKindlingEvent(void** kindlingEvent) { return getEvent(kindlingEvent); } +int getEventsByInterval(int interval, void** kindlingEvent, void* count) { + return get_events_by_interval((uint64_t)interval, kindlingEvent, count); +} int startProfile() { return start_profile(); } int stopProfile() { return stop_profile(); } diff --git a/probe/src/cgo/cgo_func.h b/probe/src/cgo/cgo_func.h index 93abe64dd..d9bee3d91 100644 --- a/probe/src/cgo/cgo_func.h +++ b/probe/src/cgo/cgo_func.h @@ -9,7 +9,7 @@ extern "C" { #endif int runForGo(); -int getKindlingEvent(void** kindlingEvent); +int getEventsByInterval(int interval, void** kindlingEvent, void* count); void suppressEventsCommForGo(char *comm); void subEventForGo(char* eventName, char* category, void* params); int startProfile(); diff --git a/probe/src/cgo/kindling.cpp b/probe/src/cgo/kindling.cpp index 63686e1fd..0dc361eb0 100644 --- a/probe/src/cgo/kindling.cpp +++ b/probe/src/cgo/kindling.cpp @@ -44,6 +44,7 @@ char* duration_char = new char[32]; char* span_char = new char[1024]; int16_t event_filters[1024][16]; +uint64_t receiver_ts = 0; void init_sub_label() { for (auto e : kindling_to_sysdig) { @@ -89,6 +90,24 @@ void sub_event(char* eventName, char* category, event_params_for_subscribe param } } +#define MAX_EVENTS 998 +int get_events_by_interval(uint64_t interval, void** kindlingEvents, void* count) { + uint64_t tmp_ts = receiver_ts; + int i = 0; + while (true) { + if (getEvent(interval, &kindlingEvents[i], (int*)count) == 1){ + if(i >= MAX_EVENTS) { + break; + } + i++; + } + if (tmp_ts != receiver_ts) { + break; + } + } + return i; +} + void suppress_events_comm(string comm) { printCurrentTime(); cout << "suppress_events for process " << comm << endl; @@ -161,7 +180,7 @@ int init_probe() { return 0; } -int getEvent(void** pp_kindling_event) { +int getEvent(uint64_t interval, void** pp_kindling_event, int* event_count) { int32_t res; sinsp_evt* ev; res = inspector->next(&ev); @@ -192,6 +211,7 @@ int getEvent(void** pp_kindling_event) { } uint16_t kindling_category = get_kindling_category(ev); uint16_t ev_type = ev->get_type(); + int evtcnt = *event_count; print_event(ev); if (ev_type != PPME_CPU_ANALYSIS_E && is_profile_debug && threadInfo->m_tid == debug_tid && @@ -438,6 +458,11 @@ int getEvent(void** pp_kindling_event) { } strcpy(p_kindling_event->context.tinfo.comm, tmp_comm); strcpy(p_kindling_event->context.tinfo.containerId, (char*)threadInfo->m_container_id.data()); + evtcnt++; + *event_count = evtcnt; + if (ev->get_ts() - receiver_ts >= interval || *event_count > 998) { + receiver_ts = ev->get_ts(); + } return 1; } diff --git a/probe/src/cgo/kindling.h b/probe/src/cgo/kindling.h index 8f9532639..97ddf5993 100644 --- a/probe/src/cgo/kindling.h +++ b/probe/src/cgo/kindling.h @@ -24,7 +24,9 @@ void stop_perf(); void exipre_window_cache(); -int getEvent(void** kindlingEvent); +int getEvent(uint64_t interval, void** kindlingEvent, int* event_count); + +int get_events_by_interval(uint64_t interval, void** kindlingEvent, void* count); uint16_t get_kindling_category(sinsp_evt* sEvt);