From e86e25acd560487141fbe22e93399a23167b12ab Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Thu, 12 Dec 2024 12:15:12 +0000 Subject: [PATCH 01/11] Restart caclmgrd whenever catch exception in child thread or in main thread Signed-off-by: Zhaohui Sun --- scripts/caclmgrd | 206 ++++++++++++++++++++++++++++------------------- 1 file changed, 121 insertions(+), 85 deletions(-) diff --git a/scripts/caclmgrd b/scripts/caclmgrd index 30b166e7..c415992a 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -17,6 +17,9 @@ try: import sys import threading import time + import traceback + import signal + from queue import Empty, Queue from sonic_py_common.general import getstatusoutput_noshell_pipe from sonic_py_common import daemon_base, device_info, multi_asic from swsscommon import swsscommon @@ -129,6 +132,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.update_thread = {} self.lock = {} self.num_changes = {} + self.stop_event = threading.Event() # Initialize update-thread-specific data for default namespace self.update_thread[DEFAULT_NAMESPACE] = None @@ -707,7 +711,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): table_ip_version = 4 # Read DST_PORT info from Config DB, insert it back to ACL_SERVICES - if acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT" in rule_props: + if acl_service == 'EXTERNAL_CLIENT':# and "L4_DST_PORT" in rule_props: dst_ports = [rule_props["L4_DST_PORT"]] self.ACL_SERVICES[acl_service]["dst_ports"] = dst_ports elif acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT_RANGE" in rule_props: @@ -852,7 +856,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.run_commands(dualtor_iptables_cmds) - def check_and_update_control_plane_acls(self, namespace, num_changes): + def check_and_update_control_plane_acls(self, namespace, num_changes, exception_queue): """ This function is intended to be spawned in a separate thread. Its purpose is to prevent unnecessary iptables updates if we receive @@ -869,6 +873,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): while True: # Sleep for our delay interval time.sleep(self.UPDATE_DELAY_SECS) + self.log_info("After delay {}s, checking for ACL table changes in namespace '{}'".format(self.UPDATE_DELAY_SECS, namespace)) with self.lock[namespace]: if self.num_changes[namespace] > num_changes: @@ -890,6 +895,18 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.num_changes[namespace] = 0 self.update_thread[namespace] = None return + except Exception as e: + self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + self.log_error(tb_line_split) + #self.stop_event.set() + exc_info = traceback.format_exc() + exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue + self.log_error("Exiting thread {}, put it into exception_queue {}".format( + threading.current_thread().getName(), exception_queue)) finally: new_config_db_connector.close("CONFIG_DB") @@ -988,6 +1005,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # set up state_db connector state_db_connector = swsscommon.DBConnector("STATE_DB", 0) config_db_connector = swsscommon.DBConnector("CONFIG_DB", 0) + exception_queue = Queue() if self.DualToR: self.log_info("Dual ToR mode") @@ -1031,103 +1049,121 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Get the ACL rule table seprator acl_rule_table_seprator = subscribe_acl_rule_table.getTableNameSeparator() + try: + # Loop on select to see if any event happen on state db or config db of any namespace + while True: + # Periodically check for exceptions from child threads + try: + namespace, error, _ = exception_queue.get_nowait() # Non-blocking + self.log_error(f"Exception in namespace '{namespace}': {error}") + self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName())) + os.kill(os.getpid(), signal.SIGKILL) + except Empty: + # No exceptions in the queue + pass + (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) + # Continue if select is timeout or selectable object is not return + if state != swsscommon.Select.OBJECT: + continue - # Loop on select to see if any event happen on state db or config db of any namespace - while True: - (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) - # Continue if select is timeout or selectable object is not return - if state != swsscommon.Select.OBJECT: - continue - - # Get the redisselect object from selectable object - redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - - # Get the corresponding namespace and db_id from redisselect - namespace = redisSelectObj.getDbConnector().getNamespace() - db_id = redisSelectObj.getDbConnector().getDbId() - - if db_id == state_db_id: - while True: - key, op, fvs = subscribe_bfd_session.pop() - if not key: - break + # Get the redisselect object from selectable object + redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - if op == 'SET' and not self.bfdAllowed: - self.allow_bfd_protocol(namespace) - self.bfdAllowed = True - sel.removeSelectable(subscribe_bfd_session) + # Get the corresponding namespace and db_id from redisselect + namespace = redisSelectObj.getDbConnector().getNamespace() + db_id = redisSelectObj.getDbConnector().getDbId() - if self.DualToR: - '''dhcp packet mark update''' + if db_id == state_db_id: while True: - key, op, fvs = subscribe_dhcp_packet_mark.pop() + key, op, fvs = subscribe_bfd_session.pop() if not key: break - self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs))) - '''initial value is None''' - pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] - cur_mark = None if op == 'DEL' else dict(fvs)['mark'] - dhcp_packet_mark_tbl[key] = cur_mark - self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark) + if op == 'SET' and not self.bfdAllowed: + self.allow_bfd_protocol(namespace) + self.bfdAllowed = True + sel.removeSelectable(subscribe_bfd_session) + + if self.DualToR: + '''dhcp packet mark update''' + while True: + key, op, fvs = subscribe_dhcp_packet_mark.pop() + if not key: + break + self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs))) + + '''initial value is None''' + pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] + cur_mark = None if op == 'DEL' else dict(fvs)['mark'] + dhcp_packet_mark_tbl[key] = cur_mark + self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark) + + '''mux cable update''' + while True: + key, op, fvs = subscribe_mux_cable.pop() + if not key: + break + self.log_info("mux cable update : '%s'" % str((key, op, fvs))) + + mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] + self.update_dhcp_acl(key, op, dict(fvs), mark) + continue - '''mux cable update''' + if db_id == config_db_id: while True: - key, op, fvs = subscribe_mux_cable.pop() + key, op, fvs = subscribe_vxlan_table.pop() if not key: break - self.log_info("mux cable update : '%s'" % str((key, op, fvs))) + if op == 'SET' and not self.VxlanAllowed: + self.allow_vxlan_port(namespace, fvs) + elif op == 'DEL' and self.VxlanAllowed: + self.block_vxlan_port(namespace) - mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] - self.update_dhcp_acl(key, op, dict(fvs), mark) - continue + ctrl_plane_acl_notification = set() - if db_id == config_db_id: - while True: - key, op, fvs = subscribe_vxlan_table.pop() - if not key: - break - if op == 'SET' and not self.VxlanAllowed: - self.allow_vxlan_port(namespace, fvs) - elif op == 'DEL' and self.VxlanAllowed: - self.block_vxlan_port(namespace) - - ctrl_plane_acl_notification = set() - - # Pop data of both Subscriber Table object of namespace that got config db acl table event - for table in config_db_subscriber_table_map[namespace]: - while True: - (key, op, fvp) = table.pop() - # Pop of table that does not have data so break - if key == '': - break - # ACL Table notification. We will take Control Plane ACTION for any ACL Table Event - # This can be optimize further but we should not have many acl table set/del events in normal - # scenario - if acl_rule_table_seprator not in key: - ctrl_plane_acl_notification.add(namespace) - # Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane - else: - acl_table = key.split(acl_rule_table_seprator)[0] - if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE: + # Pop data of both Subscriber Table object of namespace that got config db acl table event + for table in config_db_subscriber_table_map[namespace]: + while True: + (key, op, fvp) = table.pop() + # Pop of table that does not have data so break + if key == '': + break + # ACL Table notification. We will take Control Plane ACTION for any ACL Table Event + # This can be optimize further but we should not have many acl table set/del events in normal + # scenario + if acl_rule_table_seprator not in key: ctrl_plane_acl_notification.add(namespace) - - # Update the Control Plane ACL of the namespace that got config db acl table event - for namespace in ctrl_plane_acl_notification: - with self.lock[namespace]: - if self.num_changes[namespace] == 0: - self.log_info("ACL change detected for namespace '{}'".format(namespace)) - - # Increment the number of change events we've received for this namespace - self.num_changes[namespace] += 1 - - # If an update thread is not already spawned for the namespace which we received - # the ACL table update event, spawn one now - if not self.update_thread[namespace]: - self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) - self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, - args=(namespace, self.num_changes[namespace])) - self.update_thread[namespace].start() + # Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane + else: + acl_table = key.split(acl_rule_table_seprator)[0] + if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE: + ctrl_plane_acl_notification.add(namespace) + + # Update the Control Plane ACL of the namespace that got config db acl table event + for namespace in ctrl_plane_acl_notification: + with self.lock[namespace]: + if self.num_changes[namespace] == 0: + self.log_info("ACL change detected for namespace '{}'".format(namespace)) + + # Increment the number of change events we've received for this namespace + self.num_changes[namespace] += 1 + + # If an update thread is not already spawned for the namespace which we received + # the ACL table update event, spawn one now + if not self.update_thread[namespace]: + self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) + self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, + args=(namespace, self.num_changes[namespace], exception_queue)) + self.update_thread[namespace].start() + except Exception as e: + self.log_error("Exception occured at main thread due to {}".format(repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + self.log_error(tb_line_split) + self.log_error("Catch exception in main thread, generating SIGKILL for main thread") + os.kill(os.getpid(), signal.SIGKILL) # ============================= Functions ============================= From 31c8af6db4b506f83436e36cd3ab41280a298f44 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Tue, 17 Dec 2024 06:49:58 +0000 Subject: [PATCH 02/11] remove the commented code Signed-off-by: Zhaohui Sun --- scripts/caclmgrd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/caclmgrd b/scripts/caclmgrd index c415992a..e74d6182 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -711,7 +711,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): table_ip_version = 4 # Read DST_PORT info from Config DB, insert it back to ACL_SERVICES - if acl_service == 'EXTERNAL_CLIENT':# and "L4_DST_PORT" in rule_props: + if acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT" in rule_props: dst_ports = [rule_props["L4_DST_PORT"]] self.ACL_SERVICES[acl_service]["dst_ports"] = dst_ports elif acl_service == 'EXTERNAL_CLIENT' and "L4_DST_PORT_RANGE" in rule_props: From cb5e88b3a143bcdc15022e5bf17efbdc3c988066 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Tue, 17 Dec 2024 07:38:58 +0000 Subject: [PATCH 03/11] Fix unit test failure Signed-off-by: Zhaohui Sun --- scripts/caclmgrd | 2 -- tests/caclmgrd/caclmgrd_bfd_test.py | 5 ++++- tests/caclmgrd/caclmgrd_scale_test.py | 4 +++- tests/caclmgrd/caclmgrd_test.py | 7 +++++-- tests/caclmgrd/caclmgrd_vxlan_test.py | 4 +++- 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/scripts/caclmgrd b/scripts/caclmgrd index e74d6182..3033da84 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -132,7 +132,6 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.update_thread = {} self.lock = {} self.num_changes = {} - self.stop_event = threading.Event() # Initialize update-thread-specific data for default namespace self.update_thread[DEFAULT_NAMESPACE] = None @@ -902,7 +901,6 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): for tb_line in msg: for tb_line_split in tb_line.splitlines(): self.log_error(tb_line_split) - #self.stop_event.set() exc_info = traceback.format_exc() exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue self.log_error("Exiting thread {}, put it into exception_queue {}".format( diff --git a/tests/caclmgrd/caclmgrd_bfd_test.py b/tests/caclmgrd/caclmgrd_bfd_test.py index 14da370a..b30e370f 100644 --- a/tests/caclmgrd/caclmgrd_bfd_test.py +++ b/tests/caclmgrd/caclmgrd_bfd_test.py @@ -7,6 +7,8 @@ from unittest import TestCase, mock from pyfakefs.fake_filesystem_unittest import patchfs +from queue import Queue + from .test_bfd_vectors import CACLMGRD_BFD_TEST_VECTOR from tests.common.mock_configdb import MockConfigDb from unittest.mock import MagicMock, patch @@ -51,7 +53,8 @@ def test_caclmgrd_bfd(self, test_name, test_data, fs): caclmgrd_daemon.bfdAllowed = True mocked_subprocess.Popen.reset_mock() caclmgrd_daemon.num_changes[''] = 1 - caclmgrd_daemon.check_and_update_control_plane_acls('', 1) + exception_queue = Queue() + caclmgrd_daemon.check_and_update_control_plane_acls('', 1, exception_queue) #Ensure BFD rules are installed before ip2me rules to avoid traffic loss during update of control plane acl rules bfd_ipv4_idx = 0 diff --git a/tests/caclmgrd/caclmgrd_scale_test.py b/tests/caclmgrd/caclmgrd_scale_test.py index 5769ad7c..414234b4 100644 --- a/tests/caclmgrd/caclmgrd_scale_test.py +++ b/tests/caclmgrd/caclmgrd_scale_test.py @@ -10,6 +10,7 @@ from .test_scale_vectors import CACLMGRD_SCALE_TEST_VECTOR from tests.common.mock_configdb import MockConfigDb from unittest.mock import MagicMock, patch +from queue import Queue DBCONFIG_PATH = '/var/run/redis/sonic-db/database_config.json' @@ -47,5 +48,6 @@ def test_caclmgrd_scale(self, test_name, test_data, fs): caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd") caclmgrd_daemon.num_changes[''] = 150 - caclmgrd_daemon.check_and_update_control_plane_acls('', 150) + exception_queue = Queue() + caclmgrd_daemon.check_and_update_control_plane_acls('', 150, exception_queue) mocked_subprocess.Popen.assert_has_calls(test_data["expected_subprocess_calls"], any_order=True) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index 13dc6874..5da80ce0 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -5,7 +5,11 @@ from unittest import TestCase, mock from tests.common.mock_configdb import MockConfigDb from sonic_py_common.general import load_module_from_source - +import threading +import time +import sys +import traceback +from queue import Queue class TestCaclmgrd(TestCase): def setUp(self): swsscommon.swsscommon.ConfigDBConnector = MockConfigDb @@ -30,4 +34,3 @@ def test_get_chain_list(self): with mock.patch("caclmgrd.ControlPlaneAclManager.run_commands_pipe") as mock_run_commands_pipe: caclmgrd_daemon.get_chain_list([], ['']) mock_run_commands_pipe.assert_has_calls(expected_calls) - diff --git a/tests/caclmgrd/caclmgrd_vxlan_test.py b/tests/caclmgrd/caclmgrd_vxlan_test.py index 8a3a2028..ac1d1495 100644 --- a/tests/caclmgrd/caclmgrd_vxlan_test.py +++ b/tests/caclmgrd/caclmgrd_vxlan_test.py @@ -10,6 +10,7 @@ from .test_vxlan_vectors import CACLMGRD_VXLAN_TEST_VECTOR from tests.common.mock_configdb import MockConfigDb from unittest.mock import MagicMock, patch +from queue import Queue DBCONFIG_PATH = '/var/run/redis/sonic-db/database_config.json' @@ -58,6 +59,7 @@ def test_caclmgrd_vxlan(self, test_name, test_data, fs): caclmgrd_daemon.allow_vxlan_port('', data) mocked_subprocess.Popen.reset_mock() caclmgrd_daemon.num_changes[''] = 1 - caclmgrd_daemon.check_and_update_control_plane_acls('', 1) + exception_queue = Queue() + caclmgrd_daemon.check_and_update_control_plane_acls('', 1, exception_queue) mocked_subprocess.Popen.assert_has_calls(test_data["expected_add_subprocess_calls"], any_order=True) From d6efce093346885f720543443ac3683adce8ddb0 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Tue, 17 Dec 2024 08:44:57 +0000 Subject: [PATCH 04/11] Add unit test Signed-off-by: Zhaohui Sun --- tests/caclmgrd/caclmgrd_test.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index 5da80ce0..fe081201 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -1,14 +1,14 @@ import os import sys import swsscommon -from unittest.mock import call +from unittest.mock import call, patch from unittest import TestCase, mock from tests.common.mock_configdb import MockConfigDb from sonic_py_common.general import load_module_from_source import threading import time import sys -import traceback + from queue import Queue class TestCaclmgrd(TestCase): def setUp(self): @@ -34,3 +34,21 @@ def test_get_chain_list(self): with mock.patch("caclmgrd.ControlPlaneAclManager.run_commands_pipe") as mock_run_commands_pipe: caclmgrd_daemon.get_chain_list([], ['']) mock_run_commands_pipe.assert_has_calls(expected_calls) + + @patch('caclmgrd.ControlPlaneAclManager.update_control_plane_acls') + def test_update_control_plane_acls_exception(self, mock_update): + # Set the side effect to raise an exception + mock_update.side_effect = Exception('Test exception') + # Mock the necessary attributes and methods + manager = self.caclmgrd.ControlPlaneAclManager("caclmgrd") + manager.UPDATE_DELAY_SECS = 1 + manager.lock = {'': threading.Lock()} + manager.num_changes = {'': 0} + manager.update_thread = {'': None} + exception_queue = Queue() + manager.num_changes[''] = 1 + manager.check_and_update_control_plane_acls('', 0, exception_queue) + self.assertFalse(exception_queue.empty()) + exc_info = exception_queue.get() + self.assertEqual(exc_info[0], '') + self.assertIn('Test exception', exc_info[1]) From da50b16de320745ea0fc88eb0911349b9389670e Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Wed, 18 Dec 2024 05:14:08 +0000 Subject: [PATCH 05/11] Add unit test for run function Signed-off-by: Zhaohui Sun --- tests/caclmgrd/caclmgrd_test.py | 203 ++++++++++++++++++++++++++++---- 1 file changed, 183 insertions(+), 20 deletions(-) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index fe081201..6d93c147 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -1,15 +1,22 @@ import os import sys import swsscommon -from unittest.mock import call, patch +from unittest.mock import call, patch, MagicMock from unittest import TestCase, mock from tests.common.mock_configdb import MockConfigDb from sonic_py_common.general import load_module_from_source import threading import time import sys - +from .test_basic_vectors import CACLMGRD_BASIC_TEST_VECTOR +from parameterized import parameterized +from pyfakefs.fake_filesystem_unittest import patchfs from queue import Queue + + +DBCONFIG_PATH = "/var/run/redis/sonic-db/database_config.json" + + class TestCaclmgrd(TestCase): def setUp(self): swsscommon.swsscommon.ConfigDBConnector = MockConfigDb @@ -17,38 +24,194 @@ def setUp(self): modules_path = os.path.dirname(test_path) scripts_path = os.path.join(modules_path, "scripts") sys.path.insert(0, modules_path) - caclmgrd_path = os.path.join(scripts_path, 'caclmgrd') - self.caclmgrd = load_module_from_source('caclmgrd', caclmgrd_path) + caclmgrd_path = os.path.join(scripts_path, "caclmgrd") + self.caclmgrd = load_module_from_source("caclmgrd", caclmgrd_path) def test_run_commands_pipe(self): caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd") - output = caclmgrd_daemon.run_commands_pipe(['echo', 'caclmgrd'], ['awk', '{print $1}']) - assert output == 'caclmgrd' + output = caclmgrd_daemon.run_commands_pipe( + ["echo", "caclmgrd"], ["awk", "{print $1}"] + ) + assert output == "caclmgrd" - output = caclmgrd_daemon.run_commands_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(8)"]) - assert output == '' + output = caclmgrd_daemon.run_commands_pipe( + [sys.executable, "-c", "import sys; sys.exit(6)"], + [sys.executable, "-c", "import sys; sys.exit(8)"], + ) + assert output == "" def test_get_chain_list(self): - expected_calls = [call(['iptables', '-L', '-v', '-n'], ['grep', 'Chain'], ['awk', '{print $2}'])] + expected_calls = [ + call( + ["iptables", "-L", "-v", "-n"], ["grep", "Chain"], ["awk", "{print $2}"] + ) + ] caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd") - with mock.patch("caclmgrd.ControlPlaneAclManager.run_commands_pipe") as mock_run_commands_pipe: - caclmgrd_daemon.get_chain_list([], ['']) + with mock.patch( + "caclmgrd.ControlPlaneAclManager.run_commands_pipe" + ) as mock_run_commands_pipe: + caclmgrd_daemon.get_chain_list([], [""]) mock_run_commands_pipe.assert_has_calls(expected_calls) - @patch('caclmgrd.ControlPlaneAclManager.update_control_plane_acls') + @patch("caclmgrd.ControlPlaneAclManager.update_control_plane_acls") def test_update_control_plane_acls_exception(self, mock_update): # Set the side effect to raise an exception - mock_update.side_effect = Exception('Test exception') + mock_update.side_effect = Exception("Test exception") # Mock the necessary attributes and methods manager = self.caclmgrd.ControlPlaneAclManager("caclmgrd") manager.UPDATE_DELAY_SECS = 1 - manager.lock = {'': threading.Lock()} - manager.num_changes = {'': 0} - manager.update_thread = {'': None} + manager.lock = {"": threading.Lock()} + manager.num_changes = {"": 0} + manager.update_thread = {"": None} exception_queue = Queue() - manager.num_changes[''] = 1 - manager.check_and_update_control_plane_acls('', 0, exception_queue) + manager.num_changes[""] = 1 + manager.check_and_update_control_plane_acls("", 0, exception_queue) self.assertFalse(exception_queue.empty()) exc_info = exception_queue.get() - self.assertEqual(exc_info[0], '') - self.assertIn('Test exception', exc_info[1]) + self.assertEqual(exc_info[0], "") + self.assertIn("Test exception", exc_info[1]) + + @patch("caclmgrd.swsscommon") + @patch("os.geteuid", return_value=0) + @patch("os.kill") + @patch("signal.SIGKILL", return_value=9) + @patch("sys.exit") + @patch("traceback.format_exception") + def test_run( + self, + mock_format_exception, + mock_exit, + mock_sigkill, + mock_kill, + mock_geteuid, + mock_swsscommon, + ): + # Setup + mock_select_instance = MagicMock() + mock_Select.return_value = mock_select_instance + + mock_getDbId.side_effect = lambda db_name: {"STATE_DB": 1, "CONFIG_DB": 2}.get( + db_name, 0 + ) + + mock_db_connector = MagicMock() + mock_DBConnector.return_value = mock_db_connector + + exception_queue = Queue() + + # Mock self object and its attributes + self_mock = MagicMock() + self_mock.DualToR = True + self_mock.bfdAllowed = False + self_mock.VxlanAllowed = False + self_mock.config_db_map = {"DEFAULT_NAMESPACE": MagicMock()} + self_mock.lock = {"DEFAULT_NAMESPACE": threading.Lock()} + self_mock.num_changes = {"DEFAULT_NAMESPACE": 0} + self_mock.update_thread = {"DEFAULT_NAMESPACE": None} + self_mock.MUX_CABLE_TABLE = "MUX_CABLE_TABLE" + self_mock.BFD_SESSION_TABLE = "BFD_SESSION_TABLE" + self_mock.VXLAN_TUNNEL_TABLE = "VXLAN_TUNNEL_TABLE" + self_mock.ACL_TABLE = "ACL_TABLE" + self_mock.ACL_TABLE_TYPE_CTRLPLANE = "CTRLPLANE" + + # Define expected behavior for self.log_error and self.log_info + self_mock.log_error = MagicMock() + self_mock.log_info = MagicMock() + + # Mock traceback formatting + mock_format_exception.return_value = ["Traceback line 1", "Traceback line 2"] + + # Execute the `run` function + try: + TestRunFunction.run(self_mock) + except SystemExit: + pass # Ignore the SystemExit exception + + # Assertions + self_mock.log_info.assert_called_with("Starting up ...") + mock_DBConnector.assert_any_call("STATE_DB", 0) + mock_DBConnector.assert_any_call("CONFIG_DB", 0) + self_mock.log_error.assert_not_called_with("Must be root to run this daemon") + mock_initializeGlobalConfig.assert_called_once() + mock_Select.assert_called_once() + + # Ensure signals were set correctly during exception + if not exception_queue.empty(): + self_mock.log_error.assert_called() + mock_kill.assert_called_once_with(os.getpid(), signal.SIGKILL) + + # Validate exit handling + mock_exit.assert_not_called() + + @patch("caclmgrd.swsscommon") + @patch("os.geteuid", return_value=0) + @patch("os.kill") + @patch("signal.SIGKILL", return_value=9) + @patch("sys.exit") + @patch("traceback.format_exception") + def test_run( + self, + mock_format_exception, + mock_exit, + mock_sigkill, + mock_kill, + mock_geteuid, + mock_swsscommon, + ): + mock_swsscommon.SonicDBConfig.getDbId.side_effect = lambda db_name: ( + 0 if db_name == "STATE_DB" else 1 + ) + mock_swsscommon.Select.OBJECT = 1 + mock_swsscommon.Select.return_value.select.return_value = ( + mock_swsscommon.Select.OBJECT, + MagicMock(), + ) + mock_swsscommon.SubscriberStateTable.return_value.select.return_value = ( + mock_swsscommon.Select.OBJECT, + MagicMock(), + ) + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getNamespace.return_value = ( + "default" + ) + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getDbId.side_effect = ( + lambda: 0 + ) + + # Creating an instance of ControlPlaneAclManager + manager = self.caclmgrd.ControlPlaneAclManager("caclmgrd") + + # Setting necessary attributes + manager.DualToR = True + manager.iptables_cmd_ns_prefix = {"": []} + manager.lock = {"": threading.Lock()} + manager.num_changes = {"": 0} + manager.update_thread = {"": None} + manager.bfdAllowed = False + manager.VxlanAllowed = False + manager.VxlanSrcIP = "" + manager.MUX_CABLE_TABLE = "MUX_CABLE_TABLE" + manager.BFD_SESSION_TABLE = "BFD_SESSION_TABLE" + manager.VXLAN_TUNNEL_TABLE = "VXLAN_TUNNEL_TABLE" + manager.ACL_TABLE = "ACL_TABLE" + manager.ACL_RULE = "ACL_RULE" + manager.ACL_TABLE_TYPE_CTRLPLANE = "CTRLPLANE" + + # Mocking methods + manager.update_control_plane_acls = MagicMock() + manager.allow_bfd_protocol = MagicMock() + manager.allow_vxlan_port = MagicMock() + manager.block_vxlan_port = MagicMock() + manager.update_dhcp_acl_for_mark_change = MagicMock() + manager.update_dhcp_acl = MagicMock() + manager.setup_dhcp_chain = MagicMock() + + manager.run() + + # Asserting the method calls + manager.update_control_plane_acls.assert_called() + manager.allow_bfd_protocol.assert_not_called() + manager.allow_vxlan_port.assert_not_called() + manager.block_vxlan_port.assert_not_called() + manager.update_dhcp_acl_for_mark_change.assert_not_called() + manager.update_dhcp_acl.assert_not_called() + manager.setup_dhcp_chain.assert_called() From 83f8b74a2dbc5d5e94a53009dbbb2aa0efebbf95 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Wed, 18 Dec 2024 05:24:45 +0000 Subject: [PATCH 06/11] Add unit test for run function Signed-off-by: Zhaohui Sun --- tests/caclmgrd/caclmgrd_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index 6d93c147..c7610e09 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -6,11 +6,7 @@ from tests.common.mock_configdb import MockConfigDb from sonic_py_common.general import load_module_from_source import threading -import time import sys -from .test_basic_vectors import CACLMGRD_BASIC_TEST_VECTOR -from parameterized import parameterized -from pyfakefs.fake_filesystem_unittest import patchfs from queue import Queue From 8e5f5db36e2a14167055ea5f2f7147570311d95f Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Wed, 18 Dec 2024 08:03:50 +0000 Subject: [PATCH 07/11] fix unit test Signed-off-by: Zhaohui Sun --- tests/caclmgrd/caclmgrd_test.py | 116 ++++++++++---------------------- 1 file changed, 34 insertions(+), 82 deletions(-) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index c7610e09..042a69c0 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -67,77 +67,6 @@ def test_update_control_plane_acls_exception(self, mock_update): self.assertEqual(exc_info[0], "") self.assertIn("Test exception", exc_info[1]) - @patch("caclmgrd.swsscommon") - @patch("os.geteuid", return_value=0) - @patch("os.kill") - @patch("signal.SIGKILL", return_value=9) - @patch("sys.exit") - @patch("traceback.format_exception") - def test_run( - self, - mock_format_exception, - mock_exit, - mock_sigkill, - mock_kill, - mock_geteuid, - mock_swsscommon, - ): - # Setup - mock_select_instance = MagicMock() - mock_Select.return_value = mock_select_instance - - mock_getDbId.side_effect = lambda db_name: {"STATE_DB": 1, "CONFIG_DB": 2}.get( - db_name, 0 - ) - - mock_db_connector = MagicMock() - mock_DBConnector.return_value = mock_db_connector - - exception_queue = Queue() - - # Mock self object and its attributes - self_mock = MagicMock() - self_mock.DualToR = True - self_mock.bfdAllowed = False - self_mock.VxlanAllowed = False - self_mock.config_db_map = {"DEFAULT_NAMESPACE": MagicMock()} - self_mock.lock = {"DEFAULT_NAMESPACE": threading.Lock()} - self_mock.num_changes = {"DEFAULT_NAMESPACE": 0} - self_mock.update_thread = {"DEFAULT_NAMESPACE": None} - self_mock.MUX_CABLE_TABLE = "MUX_CABLE_TABLE" - self_mock.BFD_SESSION_TABLE = "BFD_SESSION_TABLE" - self_mock.VXLAN_TUNNEL_TABLE = "VXLAN_TUNNEL_TABLE" - self_mock.ACL_TABLE = "ACL_TABLE" - self_mock.ACL_TABLE_TYPE_CTRLPLANE = "CTRLPLANE" - - # Define expected behavior for self.log_error and self.log_info - self_mock.log_error = MagicMock() - self_mock.log_info = MagicMock() - - # Mock traceback formatting - mock_format_exception.return_value = ["Traceback line 1", "Traceback line 2"] - - # Execute the `run` function - try: - TestRunFunction.run(self_mock) - except SystemExit: - pass # Ignore the SystemExit exception - - # Assertions - self_mock.log_info.assert_called_with("Starting up ...") - mock_DBConnector.assert_any_call("STATE_DB", 0) - mock_DBConnector.assert_any_call("CONFIG_DB", 0) - self_mock.log_error.assert_not_called_with("Must be root to run this daemon") - mock_initializeGlobalConfig.assert_called_once() - mock_Select.assert_called_once() - - # Ensure signals were set correctly during exception - if not exception_queue.empty(): - self_mock.log_error.assert_called() - mock_kill.assert_called_once_with(os.getpid(), signal.SIGKILL) - - # Validate exit handling - mock_exit.assert_not_called() @patch("caclmgrd.swsscommon") @patch("os.geteuid", return_value=0) @@ -155,35 +84,57 @@ def test_run( mock_swsscommon, ): mock_swsscommon.SonicDBConfig.getDbId.side_effect = lambda db_name: ( - 0 if db_name == "STATE_DB" else 1 + 6 if db_name == "STATE_DB" else 1 ) + mock_state_db_connector = MagicMock() + mock_config_db_connector = MagicMock() + mock_swsscommon.DBConnector.side_effect = [mock_state_db_connector, mock_config_db_connector, mock_state_db_connector] mock_swsscommon.Select.OBJECT = 1 mock_swsscommon.Select.return_value.select.return_value = ( mock_swsscommon.Select.OBJECT, MagicMock(), ) + mock_swsscommon.Select.return_value.removeSelectable.return_value = MagicMock() + mock_swsscommon.SubscriberStateTable.return_value.select.return_value = ( mock_swsscommon.Select.OBJECT, MagicMock(), ) - mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getNamespace.return_value = ( - "default" - ) - mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getDbId.side_effect = ( - lambda: 0 - ) + pop_values = [ + ("key1", "SET", [("mark", "0x11"), ("field2", "value2")]), + (None, None, None), + ("key2", "DEL", []), + (None, None, None), + ("key3", "SET", [("mark", "0x11")]), + (None, None, None), + ("key4", "DEL", []), + (None, None, None), + ("key5", "SET", [("mark", "0x11"), ("field4", "value4")]), + (None, None, None), + ("key6", "SET", [("mark", "0x11"), ("field5", "value5")]), + (None, None, None), + ("key7", "SET", [("mark", "0x11"), ("field6", "value6")]), + (None, None, None), + ] + mock_swsscommon.SubscriberStateTable.return_value.pop.side_effect = pop_values + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getNamespace.return_value = "" + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getDbId.return_value = 6 # Creating an instance of ControlPlaneAclManager + self.caclmgrd.ControlPlaneAclManager.get_namespace_mgmt_ip = MagicMock() + self.caclmgrd.ControlPlaneAclManager.get_namespace_mgmt_ipv6 = MagicMock() manager = self.caclmgrd.ControlPlaneAclManager("caclmgrd") # Setting necessary attributes + manager.log_info = MagicMock() + manager.log_error = MagicMock() manager.DualToR = True manager.iptables_cmd_ns_prefix = {"": []} manager.lock = {"": threading.Lock()} manager.num_changes = {"": 0} manager.update_thread = {"": None} manager.bfdAllowed = False - manager.VxlanAllowed = False + manager.VxlanAllowed = True manager.VxlanSrcIP = "" manager.MUX_CABLE_TABLE = "MUX_CABLE_TABLE" manager.BFD_SESSION_TABLE = "BFD_SESSION_TABLE" @@ -193,6 +144,7 @@ def test_run( manager.ACL_TABLE_TYPE_CTRLPLANE = "CTRLPLANE" # Mocking methods + manager.removeSelectable = MagicMock() manager.update_control_plane_acls = MagicMock() manager.allow_bfd_protocol = MagicMock() manager.allow_vxlan_port = MagicMock() @@ -205,9 +157,9 @@ def test_run( # Asserting the method calls manager.update_control_plane_acls.assert_called() - manager.allow_bfd_protocol.assert_not_called() + manager.allow_bfd_protocol.assert_called() manager.allow_vxlan_port.assert_not_called() manager.block_vxlan_port.assert_not_called() - manager.update_dhcp_acl_for_mark_change.assert_not_called() - manager.update_dhcp_acl.assert_not_called() + manager.update_dhcp_acl_for_mark_change.assert_called() + manager.update_dhcp_acl.assert_called() manager.setup_dhcp_chain.assert_called() From 821ae806113615b1a367021ab823a62fa2d93a0d Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Wed, 18 Dec 2024 09:15:21 +0000 Subject: [PATCH 08/11] increase test coverage Signed-off-by: Zhaohui Sun --- tests/caclmgrd/caclmgrd_test.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index 042a69c0..ecdd1c79 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -100,6 +100,7 @@ def test_run( mock_swsscommon.Select.OBJECT, MagicMock(), ) + mock_swsscommon.SubscriberStateTable.return_value.getTableNameSeparator.return_value = "|" pop_values = [ ("key1", "SET", [("mark", "0x11"), ("field2", "value2")]), (None, None, None), @@ -107,19 +108,19 @@ def test_run( (None, None, None), ("key3", "SET", [("mark", "0x11")]), (None, None, None), - ("key4", "DEL", []), - (None, None, None), + ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), + ('', None, None), ("key5", "SET", [("mark", "0x11"), ("field4", "value4")]), - (None, None, None), + ('', None, None), ("key6", "SET", [("mark", "0x11"), ("field5", "value5")]), (None, None, None), - ("key7", "SET", [("mark", "0x11"), ("field6", "value6")]), - (None, None, None), + ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), + ('', None, None), ] mock_swsscommon.SubscriberStateTable.return_value.pop.side_effect = pop_values mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getNamespace.return_value = "" - mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getDbId.return_value = 6 + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getDbId.side_effect = [6, 4, 4] # Creating an instance of ControlPlaneAclManager self.caclmgrd.ControlPlaneAclManager.get_namespace_mgmt_ip = MagicMock() self.caclmgrd.ControlPlaneAclManager.get_namespace_mgmt_ipv6 = MagicMock() @@ -131,7 +132,7 @@ def test_run( manager.DualToR = True manager.iptables_cmd_ns_prefix = {"": []} manager.lock = {"": threading.Lock()} - manager.num_changes = {"": 0} + manager.num_changes = {"": 2} manager.update_thread = {"": None} manager.bfdAllowed = False manager.VxlanAllowed = True From 16776fb8649a74881d5c96a881e7c440a91ebfb3 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Mon, 23 Dec 2024 09:43:27 +0000 Subject: [PATCH 09/11] Address comment Signed-off-by: Zhaohui Sun --- scripts/caclmgrd | 209 +++++++++++++++----------------- tests/caclmgrd/caclmgrd_test.py | 13 +- 2 files changed, 108 insertions(+), 114 deletions(-) diff --git a/scripts/caclmgrd b/scripts/caclmgrd index 3033da84..ef8c66f8 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -872,7 +872,6 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): while True: # Sleep for our delay interval time.sleep(self.UPDATE_DELAY_SECS) - self.log_info("After delay {}s, checking for ACL table changes in namespace '{}'".format(self.UPDATE_DELAY_SECS, namespace)) with self.lock[namespace]: if self.num_changes[namespace] > num_changes: @@ -897,14 +896,8 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): except Exception as e: self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) exc_type, exc_value, exc_traceback = sys.exc_info() - msg = traceback.format_exception(exc_type, exc_value, exc_traceback) - for tb_line in msg: - for tb_line_split in tb_line.splitlines(): - self.log_error(tb_line_split) - exc_info = traceback.format_exc() - exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue - self.log_error("Exiting thread {}, put it into exception_queue {}".format( - threading.current_thread().getName(), exception_queue)) + full_traceback = traceback.format_exception(exc_type, exc_value, exc_traceback) + exception_queue.put((namespace, repr(e), full_traceback)) # Add the exception to the queue finally: new_config_db_connector.close("CONFIG_DB") @@ -1047,121 +1040,117 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # Get the ACL rule table seprator acl_rule_table_seprator = subscribe_acl_rule_table.getTableNameSeparator() - try: - # Loop on select to see if any event happen on state db or config db of any namespace - while True: - # Periodically check for exceptions from child threads - try: - namespace, error, _ = exception_queue.get_nowait() # Non-blocking - self.log_error(f"Exception in namespace '{namespace}': {error}") - self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName())) - os.kill(os.getpid(), signal.SIGKILL) - except Empty: - # No exceptions in the queue - pass - (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) - # Continue if select is timeout or selectable object is not return - if state != swsscommon.Select.OBJECT: - continue - # Get the redisselect object from selectable object - redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) + # Loop on select to see if any event happen on state db or config db of any namespace + while True: + # Periodically check for exceptions from child threads + try: + namespace, error, exc_info = exception_queue.get_nowait() # Non-blocking + self.log_error("Exception in namespace '{}': {}".format(namespace, error)) + self.log_error("Full traceback from child thread:") + import pdb; pdb.set_trace() + for tb_line in exc_info: + for tb_line_split in tb_line.splitlines(): + self.log_error(tb_line_split) + self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName())) + os.kill(os.getpid(), signal.SIGKILL) + except Empty: + # No exceptions in the queue + pass + (state, selectableObj) = sel.select(SELECT_TIMEOUT_MS) + # Continue if select is timeout or selectable object is not return + if state != swsscommon.Select.OBJECT: + continue + + # Get the redisselect object from selectable object + redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj) - # Get the corresponding namespace and db_id from redisselect - namespace = redisSelectObj.getDbConnector().getNamespace() - db_id = redisSelectObj.getDbConnector().getDbId() + # Get the corresponding namespace and db_id from redisselect + namespace = redisSelectObj.getDbConnector().getNamespace() + db_id = redisSelectObj.getDbConnector().getDbId() - if db_id == state_db_id: + if db_id == state_db_id: + while True: + key, op, fvs = subscribe_bfd_session.pop() + if not key: + break + + if op == 'SET' and not self.bfdAllowed: + self.allow_bfd_protocol(namespace) + self.bfdAllowed = True + sel.removeSelectable(subscribe_bfd_session) + + if self.DualToR: + '''dhcp packet mark update''' while True: - key, op, fvs = subscribe_bfd_session.pop() + key, op, fvs = subscribe_dhcp_packet_mark.pop() if not key: break + self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs))) - if op == 'SET' and not self.bfdAllowed: - self.allow_bfd_protocol(namespace) - self.bfdAllowed = True - sel.removeSelectable(subscribe_bfd_session) - - if self.DualToR: - '''dhcp packet mark update''' - while True: - key, op, fvs = subscribe_dhcp_packet_mark.pop() - if not key: - break - self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs))) - - '''initial value is None''' - pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] - cur_mark = None if op == 'DEL' else dict(fvs)['mark'] - dhcp_packet_mark_tbl[key] = cur_mark - self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark) - - '''mux cable update''' - while True: - key, op, fvs = subscribe_mux_cable.pop() - if not key: - break - self.log_info("mux cable update : '%s'" % str((key, op, fvs))) - - mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] - self.update_dhcp_acl(key, op, dict(fvs), mark) - continue + '''initial value is None''' + pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] + cur_mark = None if op == 'DEL' else dict(fvs)['mark'] + dhcp_packet_mark_tbl[key] = cur_mark + self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark) - if db_id == config_db_id: + '''mux cable update''' while True: - key, op, fvs = subscribe_vxlan_table.pop() + key, op, fvs = subscribe_mux_cable.pop() if not key: break - if op == 'SET' and not self.VxlanAllowed: - self.allow_vxlan_port(namespace, fvs) - elif op == 'DEL' and self.VxlanAllowed: - self.block_vxlan_port(namespace) + self.log_info("mux cable update : '%s'" % str((key, op, fvs))) - ctrl_plane_acl_notification = set() + mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key] + self.update_dhcp_acl(key, op, dict(fvs), mark) + continue - # Pop data of both Subscriber Table object of namespace that got config db acl table event - for table in config_db_subscriber_table_map[namespace]: - while True: - (key, op, fvp) = table.pop() - # Pop of table that does not have data so break - if key == '': - break - # ACL Table notification. We will take Control Plane ACTION for any ACL Table Event - # This can be optimize further but we should not have many acl table set/del events in normal - # scenario - if acl_rule_table_seprator not in key: + if db_id == config_db_id: + while True: + key, op, fvs = subscribe_vxlan_table.pop() + if not key: + break + if op == 'SET' and not self.VxlanAllowed: + self.allow_vxlan_port(namespace, fvs) + elif op == 'DEL' and self.VxlanAllowed: + self.block_vxlan_port(namespace) + + ctrl_plane_acl_notification = set() + + # Pop data of both Subscriber Table object of namespace that got config db acl table event + for table in config_db_subscriber_table_map[namespace]: + while True: + (key, op, fvp) = table.pop() + # Pop of table that does not have data so break + if key == '': + break + # ACL Table notification. We will take Control Plane ACTION for any ACL Table Event + # This can be optimize further but we should not have many acl table set/del events in normal + # scenario + if acl_rule_table_seprator not in key: + ctrl_plane_acl_notification.add(namespace) + # Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane + else: + acl_table = key.split(acl_rule_table_seprator)[0] + if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE: ctrl_plane_acl_notification.add(namespace) - # Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane - else: - acl_table = key.split(acl_rule_table_seprator)[0] - if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE: - ctrl_plane_acl_notification.add(namespace) - - # Update the Control Plane ACL of the namespace that got config db acl table event - for namespace in ctrl_plane_acl_notification: - with self.lock[namespace]: - if self.num_changes[namespace] == 0: - self.log_info("ACL change detected for namespace '{}'".format(namespace)) - - # Increment the number of change events we've received for this namespace - self.num_changes[namespace] += 1 - - # If an update thread is not already spawned for the namespace which we received - # the ACL table update event, spawn one now - if not self.update_thread[namespace]: - self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) - self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, - args=(namespace, self.num_changes[namespace], exception_queue)) - self.update_thread[namespace].start() - except Exception as e: - self.log_error("Exception occured at main thread due to {}".format(repr(e))) - exc_type, exc_value, exc_traceback = sys.exc_info() - msg = traceback.format_exception(exc_type, exc_value, exc_traceback) - for tb_line in msg: - for tb_line_split in tb_line.splitlines(): - self.log_error(tb_line_split) - self.log_error("Catch exception in main thread, generating SIGKILL for main thread") - os.kill(os.getpid(), signal.SIGKILL) + + # Update the Control Plane ACL of the namespace that got config db acl table event + for namespace in ctrl_plane_acl_notification: + with self.lock[namespace]: + if self.num_changes[namespace] == 0: + self.log_info("ACL change detected for namespace '{}'".format(namespace)) + + # Increment the number of change events we've received for this namespace + self.num_changes[namespace] += 1 + + # If an update thread is not already spawned for the namespace which we received + # the ACL table update event, spawn one now + if not self.update_thread[namespace]: + self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) + self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, + args=(namespace, self.num_changes[namespace], exception_queue)) + self.update_thread[namespace].start() # ============================= Functions ============================= diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index ecdd1c79..443b82d1 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -8,7 +8,7 @@ import threading import sys from queue import Queue - +import queue DBCONFIG_PATH = "/var/run/redis/sonic-db/database_config.json" @@ -113,7 +113,9 @@ def test_run( ("key5", "SET", [("mark", "0x11"), ("field4", "value4")]), ('', None, None), ("key6", "SET", [("mark", "0x11"), ("field5", "value5")]), - (None, None, None), + ('', None, None), + ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), + ('', None, None), ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), ('', None, None), ] @@ -153,8 +155,11 @@ def test_run( manager.update_dhcp_acl_for_mark_change = MagicMock() manager.update_dhcp_acl = MagicMock() manager.setup_dhcp_chain = MagicMock() - - manager.run() + try: + manager.run() + except StopIteration as e: + # This is expected to happen + pass # Asserting the method calls manager.update_control_plane_acls.assert_called() From 027b0163e33640276f7f29821533bd90d978ef32 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Tue, 24 Dec 2024 04:09:47 +0000 Subject: [PATCH 10/11] Increase unit test coverage Signed-off-by: Zhaohui Sun --- scripts/caclmgrd | 11 ++- tests/caclmgrd/caclmgrd_bfd_test.py | 3 +- tests/caclmgrd/caclmgrd_scale_test.py | 3 +- tests/caclmgrd/caclmgrd_test.py | 115 +++++++++++++++++++++++++- tests/caclmgrd/caclmgrd_vxlan_test.py | 4 +- 5 files changed, 120 insertions(+), 16 deletions(-) diff --git a/scripts/caclmgrd b/scripts/caclmgrd index ef8c66f8..090ddcb9 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -137,6 +137,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.update_thread[DEFAULT_NAMESPACE] = None self.lock[DEFAULT_NAMESPACE] = threading.Lock() self.num_changes[DEFAULT_NAMESPACE] = 0 + self.exception_queue = Queue() if device_info.is_multi_npu(): swsscommon.SonicDBConfig.load_sonic_global_db_config() @@ -855,7 +856,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.run_commands(dualtor_iptables_cmds) - def check_and_update_control_plane_acls(self, namespace, num_changes, exception_queue): + def check_and_update_control_plane_acls(self, namespace, num_changes): """ This function is intended to be spawned in a separate thread. Its purpose is to prevent unnecessary iptables updates if we receive @@ -897,7 +898,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) exc_type, exc_value, exc_traceback = sys.exc_info() full_traceback = traceback.format_exception(exc_type, exc_value, exc_traceback) - exception_queue.put((namespace, repr(e), full_traceback)) # Add the exception to the queue + self.exception_queue.put((namespace, repr(e), full_traceback)) # Add the exception to the queue finally: new_config_db_connector.close("CONFIG_DB") @@ -996,7 +997,6 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): # set up state_db connector state_db_connector = swsscommon.DBConnector("STATE_DB", 0) config_db_connector = swsscommon.DBConnector("CONFIG_DB", 0) - exception_queue = Queue() if self.DualToR: self.log_info("Dual ToR mode") @@ -1045,10 +1045,9 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): while True: # Periodically check for exceptions from child threads try: - namespace, error, exc_info = exception_queue.get_nowait() # Non-blocking + namespace, error, exc_info = self.exception_queue.get_nowait() # Non-blocking self.log_error("Exception in namespace '{}': {}".format(namespace, error)) self.log_error("Full traceback from child thread:") - import pdb; pdb.set_trace() for tb_line in exc_info: for tb_line_split in tb_line.splitlines(): self.log_error(tb_line_split) @@ -1149,7 +1148,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): if not self.update_thread[namespace]: self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace)) self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls, - args=(namespace, self.num_changes[namespace], exception_queue)) + args=(namespace, self.num_changes[namespace])) self.update_thread[namespace].start() # ============================= Functions ============================= diff --git a/tests/caclmgrd/caclmgrd_bfd_test.py b/tests/caclmgrd/caclmgrd_bfd_test.py index b30e370f..114a739b 100644 --- a/tests/caclmgrd/caclmgrd_bfd_test.py +++ b/tests/caclmgrd/caclmgrd_bfd_test.py @@ -53,8 +53,7 @@ def test_caclmgrd_bfd(self, test_name, test_data, fs): caclmgrd_daemon.bfdAllowed = True mocked_subprocess.Popen.reset_mock() caclmgrd_daemon.num_changes[''] = 1 - exception_queue = Queue() - caclmgrd_daemon.check_and_update_control_plane_acls('', 1, exception_queue) + caclmgrd_daemon.check_and_update_control_plane_acls('', 1) #Ensure BFD rules are installed before ip2me rules to avoid traffic loss during update of control plane acl rules bfd_ipv4_idx = 0 diff --git a/tests/caclmgrd/caclmgrd_scale_test.py b/tests/caclmgrd/caclmgrd_scale_test.py index 414234b4..179c8c42 100644 --- a/tests/caclmgrd/caclmgrd_scale_test.py +++ b/tests/caclmgrd/caclmgrd_scale_test.py @@ -48,6 +48,5 @@ def test_caclmgrd_scale(self, test_name, test_data, fs): caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd") caclmgrd_daemon.num_changes[''] = 150 - exception_queue = Queue() - caclmgrd_daemon.check_and_update_control_plane_acls('', 150, exception_queue) + caclmgrd_daemon.check_and_update_control_plane_acls('', 150) mocked_subprocess.Popen.assert_has_calls(test_data["expected_subprocess_calls"], any_order=True) diff --git a/tests/caclmgrd/caclmgrd_test.py b/tests/caclmgrd/caclmgrd_test.py index 443b82d1..b52ebe51 100644 --- a/tests/caclmgrd/caclmgrd_test.py +++ b/tests/caclmgrd/caclmgrd_test.py @@ -59,11 +59,10 @@ def test_update_control_plane_acls_exception(self, mock_update): manager.lock = {"": threading.Lock()} manager.num_changes = {"": 0} manager.update_thread = {"": None} - exception_queue = Queue() manager.num_changes[""] = 1 - manager.check_and_update_control_plane_acls("", 0, exception_queue) - self.assertFalse(exception_queue.empty()) - exc_info = exception_queue.get() + manager.check_and_update_control_plane_acls("", 0) + self.assertFalse(manager.exception_queue.empty()) + exc_info = manager.exception_queue.get() self.assertEqual(exc_info[0], "") self.assertIn("Test exception", exc_info[1]) @@ -86,6 +85,7 @@ def test_run( mock_swsscommon.SonicDBConfig.getDbId.side_effect = lambda db_name: ( 6 if db_name == "STATE_DB" else 1 ) + mock_kill.return_value = None mock_state_db_connector = MagicMock() mock_config_db_connector = MagicMock() mock_swsscommon.DBConnector.side_effect = [mock_state_db_connector, mock_config_db_connector, mock_state_db_connector] @@ -169,3 +169,110 @@ def test_run( manager.update_dhcp_acl_for_mark_change.assert_called() manager.update_dhcp_acl.assert_called() manager.setup_dhcp_chain.assert_called() + + + @patch("caclmgrd.swsscommon") + @patch("os.geteuid", return_value=0) + @patch("os.kill") + @patch("signal.SIGKILL", return_value=9) + @patch("sys.exit") + def test_run_exception( + self, + mock_exit, + mock_sigkill, + mock_kill, + mock_geteuid, + mock_swsscommon + ): + mock_kill.return_value = None + mock_sigkill.return_value = 9 + mock_geteuid.return_value = 0 + mock_exit.return_value = None + + mock_swsscommon.SonicDBConfig.getDbId.side_effect = lambda db_name: ( + 6 if db_name == "STATE_DB" else 1 + ) + mock_state_db_connector = MagicMock() + mock_config_db_connector = MagicMock() + mock_swsscommon.DBConnector.side_effect = [mock_state_db_connector, mock_config_db_connector, mock_state_db_connector] + mock_swsscommon.Select.OBJECT = 1 + mock_swsscommon.Select.return_value.select.return_value = ( + mock_swsscommon.Select.OBJECT, + MagicMock(), + ) + mock_swsscommon.Select.return_value.removeSelectable.return_value = MagicMock() + + mock_swsscommon.SubscriberStateTable.return_value.select.return_value = ( + mock_swsscommon.Select.OBJECT, + MagicMock(), + ) + mock_swsscommon.SubscriberStateTable.return_value.getTableNameSeparator.return_value = "|" + pop_values = [ + ("key1", "SET", [("mark", "0x11"), ("field2", "value2")]), + (None, None, None), + ("key2", "DEL", []), + (None, None, None), + ("key3", "SET", [("mark", "0x11")]), + (None, None, None), + ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), + ('', None, None), + ("key5", "SET", [("mark", "0x11"), ("field4", "value4")]), + ('', None, None), + ("key6", "SET", [("mark", "0x11"), ("field5", "value5")]), + ('', None, None), + ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), + ('', None, None), + ("SSH_ONLY", "SET", (('policy_desc', 'SSH_ONLY'), ('services', 'SSH'), ('stage', 'ingress'), ('type', 'CTRLPLANE'))), + ('', None, None), + ] + mock_swsscommon.SubscriberStateTable.return_value.pop.side_effect = pop_values + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getNamespace.return_value = "" + + mock_swsscommon.CastSelectableToRedisSelectObj.return_value.getDbConnector.return_value.getDbId.side_effect = [6, 4, 4] + + # Creating an instance of ControlPlaneAclManager + self.caclmgrd.ControlPlaneAclManager.get_namespace_mgmt_ip = MagicMock() + self.caclmgrd.ControlPlaneAclManager.get_namespace_mgmt_ipv6 = MagicMock() + manager = self.caclmgrd.ControlPlaneAclManager("caclmgrd") + + # Setting necessary attributes + manager.log_info = MagicMock() + manager.log_error = MagicMock() + manager.DualToR = True + manager.iptables_cmd_ns_prefix = {"": []} + manager.lock = {"": threading.Lock()} + manager.num_changes = {"": 2} + manager.update_thread = {"": threading.Thread()} + manager.bfdAllowed = False + manager.VxlanAllowed = True + manager.VxlanSrcIP = "" + manager.MUX_CABLE_TABLE = "MUX_CABLE_TABLE" + manager.BFD_SESSION_TABLE = "BFD_SESSION_TABLE" + manager.VXLAN_TUNNEL_TABLE = "VXLAN_TUNNEL_TABLE" + manager.ACL_TABLE = "ACL_TABLE" + manager.ACL_RULE = "ACL_RULE" + manager.ACL_TABLE_TYPE_CTRLPLANE = "CTRLPLANE" + + # Mocking methods + manager.removeSelectable = MagicMock() + manager.allow_bfd_protocol = MagicMock() + manager.update_control_plane_acls = MagicMock() + manager.allow_vxlan_port = MagicMock() + manager.block_vxlan_port = MagicMock() + manager.update_dhcp_acl_for_mark_change = MagicMock() + manager.update_dhcp_acl = MagicMock() + manager.setup_dhcp_chain = MagicMock() + manager.exception_queue = Queue() + namespace = "" + error = "Simulated exception" + exc_info = ["Traceback (most recent call last):", " File \"mock.py\", line 1, in ", " raise Exception('Simulated exception')"] + manager.exception_queue.put((namespace, error, exc_info)) + try: + manager.run() + except StopIteration as e: + # This is expected to happen + pass + + # Asserting the method calls + manager.update_control_plane_acls.assert_called() + mock_kill.assert_called() diff --git a/tests/caclmgrd/caclmgrd_vxlan_test.py b/tests/caclmgrd/caclmgrd_vxlan_test.py index ac1d1495..6b32d012 100644 --- a/tests/caclmgrd/caclmgrd_vxlan_test.py +++ b/tests/caclmgrd/caclmgrd_vxlan_test.py @@ -59,7 +59,7 @@ def test_caclmgrd_vxlan(self, test_name, test_data, fs): caclmgrd_daemon.allow_vxlan_port('', data) mocked_subprocess.Popen.reset_mock() caclmgrd_daemon.num_changes[''] = 1 - exception_queue = Queue() - caclmgrd_daemon.check_and_update_control_plane_acls('', 1, exception_queue) + caclmgrd_daemon.exception_queue = Queue() + caclmgrd_daemon.check_and_update_control_plane_acls('', 1) mocked_subprocess.Popen.assert_has_calls(test_data["expected_add_subprocess_calls"], any_order=True) From ad61bf6ab5d7dbbdbd588b87d96740a0814dc746 Mon Sep 17 00:00:00 2001 From: Zhaohui Sun Date: Wed, 25 Dec 2024 07:54:24 +0000 Subject: [PATCH 11/11] replace deprecated function getName() Signed-off-by: Zhaohui Sun --- scripts/caclmgrd | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/caclmgrd b/scripts/caclmgrd index 090ddcb9..7759a9a3 100755 --- a/scripts/caclmgrd +++ b/scripts/caclmgrd @@ -895,7 +895,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): self.update_thread[namespace] = None return except Exception as e: - self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().name, repr(e))) exc_type, exc_value, exc_traceback = sys.exc_info() full_traceback = traceback.format_exception(exc_type, exc_value, exc_traceback) self.exception_queue.put((namespace, repr(e), full_traceback)) # Add the exception to the queue @@ -1051,7 +1051,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase): for tb_line in exc_info: for tb_line_split in tb_line.splitlines(): self.log_error(tb_line_split) - self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName())) + self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].name)) os.kill(os.getpid(), signal.SIGKILL) except Empty: # No exceptions in the queue