Skip to content

Commit

Permalink
build in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
renukamanavalan committed Jun 7, 2022
1 parent 99fa2c3 commit c903f02
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 59 deletions.
41 changes: 41 additions & 0 deletions src/sonic-eventd/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
RM := rm -rf
EVENTD_TARGET := eventd
CP := cp
MKDIR := mkdir
CC := g++
MV := mv
LIBS := -levent -lhiredis -lswsscommon -pthread -lboost_thread -lboost_system
CFLAGS += -Wall -std=c++17 -fPIE -I$(PWD)/../sonic-swss-common/common
PWD := $(shell pwd)

ifneq ($(MAKECMDGOALS),clean)
ifneq ($(strip $(C_DEPS)),)
-include $(C_DEPS) $(OBJS)
endif
endif

-include src/subdir.mk

all: sonic-eventd

sonic-eventd: $(OBJS)
@echo 'Building target: $@'
@echo 'Invoking: G++ Linker'
$(CC) $(LDFLAGS) -o $(EVENTD_TARGET) $(OBJS) $(LIBS)
@echo 'Finished building target: $@'
@echo ' '

install:
$(MKDIR) -p $(DESTDIR)/usr/sbin
$(MV) $(EVENTD_TARGET) $(DESTDIR)/usr/sbin

deinstall:
$(RM) $(DESTDIR)/usr/sbin/$(EVENTD_TARGET)
$(RM) -rf $(DESTDIR)/usr/sbin

clean:
-@echo ' '

.PHONY: all clean dependents


2 changes: 0 additions & 2 deletions src/sonic-eventd/Makefile.am

This file was deleted.

17 changes: 0 additions & 17 deletions src/sonic-eventd/eventd/Makefile.am

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,31 @@
/* Count of elements returned in each read */
#define READ_SET_SIZE 100

#define VEC_SIZE(p) ((int)p.size())

int
eventd_proxy::init()
{
int ret = -1;
int ret = -1, rc = 0;
SWSS_LOG_INFO("Start xpub/xsub proxy");

m_frontend = zmq_socket(m_ctx, ZMQ_XSUB);
RET_ON_ERR(m_frontend != NULL, "failing to get ZMQ_XSUB socket");

int rc = zmq_bind(m_frontend, get_config(XSUB_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind XSUB to %s", get_config(XSUB_END_KEY));
rc = zmq_bind(m_frontend, get_config(string(XSUB_END_KEY)).c_str());
RET_ON_ERR(rc == 0, "Failing to bind XSUB to %s", get_config(string(XSUB_END_KEY)).c_str());

m_backend = zmq_socket(m_ctx, ZMQ_XPUB);
RET_ON_ERR(m_backend != NULL, "failing to get ZMQ_XPUB socket");

rc = zmq_bind(m_backend, get_config(XPUB_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind XPUB to %s", get_config(XPUB_END_KEY));
rc = zmq_bind(m_backend, get_config(string(XPUB_END_KEY)).c_str());
RET_ON_ERR(rc == 0, "Failing to bind XPUB to %s", get_config(string(XPUB_END_KEY)).c_str());

m_capture = zmq_socket(m_ctx, ZMQ_PUB);
RET_ON_ERR(m_capture != NULL, "failing to get ZMQ_PUB socket for capture");

rc = zmq_bind(m_capture, get_config(CAPTURE_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind capture PUB to %s", get_config(CAPTURE_END_KEY));
rc = zmq_bind(m_capture, get_config(string(CAPTURE_END_KEY)).c_str());
RET_ON_ERR(rc == 0, "Failing to bind capture PUB to %s", get_config(string(CAPTURE_END_KEY)).c_str());

m_thr = thread(&eventd_proxy::run, this);
ret = 0;
Expand All @@ -58,8 +60,6 @@ eventd_proxy::run()

/* runs forever until zmq context is terminated */
zmq_proxy(m_frontend, m_backend, m_capture);

return 0;
}


Expand All @@ -68,6 +68,7 @@ capture_service::~capture_service()
stop_capture();
}

void
capture_service::stop_capture()
{
if (m_socket != NULL) {
Expand All @@ -80,7 +81,7 @@ capture_service::stop_capture()
}

static bool
validate_event(const internal_event_t &event, string &rid, string &seq)
validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq)
{
bool ret = false;

Expand All @@ -90,26 +91,25 @@ validate_event(const internal_event_t &event, string &rid, string &seq)
itc_e = event.find(EVENT_STR_DATA);

if ((itc_r != event.end()) && (itc_s != event.end()) && (itc_e != event.end())) {
invalid_evt = true;

ret = true;
rid = itc_r->second;
seq = itc_s->second;
seq = str_to_seq(itc_s->second);
}
else {
SWSS_LOG_ERROR("Invalid evt: %s", map_to_str(event).str());
SWSS_LOG_ERROR("Invalid evt: %s", map_to_str(event).c_str());
}

return ret;
}


void
capture_service::init_capture_cache(events_data_lst_t &lst)
capture_service::init_capture_cache(const events_data_lst_t &lst)
{
/* clean any pre-existing cache */
int ret = -1;
int i;
string rid, seq;
runtime_id_t rid;
sequence_t seq;

/*
* Reserve a MAX_PUBLISHERS_COUNT entries for last events, as we use it only
Expand All @@ -121,13 +121,14 @@ capture_service::init_capture_cache(events_data_lst_t &lst)
m_last_events[to_string(i)] = "";
}

/* Cache last
for (events_data_lst_t::it = lst.begin(); it != lst.end(); ++it) {
/* Cache last events -- as only the last instance */
/* This is required to compute missed count */
for (events_data_lst_t::const_iterator itc = lst.begin(); itc != lst.end(); ++itc) {
internal_event_t event;

if (deserialize(*itc, event) == 0) {
if (validate_event(event, rid, seq)) {
m_pre_exist_id[event[EVENT_RUNTIME_ID]] = rid;
m_pre_exist_id[rid] = seq;
m_events.push_back(*itc);
}
}
Expand All @@ -142,12 +143,11 @@ void
capture_service::do_capture()
{
/* clean any pre-existing cache */
int ret = -1;
int i;
string rid, seq;
runtime_id_t rid;
sequence_t seq;

/* Check read events against provided cache for 2 seconds to skip */
chrono::steady_clock::timepoint start = chrono::steady_clock::now();
chrono::steady_clock::time_point start = chrono::steady_clock::now();
while(!m_pre_exist_id.empty()) {
internal_event_t event;
string source, evt_str;
Expand All @@ -158,10 +158,9 @@ capture_service::do_capture()
if (validate_event(event, rid, seq)) {

serialize(event, evt_str);
m_pre_exist_id_t::iterator it = m_pre_exist_id.find(rid);
pre_exist_id_t::iterator it = m_pre_exist_id.find(rid);

if (it != m_pre_exist_id.end()) {
seq = events_base::str_to_seq(seq);
if (seq > it->second) {
m_events.push_back(evt_str);
}
Expand All @@ -176,7 +175,7 @@ capture_service::do_capture()
pre_exist_id_t().swap(m_pre_exist_id);

/* Save until max allowed */
while(m_events.size() < m_cache_max) {
while(VEC_SIZE(m_events) < m_cache_max) {
internal_event_t event;
string source, evt_str;

Expand All @@ -193,7 +192,7 @@ capture_service::do_capture()
stringstream ss;
ss << e.what();
SWSS_LOG_ERROR("Cache save event failed with %s events:size=%d",
ss.str().c_str(), m_events.size());
ss.str().c_str(), VEC_SIZE(m_events));
break;
}
}
Expand All @@ -218,13 +217,13 @@ capture_service::do_capture()
* Capture stop will close the socket which fail the read
* and hence bail out.
*/
return 0;
}

int
capture_service::set_control(capture_control_t ctrl, events_data_lst_t *lst)
{
int ret = -1;
int rc;

/* Can go in single step only. */
RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d) > ctrl(%d)", m_ctrl, ctrl);
Expand All @@ -237,8 +236,8 @@ capture_service::set_control(capture_control_t ctrl, events_data_lst_t *lst)
sock = zmq_socket(m_ctx, ZMQ_SUB);
RET_ON_ERR(sock != NULL, "failing to get ZMQ_SUB socket");

rc = zmq_connect(sock, get_config(CAPTURE_END_KEY));
RET_ON_ERR(rc == 0, "Failing to bind capture SUB to %s", get_config(CAPTURE_END_KEY));
rc = zmq_connect(sock, get_config(string(CAPTURE_END_KEY)).c_str());
RET_ON_ERR(rc == 0, "Failing to bind capture SUB to %s", get_config(string(CAPTURE_END_KEY)).c_str());

rc = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
RET_ON_ERR(rc == 0, "Failing to ZMQ_SUBSCRIBE");
Expand Down Expand Up @@ -303,6 +302,7 @@ capture_service::read_cache(events_data_lst_t &lst_fifo,
void
run_eventd_service()
{
int cache_max;
event_service service;
eventd_proxy *proxy = NULL;
capture_service *capture = NULL;
Expand All @@ -315,7 +315,7 @@ run_eventd_service()
void *zctx = zmq_ctx_new();
RET_ON_ERR(ctx != NULL, "Failed to get zmq ctx");

cache_max = get_config_data(CACHE_MAX_CNT, (uint32_t)MAX_CACHE_SIZE);
cache_max = get_config_data(string(CACHE_MAX_CNT), (int)MAX_CACHE_SIZE));
RET_ON_ERR(cache_max > 0, "Failed to get CACHE_MAX_CNT");

proxy = new eventd_proxy(zctx);
Expand Down Expand Up @@ -389,20 +389,22 @@ run_eventd_service()
last_events_t().swap(capture_last_events);
}

int sz = capture_fifo_events.size() < READ_SET_SIZE ?
capture_fifo_events.size() : READ_SET_SIZE;
{
int sz = VEC_SIZE(capture_fifo_events) < READ_SET_SIZE ?
VEC_SIZE(capture_fifo_events) : READ_SET_SIZE;

if (sz != 0) {
auto it = std::next(capture_fifo_events.begin(), sz);
move(capture_fifo_events.begin(), capture_fifo_events.end(),
back_inserter(resp_data));

if (sz == capture_fifo_events.size()) {
if (sz == VEC_SIZE(capture_fifo_events)) {
events_data_lst_t().swap(capture_fifo_events);
} else {
events.erase(capture_fifo_events.begin(), it);
}
}
}
break;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class eventd_proxy
zmq_close(m_backend);
zmq_close(m_capture);

if (m_thr.joinable)
if (m_thr.joinable())
m_thr.join();
}

int init();

private:
int run();
void run();

void *m_ctx;
void *m_frontend;
Expand Down Expand Up @@ -96,7 +96,7 @@ class capture_service
{
public:
capture_service(void *ctx, int cache_max) : m_ctx(ctx), m_socket(NULL),
m_ctl(NEED_INIT), m_status(0), m_cache_max(cache_max)
m_ctrl(NEED_INIT), m_cache_max(cache_max)
{}

~capture_service();
Expand All @@ -107,7 +107,7 @@ class capture_service
last_events_t &lst_last);

private:
void init_capture_cache(events_data_lst_t &lst);
void init_capture_cache(const events_data_lst_t &lst);
void do_capture();

void stop_capture();
Expand All @@ -117,7 +117,7 @@ class capture_service
capture_control_t m_ctrl;
thread m_thr;

uint32_t m_cache_max;
int m_cache_max;

events_data_lst_t m_events;

Expand Down
12 changes: 12 additions & 0 deletions src/sonic-eventd/src/subdir.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CC := g++

OBJS += ./src/eventd.o

C_DEPS += ./src/eventd.d

src/%.o: src/%.cpp
@echo 'Building file: $<'
@echo 'Invoking: GCC C++ Compiler'
$(CC) -D__FILENAME__="$(subst src/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
@echo 'Finished building: $<'
@echo ' '

0 comments on commit c903f02

Please sign in to comment.