Skip to content

Commit

Permalink
No logical chenges; little re-order
Browse files Browse the repository at this point in the history
  • Loading branch information
renukamanavalan committed Jun 20, 2022
1 parent fea1de6 commit ec671c0
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 88 deletions.
115 changes: 67 additions & 48 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

#define VEC_SIZE(p) ((int)p.size())

extern int zerrno;

int
eventd_proxy::init()
{
Expand Down Expand Up @@ -137,15 +135,26 @@ void
capture_service::do_capture()
{
int rc;
runtime_id_t rid;
sequence_t seq;
int block_ms=300;
internal_event_t event;
string source, evt_str;
int init_cnt;

void *sock = NULL;
sock = zmq_socket(m_ctx, ZMQ_SUB);
typedef enum {
/*
* In this state every event read is compared with init cache given
* Only new events are saved.
*/
CAP_STATE_INIT = 0,

/* In this state, all events read are cached until max limit */
CAP_STATE_ACTIVE,

/* Cache has hit max. Hence only save last event for each runime ID */
CAP_STATE_LAST
} cap_state_t;

cap_state_t cap_state = CAP_STATE_INIT;

void *sock = zmq_socket(m_ctx, ZMQ_SUB);
RET_ON_ERR(sock != NULL, "failing to get ZMQ_SUB socket");

rc = zmq_connect(sock, get_config(string(CAPTURE_END_KEY)).c_str());
Expand All @@ -165,11 +174,6 @@ capture_service::do_capture()
}

/*
* Check read events against provided cache until as many events are read.
* to avoid sending duplicates.
* After starting cache service, the caller drains his local cache, which
* could potentially return a new event, that both caller & cache service reads.
*
* The cache service connects but defers any reading until caller provides
* the startup cache. But all events that arrived since connect, though not read
* will be held by ZMQ in its local cache.
Expand All @@ -183,71 +187,86 @@ capture_service::do_capture()
* is empty, do this check.
*/
init_cnt = (int)m_events.size();
while((m_ctrl == START_CAPTURE) && !m_pre_exist_id.empty() && (init_cnt > 0)) {

if (zmq_message_read(sock, 0, source, event) == -1) {
RET_ON_ERR(zerrno == EAGAIN,
"0:Failed to read from capture socket");
/* Read until STOP_CAPTURE */
while(m_ctrl == START_CAPTURE) {
runtime_id_t rid;
sequence_t seq;
internal_event_t event;
string source, evt_str;

if ((rc = zmq_message_read(sock, 0, source, event)) != 0) {
/*
* The capture socket captures SUBSCRIBE requests too.
* The messge could contain subscribe filter strings and binary code.
* Empty string with binary code will fail to deserialize.
* Else would fail event validation.
*/
RET_ON_ERR((rc == EAGAIN) || (rc == ERR_MESSAGE_INVALID),
"0:Failed to read from capture socket");
continue;
}
if (!validate_event(event, rid, seq)) {
continue;
}
else if (validate_event(event, rid, seq)) {
serialize(event, evt_str);

switch(cap_state) {
case CAP_STATE_INIT:
{
bool add = true;
init_cnt--;
serialize(event, evt_str);
pre_exist_id_t::iterator it = m_pre_exist_id.find(rid);

if (it != m_pre_exist_id.end()) {
if (seq <= it->second) {
/* Duplicate; Later/same seq in cache. */
add = false;
}
if (seq >= it->second) {
/* new one; This runtime ID need not be checked again */
m_pre_exist_id.erase(it);
}
}
if (add) {
m_events.push_back(evt_str);
}
}
}
pre_exist_id_t().swap(m_pre_exist_id);

/* Save until max allowed */
while((m_ctrl == START_CAPTURE) && (VEC_SIZE(m_events) < m_cache_max)) {
}
if(m_pre_exist_id.empty() || (init_cnt <= 0)) {
/* Init check is no more needed. */
pre_exist_id_t().swap(m_pre_exist_id);
cap_state = CAP_STATE_ACTIVE;
}
break;

if (zmq_message_read(sock, 0, source, event) == -1) {
RET_ON_ERR(zerrno == EAGAIN,
"1: Failed to read from capture socket");
}
else if (validate_event(event, rid, seq)) {
serialize(event, evt_str);
case CAP_STATE_ACTIVE:
/* Save until max allowed */
try
{
m_events.push_back(evt_str);
if (VEC_SIZE(m_events) >= m_cache_max) {
cap_state = CAP_STATE_LAST;
}
break;
}
catch (exception& e)
catch (bad_alloc& e)
{
stringstream ss;
ss << e.what();
SWSS_LOG_ERROR("Cache save event failed with %s events:size=%d",
ss.str().c_str(), VEC_SIZE(m_events));
break;
cap_state = CAP_STATE_LAST;
// fall through to save this event in last set.
}
}
}

/* Clear the map, created to ensure memory space available */
m_last_events.clear();
m_last_events_init = true;

/* Save only last event per sender */
while(m_ctrl == START_CAPTURE) {

if (zmq_message_read(sock, 0, source, event) == -1) {
RET_ON_ERR(zerrno == EAGAIN,
"2:Failed to read from capture socket");

} else if (validate_event(event, rid, seq)) {
serialize(event, evt_str);
case CAP_STATE_LAST:
if (!m_last_events_init) {
/* Clear the map, created to ensure memory space available */
m_last_events.clear();
m_last_events_init = true;
}
m_last_events[rid] = evt_str;
break;
}
}

Expand Down
81 changes: 45 additions & 36 deletions src/sonic-eventd/tests/eventd_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,35 +172,13 @@ void run_cap(void *zctx, bool &term, string &read_source,
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));

while(!term) {
int rc;
#if 0
/*
* Don't call zmq_message_read as that is not thread safe
* Subscriber thread is already calling.
*/
zmq_msg_t source, data;
zmq_msg_init(&source);
zmq_msg_init(&data);
rc = zmq_msg_recv(&source, mock_cap, 0);
if (rc != -1) {
rc = zmq_msg_recv(&data, mock_cap, 0);
}
#else
/*
* Intending to make it thread safe.
* Fix, if test fails. Else it is already good.
*/
{
string source;
internal_event_t ev_int;
rc = zmq_message_read(mock_cap, 0, source, ev_int);
}
#endif
if (rc != -1) {
string source;
internal_event_t ev_int;

if (0 == zmq_message_read(mock_cap, 0, source, ev_int)) {
cnt = ++i;
}
}

zmq_close(mock_cap);
}

Expand Down Expand Up @@ -321,6 +299,10 @@ TEST(eventd, proxy)

zmq_close(mock_pub);
zmq_ctx_term(zctx);

/* Provide time for async proxy removal to complete */
this_thread::sleep_for(chrono::milliseconds(200));

printf("eventd_proxy is tested GOOD\n");
}

Expand Down Expand Up @@ -362,10 +344,6 @@ TEST(eventd, capture)
EXPECT_EQ(0, pxy->init());

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
/*
* Block sub from calling zmq_message_read as capture service is calling
* and zmq_message_read crashes on access from more than one thread.
*/
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Create capture service */
Expand All @@ -374,6 +352,9 @@ TEST(eventd, capture)
/* Expect START_CAPTURE */
EXPECT_EQ(-1, pcap->set_control(STOP_CAPTURE));

/* Initialize the capture */
EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE));

EXPECT_TRUE(init_cache > 1);
EXPECT_TRUE((cache_max+3) < ARRAY_SIZE(ldata));

Expand Down Expand Up @@ -409,7 +390,6 @@ TEST(eventd, capture)
}
}

EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE));
EXPECT_EQ(0, pcap->set_control(START_CAPTURE, &evts_start));

/* Init pub connection */
Expand All @@ -422,7 +402,7 @@ TEST(eventd, capture)
run_pub(mock_pub, wr_source, wr_evts);

/* Provide time for async message receive. */
this_thread::sleep_for(chrono::milliseconds(100));
this_thread::sleep_for(chrono::milliseconds(200));

/* Stop capture, closes socket & terminates the thread */
EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE));
Expand All @@ -433,7 +413,21 @@ TEST(eventd, capture)
/* Read the cache */
EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read));

#ifdef DEBUG_TEST
if ((evts_read.size() != evts_expect.size()) ||
(last_evts_read.size() != last_evts_exp.size())) {
printf("size: sub_evts_sz=%d sub_evts=%d\n", sub_evts_sz, (int)sub_evts.size());
printf("init_cache=%d cache_max=%d\n", init_cache, cache_max);
printf("evts_start=%d evts_expect=%d evts_read=%d\n",
(int)evts_start.size(), (int)evts_expect.size(), (int)evts_read.size());
printf("last_evts_exp=%d last_evts_read=%d\n", (int)last_evts_exp.size(),
(int)last_evts_read.size());
}
#endif

EXPECT_EQ(evts_read.size(), evts_expect.size());
EXPECT_EQ(evts_read, evts_expect);
EXPECT_EQ(last_evts_read.size(), last_evts_exp.size());
EXPECT_EQ(last_evts_read, last_evts_exp);

delete pxy;
Expand All @@ -446,6 +440,10 @@ TEST(eventd, capture)

zmq_close(mock_pub);
zmq_ctx_term(zctx);

/* Provide time for async proxy removal to complete */
this_thread::sleep_for(chrono::milliseconds(200));

printf("Capture TEST completed\n");
}

Expand Down Expand Up @@ -486,10 +484,6 @@ TEST(eventd, captureCacheMax)
EXPECT_EQ(0, pxy->init());

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
/*
* Block sub from calling zmq_message_read as capture service is calling
* and zmq_message_read crashes on access from more than one thread.
*/
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Create capture service */
Expand Down Expand Up @@ -551,6 +545,17 @@ TEST(eventd, captureCacheMax)
/* Read the cache */
EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read));

#ifdef DEBUG_TEST
if ((evts_read.size() != evts_expect.size()) ||
!last_evts_read.empty()) {
printf("size: sub_evts_sz=%d sub_evts=%d\n", sub_evts_sz, (int)sub_evts.size());
printf("init_cache=%d cache_max=%d\n", init_cache, cache_max);
printf("evts_start=%d evts_expect=%d evts_read=%d\n",
(int)evts_start.size(), (int)evts_expect.size(), (int)evts_read.size());
printf("last_evts_read=%d\n", (int)last_evts_read.size());
}
#endif

EXPECT_EQ(evts_read, evts_expect);
EXPECT_TRUE(last_evts_read.empty());

Expand All @@ -564,6 +569,10 @@ TEST(eventd, captureCacheMax)

zmq_close(mock_pub);
zmq_ctx_term(zctx);

/* Provide time for async proxy removal to complete */
this_thread::sleep_for(chrono::milliseconds(200));

printf("Capture TEST with matchinhg cache-max completed\n");
}

Expand Down
8 changes: 4 additions & 4 deletions src/sonic-eventd/tools/events_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ do_receive(const event_subscribe_sources_t filter, const string outfile, int cnt
int missed_cnt=-1;

int rc = event_receive(h, key, params, missed_cnt);
if (rc == -1) {
ASSERT(event_last_error() == EAGAIN, "Failed to receive rc=%d err=%d\n",
rc, event_last_error());
if (rc != 0) {
ASSERT(rc == EAGAIN, "Failed to receive rc=%d index=%d\n",
rc, index);
continue;
}
ASSERT(!key.empty(), "received EMPTY key");
Expand Down Expand Up @@ -253,7 +253,7 @@ do_send(const string infile, int cnt, int pause)
}

int rc = event_publish(h, evt.tag, evt.params.empty() ? NULL : &evt.params);
ASSERT(rc == 0, "Failed to publish index=%d", index);
ASSERT(rc == 0, "Failed to publish index=%d rc=%d", index, rc);

if ((cnt > 0) && (--cnt == 0)) {
/* set to termninate */
Expand Down

0 comments on commit ec671c0

Please sign in to comment.