Skip to content

Commit

Permalink
upon self code read
Browse files Browse the repository at this point in the history
  • Loading branch information
renukamanavalan committed May 20, 2022
1 parent ff78343 commit 52a8a14
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
27 changes: 16 additions & 11 deletions src/sonic-eventd/eventd/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#define MAX_PUBLISHERS_COUNT 1000


eventd_server::eventd_server() : m_capture(NULL),
eventd_server::eventd_server() : m_capture(NULL)
{
m_ctx = zmq_ctx_new();
RET_ON_ERR(m_ctx != NULL, "Failed to get zmq ctx");
Expand Down Expand Up @@ -65,10 +65,10 @@ eventd_server::zproxy_service()
RET_ON_ERR(rc == 0, "Failing to bind XPUB to %s", get_config(XPUB_END_KEY));

void *capture = zmq_socket(m_ctx, ZMQ_PUB);
RET_ON_ERR(capture != NULL, "failing to get ZMQ_XSUB socket");
RET_ON_ERR(capture != NULL, "failing to get ZMQ_PUB socket for capture");

rc = zmq_bind(capture, get_config(CAPTURE_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind PAIR to %s", get_config(PAIR_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind capture PUB to %s", get_config((CAPTURE_END_KEY));

m_thread_proxy = thread(&eventd_server::zproxy_service_run, this, frontend,
backend, capture);
Expand Down Expand Up @@ -103,9 +103,6 @@ eventd_server::capture_events(events_data_lst_t &lst)
int ret = -1;
int i;

events_data_lst_t.swap(m_events);
last_events_t.swap(m_last_events);

/*
* Reserve a MAX_PUBLISHERS_COUNT entries for last events, as we use it only
* upon m_events/vector overflow, which might block adding new entries in map
Expand Down Expand Up @@ -164,7 +161,7 @@ eventd_server::capture_events(events_data_lst_t &lst)
while(m_events.size() < m_cache_max) {
string source, evt_str;

RET_ON_ERR(zmq_message_read(m_socket, ZMQ_DONTWAIT, source, evt_str) == 0,
RET_ON_ERR(zmq_message_read(m_socket, 0, source, evt_str) == 0,
"Failed to read from capture socket");
try
{
Expand All @@ -182,12 +179,11 @@ eventd_server::capture_events(events_data_lst_t &lst)


/* Save only last event per sender */
m_last_events.clear();
while(true) {
internal_event_t event;
string source, evt_str;

RET_ON_ERR(zmq_message_read(m_socket, ZMQ_DONTWAIT, source, evt_str) == 0,
RET_ON_ERR(zmq_message_read(m_socket, 0, source, evt_str) == 0,
"Failed to read from capture socket");

deserialize(evt_str, event);
Expand Down Expand Up @@ -227,10 +223,10 @@ eventd_server::eventd_service()
break;
}
m_capture = zmq_socket(m_ctx, ZMQ_SUB);
RET_ON_ERR(capture != NULL, "failing to get ZMQ_XSUB socket");
RET_ON_ERR(capture != NULL, "failing to get ZMQ_SUB socket");

rc = zmq_connect(capture, get_config(CAPTURE_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind PAIR to %s", get_config(PAIR_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind capture SUB to %s", get_config((CAPTURE_END_KEY));

rc = zmq_setsockopt(sub_read, ZMQ_SUBSCRIBE, "", 0);
RET_ON_ERR(rc == 0, "Failing to ZMQ_SUBSCRIBE");
Expand All @@ -244,6 +240,9 @@ eventd_server::eventd_service()
resp_code = -1;
break;
}
events_data_lst_t.swap(m_events);
last_events_t.swap(m_last_events);

/* Kick off the service */
m_thread_capture = thread(&eventd_server::capture_events, this, req_data);

Expand All @@ -253,6 +252,12 @@ eventd_server::eventd_service()

case EVENT_CACHE_STOP:
if (m_capture != NULL) {
/*
* Caller would have initiated SUBS channel.
* Read for CACHE_DRAIN_IN_MILLISECS to drain off cache
* before stopping.
*/
this_thread::sleep_for(chrono::milliseconds(CACHE_DRAIN_IN_MILLISECS));
close(m_capture);
m_capture = NULL;

Expand Down
15 changes: 7 additions & 8 deletions src/sonic-eventd/eventd/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/
#include "events_service.h"

typedef map<runtime_id_t, events_cache_type_t> last_events_t;
typedef map<runtime_id_t, events_data_type_t> last_events_t;

class eventd_server {
public:
Expand Down Expand Up @@ -68,17 +68,17 @@ class eventd_server {
*
* It keeps two sets of data
* 1) List of all events received in vector in same order as received
* 2) Map of last event from each runtime id
* 2) Map of last event from each runtime id upon list overflow max size.
*
* We add to the vector as much as allowed by vector and as well
* the available memory. When mem exhausts, just keep updating map
* with last event from that sender.
* We add to the vector as much as allowed by vector and max limit,
* whichever comes first.
*
* The sequence number in map will help assess the missed count.
* The sequence number in internal event will help assess the missed count
* by the consumer of the cache data.
*
* Thread is started upon creating SUB end of capture socket.
*/
int capture_events(events_data_lst_t &);
int capture_events(events_data_lst_t &lst);


private:
Expand All @@ -92,7 +92,6 @@ class eventd_server {

void *m_capture;


thread m_thread_proxy;
thread m_thread_capture;
};
Expand Down

0 comments on commit 52a8a14

Please sign in to comment.