diff --git a/src/sonic-eventd/Makefile b/src/sonic-eventd/Makefile
index dca689929330..13b88dffc3cc 100644
--- a/src/sonic-eventd/Makefile
+++ b/src/sonic-eventd/Makefile
@@ -5,6 +5,9 @@ EVENTD_TOOL := tools/events_tool
 EVENTD_PUBLISH_TOOL := tools/events_publish_tool.py
 RSYSLOG-PLUGIN_TARGET := rsyslog_plugin/rsyslog_plugin
 RSYSLOG-PLUGIN_TEST := rsyslog_plugin_tests/tests
+EVENTD_MONIT := tools/events_monit_test.py
+EVENTD_MONIT_CONF := tools/monit_events
+
 CP := cp
 MKDIR := mkdir
 CC := g++
@@ -69,15 +72,20 @@ rsyslog-plugin-tests: $(RSYSLOG-PLUGIN-TEST_OBJS)
 
 install:
 	$(MKDIR) -p $(DESTDIR)/usr/sbin
+	$(MKDIR) -p $(DESTDIR)/usr/local/bin
+	$(MKDIR) -p $(DESTDIR)/etc/monit/conf.d
 	$(CP) $(EVENTD_TARGET) $(DESTDIR)/usr/sbin
 	$(CP) $(EVENTD_TOOL) $(DESTDIR)/usr/sbin
 	$(CP) $(EVENTD_PUBLISH_TOOL) $(DESTDIR)/usr/sbin
-	$(CP) $(RSYSLOG-PLUGIN_TARGET) $(DESTDIR)/usr/sbin
+	$(CP) $(RSYSLOG-PLUGIN_TARGET) $(DESTDIR)/usr/local/bin
+	$(CP) $(EVENTD_MONIT) $(DESTDIR)/usr/local/bin
+	$(CP) $(EVENTD_MONIT_CONF) $(DESTDIR)/etc/monit/conf.d
 
 deinstall:
 	$(RM) $(DESTDIR)/usr/sbin/$(EVENTD_TARGET)
 	$(RM) $(DESTDIR)/usr/sbin/$(RSYSLOG-PLUGIN_TARGET)
-	$(RM) -rf $(DESTDIR)/usr/sbin
+	$(RM) -rf $(DESTDIR)/usr
+	$(RM) -rf $(DESTDIR)/etc
 
 clean:
 	-@echo ' '
diff --git a/src/sonic-eventd/debian/sonic-rsyslog-plugin.install b/src/sonic-eventd/debian/sonic-rsyslog-plugin.install
index 77f0909abfd3..8b48d83dcf70 100644
--- a/src/sonic-eventd/debian/sonic-rsyslog-plugin.install
+++ b/src/sonic-eventd/debian/sonic-rsyslog-plugin.install
@@ -1 +1,3 @@
-usr/sbin/ryslog_plugin
+usr/local/bin/rsyslog_plugin
+usr/local/bin/events_monit_test.py
+etc/monit/conf.d/monit_events
diff --git a/src/sonic-eventd/tools/events_monit_test.py b/src/sonic-eventd/tools/events_monit_test.py
new file mode 100644
index 000000000000..ee89b885e646
--- /dev/null
+++ b/src/sonic-eventd/tools/events_monit_test.py
@@ -0,0 +1,235 @@
+#! /usr/bin/python3 -u
+
+from inspect import getframeinfo, stack
+from swsscommon.swsscommon import events_init_publisher, event_publish, FieldValueMap
+from swsscommon.swsscommon import event_receive_op_t, event_receive, events_init_subscriber
+from swsscommon.swsscommon import events_deinit_subscriber, events_deinit_publisher
+import argparse
+import os
+import threading
+import time
+import syslog
+import uuid
+
+chk_log_level = syslog.LOG_ERR
+
+test_source = "sonic-host"
+test_event_tag = "device-test-event"
+test_event_key = "{}:{}".format(test_source, test_event_tag)
+test_event_params = {
+    "sender": os.path.basename(__file__),
+    "reason": "monit periodic test",
+    "batch-id": str(uuid.uuid1()),
+    "index": "0"
+}
+
+# Async connection wait time in milliseconds.
+ASYNC_CONN_WAIT = 300
+
+
+# Thread results
+rc_test_receive = -1
+
+
+def _log_msg(lvl, pfx, msg):
+    if lvl <= chk_log_level:
+        caller = getframeinfo(stack()[2][0])
+        fmsg = "{}:{}:{}".format(caller.function, caller.lineno, msg)
+        print("{}: {}".format(pfx, fmsg))
+        syslog.syslog(lvl, fmsg)
+
+def log_err(m):
+    _log_msg(syslog.LOG_ERR, "Err", m)
+
+
+def log_info(m):
+    _log_msg(syslog.LOG_INFO, "Info", m)
+
+
+def log_debug(m):
+    _log_msg(syslog.LOG_DEBUG, "Debug", m)
+
+
+def map_dict_fvm(s, d):
+    for k, v in s.items():
+        d[k] = v
+
+
+# Invoked in a separate thread
+def test_receiver(event_obj, cnt):
+    global rc_test_receive
+
+    sh = events_init_subscriber()
+
+    # Sleep ASYNC_CONN_WAIT to ensure async connectivity is complete.
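+    # (Connection setup over ZMQ is asynchronous; as the comment in
+    # publish_events notes, events sent before the connections are up are
+    # silently dropped, so give the subscriber ASYNC_CONN_WAIT ms to settle.)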
+    time.sleep(ASYNC_CONN_WAIT/1000)
+
+    exp_params = dict(test_event_params)
+
+    event_obj.set()
+    cnt_done = 0
+
+    for i in range(cnt):
+        p = event_receive_op_t()
+        rc = event_receive(sh, p)
+
+        if (rc != 0):
+            log_err("Failed to receive. {}/{} rc={}".format(i, cnt, rc))
+            break
+
+        if test_event_key != p.key:
+            log_err("key mismatch {} != {} {}/{}".format(test_event_key,
+                p.key, i, cnt))
+            break
+
+        exp_params["index"] = str(i)
+        rcv_params = {}
+        map_dict_fvm(p.params, rcv_params)
+
+        for k, v in exp_params.items():
+            if k in rcv_params:
+                if (rcv_params[k] != v):
+                    log_err("key:{} exp:{} != rcv:{}".format(
+                        k, v, rcv_params[k]))
+                    rc = -1
+            else:
+                log_err("key:{} is missing".format(k))
+                rc = -1
+
+        if (rc != 0):
+            log_err("params mismatch {}/{}".format(i, cnt))
+            break
+
+        if p.missed_cnt != 0:
+            log_err("Expect missed_cnt {} == 0 {}/{}".format(p.missed_cnt, i, cnt))
+            break
+
+        if p.publish_epoch_ms == 0:
+            log_err("Expect publish_epoch_ms != 0 {}/{}".format(i, cnt))
+            break
+
+        cnt_done += 1
+        log_debug("Received {}/{}".format(i+1, cnt))
+
+    if (cnt_done == cnt):
+        rc_test_receive = 0
+    else:
+        log_err("test receive abort {}/{}".format(cnt_done, cnt))
+
+    # Wait a maximum of 5 seconds for the main thread to clear the event.
+    tout = 5000
+    while event_obj.is_set():
+        # main thread yet to consume last set event
+        if tout > 0:
+            t_sleep = 100
+            time.sleep(t_sleep / 1000)
+            tout -= t_sleep
+        else:
+            log_err("test_receiver:Internal err: event not cleared by main")
+            break
+
+    event_obj.set()
+
+    events_deinit_subscriber(sh)
+
+
+def publish_events(cnt):
+    rc = -1
+    ph = events_init_publisher(test_source)
+    if not ph:
+        log_err("Failed to get publisher handle")
+        return rc
+
+    # Sleep ASYNC_CONN_WAIT to ensure async connectivity is complete.
+    # Messages published before connection are silently dropped by ZMQ.
+    time.sleep(ASYNC_CONN_WAIT/1000)
+
+    pub_params = dict(test_event_params)
+
+    for i in range(cnt):
+        pd = FieldValueMap()
+        pub_params["index"] = str(i)
+        map_dict_fvm(pub_params, pd)
+
+        rc = event_publish(ph, test_event_tag, pd)
+        if (rc != 0):
+            log_err("Failed to publish. {}/{} rc={}".format(i, cnt, rc))
+            break
+        log_debug("published: {}/{}".format(i+1, cnt))
+
+    # Sleep ASYNC_CONN_WAIT to ensure the publish completes before closing the channel.
+    time.sleep(ASYNC_CONN_WAIT/1000)
+
+    events_deinit_publisher(ph)
+
+    log_debug("publish_events Done. cnt={}".format(cnt))
+
+    return rc
+
+
+def run_test(cnt):
+    global rc_test_receive
+
+    # Event object used to synchronise with the receiver thread.
+    event_sub = threading.Event()
+
+    # Start subscriber thread
+    thread_sub = threading.Thread(target=test_receiver, args=(event_sub, cnt))
+    thread_sub.start()
+
+    # The subscriber sleeps for ASYNC_CONN_WAIT; wait an additional 200ms
+    # for test_receiver to signal that it is ready.
+    event_sub.wait((ASYNC_CONN_WAIT + 200)/1000)
+    event_sub.clear()
+
+    rc_pub = publish_events(cnt)
+    if (rc_pub != 0):
+        log_err("Failed in publish_events")
+    else:
+        # Wait for subscriber to complete with 1 sec timeout.
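+        # (test_receiver sets the event again when it finishes, so this
+        # normally returns well before the 1 second timeout expires.)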
+        event_sub.wait(1)
+        if (rc_test_receive != 0):
+            log_err("Failed to receive events")
+
+    log_debug("run_test_DONE rc_pub={} rc_test_receive={}".format(
+        rc_pub, rc_test_receive))
+
+    if (rc_pub != 0):
+        return rc_pub
+
+    if (rc_test_receive != 0):
+        return rc_test_receive
+
+    return 0
+
+
+def main():
+    global chk_log_level
+
+    parser = argparse.ArgumentParser(
+        description="Check events from publish to receive via gNMI")
+    parser.add_argument('-l', "--loglvl", default=syslog.LOG_ERR, type=int,
+                        help="log level")
+    parser.add_argument('-n', "--cnt", default=5, type=int,
+                        help="count of events to publish/receive")
+    args = parser.parse_args()
+
+    chk_log_level = args.loglvl
+    rc = run_test(args.cnt)
+
+    if (rc == 0):
+        log_info("eventd test succeeded")
+    else:
+        log_err("eventd monit test failed rc={}".format(rc))
+
+
+if __name__ == "__main__":
+    main()
+
+
+
+
+
+
diff --git a/src/sonic-eventd/tools/monit_events b/src/sonic-eventd/tools/monit_events
new file mode 100644
index 000000000000..d48d8a382a8f
--- /dev/null
+++ b/src/sonic-eventd/tools/monit_events
@@ -0,0 +1,5 @@
+###############################################################################
+## Monit configuration for eventd
+###############################################################################
+check program container_eventd with path "/usr/local/bin/events_monit_test.py"
+    every 5 cycles
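
For reference, the publish/receive round trip that events_monit_test.py automates can also be reproduced by hand with the same swsscommon events API the script imports. The sketch below is illustrative only and is not part of this change; it assumes eventd is running on the device and reuses the script's source string and event tag, with an arbitrary "reason" value.

#! /usr/bin/python3 -u
# Minimal manual version of the round trip exercised by events_monit_test.py.
# Illustrative sketch only; not part of this change.
import time
from swsscommon.swsscommon import (events_init_publisher, event_publish,
                                   events_deinit_publisher, events_init_subscriber,
                                   event_receive, events_deinit_subscriber,
                                   event_receive_op_t, FieldValueMap)

sh = events_init_subscriber()              # subscribe before publishing
ph = events_init_publisher("sonic-host")   # same source string as the test
time.sleep(0.3)                            # let the async ZMQ connects settle

params = FieldValueMap()
params["reason"] = "manual test"           # arbitrary value, for illustration
rc = event_publish(ph, "device-test-event", params)
print("publish rc={}".format(rc))

op = event_receive_op_t()
rc = event_receive(sh, op)                 # fetch the event just published
print("receive rc={} key={} missed_cnt={}".format(rc, op.key, op.missed_cnt))

events_deinit_publisher(ph)
events_deinit_subscriber(sh)

With the monit configuration above, monit executes the check script every 5 polling cycles; the script reports the outcome through syslog (log_info on success, log_err on failure), so the result surfaces in the system log.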