Skip to content

Commit

Permalink
[ycabled] fix exception-handling logic for ycabled (sonic-net#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdahiya12 authored Nov 11, 2022
1 parent 905874d commit e8c5657
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 69 deletions.
84 changes: 61 additions & 23 deletions sonic-ycabled/tests/test_ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import sys
import time
import traceback

if sys.version_info >= (3, 3):
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -40,20 +41,20 @@ def test_ycable_info_helper_class_run(self, mocked_sleep):
with patch('ycable.ycable.platform_sfputil') as patched_util:
patched_util.logical.return_value = ['Ethernet0', 'Ethernet4']
patched_util.get_asic_id_for_logical_port.return_value = 0
Y_cable_state_task = YcableStateUpdateTask()
Y_cable_state_task.task_process = MagicMock()
Y_cable_state_task.task_stopping_event = MagicMock()
y_cable_presence = [True]
stopping_event = MagicMock()
sfp_error_event = MagicMock()
y_cable_presence = [True]
Y_cable_state_task.task_run(sfp_error_event, y_cable_presence)
Y_cable_state_task.task_stop()
Y_cable_task = YcableInfoUpdateTask()
Y_cable_state_task = YcableStateUpdateTask(sfp_error_event, y_cable_presence)
Y_cable_state_task.task_process = MagicMock()
Y_cable_state_task.task_stopping_event = MagicMock()
Y_cable_state_task.start()
Y_cable_state_task.join()
Y_cable_task = YcableInfoUpdateTask(y_cable_presence)
Y_cable_task.task_thread = MagicMock()
Y_cable_task.task_stopping_event = MagicMock()
Y_cable_task.task_stopping_event.is_set = MagicMock()
Y_cable_task.task_run(y_cable_presence)
Y_cable_task.task_stop()
Y_cable_task.start()
Y_cable_task.join()
Y_cable_state_task.task_stopping_event.return_value.is_set.return_value = True
#Y_cable_state_task.task_worker(stopping_event, sfp_error_event, y_cable_presence)
# For now just check if exception is thrown for UT purposes
Expand All @@ -67,19 +68,20 @@ def test_ycable_info_helper_class_run(self, mocked_sleep):
@patch("swsscommon.swsscommon.Select.select", MagicMock())
def test_ycable_helper_class_run_loop(self):
Y_cable_task = YCableTableUpdateTask()
Y_cable_cli_task = YCableCliUpdateTask()
Y_cable_task.task_stopping_event = MagicMock()
Y_cable_cli_task.task_stopping_event = MagicMock()
Y_cable_task.task_thread = MagicMock()
Y_cable_task.task_thread.start = MagicMock()
Y_cable_task.task_thread.join = MagicMock()
Y_cable_task.task_cli_thead = MagicMock()
Y_cable_task.task_cli_thead.start = MagicMock()
Y_cable_task.task_cli_thead.join = MagicMock()
#Y_cable_task.task_stopping_event.return_value.is_set.return_value = False
swsscommon.SubscriberStateTable.return_value.pop.return_value = (True, True, {"read_side": "2"})
Y_cable_task.task_worker()
Y_cable_task.task_cli_worker()
Y_cable_task.task_run()
Y_cable_task.task_stop()
Y_cable_task.start()
Y_cable_task.join()
Y_cable_cli_task.task_cli_worker()
Y_cable_cli_task.start()
Y_cable_cli_task.join()

@patch("swsscommon.swsscommon.Select", MagicMock())
@patch("swsscommon.swsscommon.Select.addSelectable", MagicMock())
Expand All @@ -89,14 +91,10 @@ def test_ycable_helper_class_run(self):
Y_cable_task.task_thread = MagicMock()
Y_cable_task.task_thread.start = MagicMock()
Y_cable_task.task_thread.join = MagicMock()
Y_cable_task.task_cli_thead = MagicMock()
Y_cable_task.task_cli_thead.start = MagicMock()
Y_cable_task.task_cli_thead.join = MagicMock()
Y_cable_task.task_stopping_event.return_value.is_set.return_value = True
Y_cable_task.task_worker()
Y_cable_task.task_cli_worker()
Y_cable_task.task_run()
Y_cable_task.task_stop()
Y_cable_task.start()
Y_cable_task.join()

def test_detect_port_in_error_status(self):

Expand Down Expand Up @@ -291,9 +289,7 @@ def test_DaemonYcable_init_deinit(self):
@patch('ycable.ycable.platform_sfputil', MagicMock())
@patch('ycable.ycable.DaemonYcable.load_platform_util', MagicMock())
@patch('ycable.ycable.YcableInfoUpdateTask', MagicMock())
@patch('ycable.ycable.YcableInfoUpdateTask.task_run', MagicMock())
@patch('ycable.ycable.YcableStateUpdateTask', MagicMock())
@patch('ycable.ycable.YcableStateUpdateTask.task_run', MagicMock())
@patch('ycable.ycable_utilities.y_cable_helper.init_ports_status_for_y_cable', MagicMock())
def test_DaemonYcable_init_deinit_full(self):
ycable = DaemonYcable(SYSLOG_IDENTIFIER)
Expand Down Expand Up @@ -327,3 +323,45 @@ def wait_until(total_wait_time, interval, call_back, *args, **kwargs):
time.sleep(interval)
wait_time += interval
return False


class TestYcableScriptException(object):

@patch("swsscommon.swsscommon.Select", MagicMock(side_effect=NotImplementedError))
@patch("swsscommon.swsscommon.Select.addSelectable", MagicMock(side_effect=NotImplementedError))
@patch("swsscommon.swsscommon.Select.select", MagicMock(side_effect=NotImplementedError))
def test_ycable_helper_class_run_loop_with_exception(self):



Y_cable_cli_task = YCableCliUpdateTask()
expected_exception_start = None
expected_exception_join = None
trace = None
try:
Y_cable_cli_task.start()
Y_cable_cli_task.task_cli_worker()
except Exception as e1:
expected_exception_start = e1
trace = traceback.format_exc()


try:
Y_cable_cli_task.join()
except Exception as e2:
expected_exception_join = e2

"""
#Handy debug Helpers or else use import logging
#f = open("newfile", "w")
#f.write(format(e2))
#f.write(format(m1))
#f.write(trace)
"""

assert(type(expected_exception_start) == type(expected_exception_join))
assert(expected_exception_start.args == expected_exception_join.args)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
assert("swsscommon.Select" in str(trace))

101 changes: 72 additions & 29 deletions sonic-ycabled/ycable/ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
"""

try:
import os
import signal
import sys
import time
import threading
import traceback

from enum import Enum
from sonic_py_common import daemon_base, device_info, logger
Expand Down Expand Up @@ -94,10 +96,14 @@ def handle_state_update_task(port, fvp_dict, y_cable_presence, stopping_event):
# Thread wrapper class to update ycable info periodically


class YcableInfoUpdateTask(object):
def __init__(self):
self.task_thread = None

class YcableInfoUpdateTask(threading.Thread):

def __init__(self, y_cable_presence):
threading.Thread.__init__(self)
self.exc = None
self.task_stopping_event = threading.Event()
self.y_cable_presence = y_cable_presence
self.table_helper = y_cable_table_helper.YcableInfoUpdateTableHelper()


Expand All @@ -122,25 +128,36 @@ def task_worker(self, y_cable_presence):

helper_logger.log_info("Stop DOM monitoring loop")

def task_run(self, y_cable_presence):
def run(self):
if self.task_stopping_event.is_set():
return

self.task_thread = threading.Thread(target=self.task_worker, args=(y_cable_presence,))
self.task_thread.start()
try:
self.task_worker(self.y_cable_presence)
except Exception as e:
helper_logger.log_error("Exception occured at child thread YcableInfoUpdateTask due to {} {}".format(repr(e), traceback.format_exc()))

self.exc = e

def task_stop(self):
self.task_stopping_event.set()
self.task_thread.join()
def join(self):
threading.Thread.join(self)

if self.exc:
raise self.exc

# Process wrapper class to update sfp state info periodically


class YcableStateUpdateTask(object):
def __init__(self):
self.task_process = None


class YcableStateUpdateTask(threading.Thread):
def __init__(self, sfp_error_event, y_cable_presence):
threading.Thread.__init__(self)
self.exc = None
self.task_stopping_event = threading.Event()
self.sfp_insert_events = {}
self.sfp_error_event = sfp_error_event
self.y_cable_presence = y_cable_presence
self.table_helper = y_cable_table_helper.YcableStateUpdateTableHelper()


Expand Down Expand Up @@ -192,18 +209,21 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence):

handle_state_update_task(port, fvp_dict, y_cable_presence, stopping_event)


def task_run(self, sfp_error_event, y_cable_presence):
def run(self):
if self.task_stopping_event.is_set():
return

self.task_process = threading.Thread(target=self.task_worker, args=(
self.task_stopping_event, sfp_error_event, y_cable_presence))
self.task_process.start()
try:
self.task_worker(self.task_stopping_event, self.sfp_error_event, self.y_cable_presence)
except Exception as e:
helper_logger.log_error("Exception occured at child thread YcableStateUpdateTask due to {} {}".format(repr(e), traceback.format_exc()))
self.exc = e

def join(self):
threading.Thread.join(self)

def task_stop(self):
self.task_stopping_event.set()
self.task_process.join()
if self.exc:
raise self.exc

#
# Daemon =======================================================================
Expand All @@ -220,6 +240,7 @@ def __init__(self, log_identifier):
self.sfp_error_event = threading.Event()
self.y_cable_presence = [False]
self.table_helper = y_cable_table_helper.DaemonYcableTableHelper()
self.threads = []

# Signal handler
def signal_handler(self, sig, frame):
Expand Down Expand Up @@ -349,36 +370,58 @@ def run(self):
self.init()

# Start the ycable task update thread
ycable_info_update = YcableInfoUpdateTask()
ycable_info_update.task_run(self.y_cable_presence)
ycable_info_update = YcableInfoUpdateTask(self.y_cable_presence)
ycable_info_update.start()
self.threads.append(ycable_info_update)

# Start the sfp state info update process
ycable_state_update = YcableStateUpdateTask()
ycable_state_update.task_run(self.sfp_error_event, self.y_cable_presence)
ycable_state_update = YcableStateUpdateTask(self.sfp_error_event, self.y_cable_presence)
ycable_state_update.start()
self.threads.append(ycable_state_update)

# Start the Y-cable state info update process if Y cable presence established
y_cable_state_worker_update = None
if self.y_cable_presence[0] is True:
y_cable_state_worker_update = y_cable_helper.YCableTableUpdateTask()
y_cable_state_worker_update.task_run()
y_cable_state_worker_update.start()
self.threads.append(y_cable_state_worker_update)
y_cable_cli_worker_update = y_cable_helper.YCableCliUpdateTask()
y_cable_cli_worker_update.start()
self.threads.append(y_cable_cli_worker_update)

# Start main loop
self.log_info("Start daemon main loop")

while not self.stop_event.wait(self.timeout):
self.log_info("Ycable main loop")
# check all threads are alive
for thread in self.threads:
if thread.is_alive() is False:
try:
thread.join()
except Exception as e:
self.log_error("Exception occured at child thread {} to {}".format(thread.getName(), repr(e)))
self.log_error("thread id {} is not running, exiting main loop".format(thread.getName()))
os.kill(os.getpid(), signal.SIGKILL)

self.log_info("Stop daemon main loop")

self.log_error("Stop daemon main loop")

# Stop the ycable periodic info info update thread
ycable_info_update.task_stop()
if ycable_info_update.is_alive():
ycable_info_update.join()

# Stop the ycable update process
ycable_state_update.task_stop()
if ycable_state_update.is_alive():
ycable_state_update.join()

# Stop the Y-cable state info update process
if self.y_cable_presence[0] is True:
y_cable_state_worker_update.task_stop()
if y_cable_state_worker_update.is_alive():
y_cable_state_worker_update.join()
if y_cable_cli_worker_update.is_alive():
y_cable_cli_worker_update.join()


# Start daemon deinitialization sequence
self.deinit()
Expand Down
Loading

0 comments on commit e8c5657

Please sign in to comment.