Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart caclmgrd whenever catch exception in child thread or in main thread #194

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
202 changes: 118 additions & 84 deletions scripts/caclmgrd
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ try:
import sys
import threading
import time
import traceback
import signal
from queue import Empty, Queue
from sonic_py_common.general import getstatusoutput_noshell_pipe
from sonic_py_common import daemon_base, device_info, multi_asic
from swsscommon import swsscommon
Expand Down Expand Up @@ -852,7 +855,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
self.run_commands(dualtor_iptables_cmds)


def check_and_update_control_plane_acls(self, namespace, num_changes):
def check_and_update_control_plane_acls(self, namespace, num_changes, exception_queue):
"""
This function is intended to be spawned in a separate thread.
Its purpose is to prevent unnecessary iptables updates if we receive
Expand All @@ -869,6 +872,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
while True:
# Sleep for our delay interval
time.sleep(self.UPDATE_DELAY_SECS)
self.log_info("After delay {}s, checking for ACL table changes in namespace '{}'".format(self.UPDATE_DELAY_SECS, namespace))

with self.lock[namespace]:
if self.num_changes[namespace] > num_changes:
Expand All @@ -890,6 +894,17 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
self.num_changes[namespace] = 0
self.update_thread[namespace] = None
return
except Exception as e:
self.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log_error

Syslog may drop messages or deliver them out of order because it uses UDP. I suggest queuing the exception and re-raising it in the main thread.

Alternatively, use join in the main thread to handle the child thread's exception; there is no need to invent exception_queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main thread still needs to keep checking for further DB updates, so it can't join and wait for the child thread's result in order to handle the exception. That's why I chose an exception queue: each time, before it starts checking for DB updates, the main thread only checks whether the queue is empty.

exc_info = traceback.format_exc()
exception_queue.put((namespace, repr(e), exc_info)) # Add the exception to the queue
self.log_error("Exiting thread {}, put it into exception_queue {}".format(
threading.current_thread().getName(), exception_queue))
finally:
new_config_db_connector.close("CONFIG_DB")

Expand Down Expand Up @@ -988,6 +1003,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# set up state_db connector
state_db_connector = swsscommon.DBConnector("STATE_DB", 0)
config_db_connector = swsscommon.DBConnector("CONFIG_DB", 0)
exception_queue = Queue()

if self.DualToR:
self.log_info("Dual ToR mode")
Expand Down Expand Up @@ -1031,103 +1047,121 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):

# Get the ACL rule table seprator
acl_rule_table_seprator = subscribe_acl_rule_table.getTableNameSeparator()
try:
# Loop on select to see if any event happen on state db or config db of any namespace
while True:
# Periodically check for exceptions from child threads
try:
namespace, error, _ = exception_queue.get_nowait() # Non-blocking
self.log_error(f"Exception in namespace '{namespace}': {error}")
self.log_error("Detect exception in Child thread {} , generating SIGKILL for main thread".format(self.update_thread[namespace].getName()))
os.kill(os.getpid(), signal.SIGKILL)
except Empty:
# No exceptions in the queue
pass
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue

# Loop on select to see if any event happen on state db or config db of any namespace
while True:
(state, selectableObj) = sel.select(SELECT_TIMEOUT_MS)
# Continue if select is timeout or selectable object is not return
if state != swsscommon.Select.OBJECT:
continue

# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

# Get the corresponding namespace and db_id from redisselect
namespace = redisSelectObj.getDbConnector().getNamespace()
db_id = redisSelectObj.getDbConnector().getDbId()

if db_id == state_db_id:
while True:
key, op, fvs = subscribe_bfd_session.pop()
if not key:
break
# Get the redisselect object from selectable object
redisSelectObj = swsscommon.CastSelectableToRedisSelectObj(selectableObj)

if op == 'SET' and not self.bfdAllowed:
self.allow_bfd_protocol(namespace)
self.bfdAllowed = True
sel.removeSelectable(subscribe_bfd_session)
# Get the corresponding namespace and db_id from redisselect
namespace = redisSelectObj.getDbConnector().getNamespace()
db_id = redisSelectObj.getDbConnector().getDbId()

if self.DualToR:
'''dhcp packet mark update'''
if db_id == state_db_id:
while True:
key, op, fvs = subscribe_dhcp_packet_mark.pop()
key, op, fvs = subscribe_bfd_session.pop()
if not key:
break
self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs)))

'''initial value is None'''
pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
cur_mark = None if op == 'DEL' else dict(fvs)['mark']
dhcp_packet_mark_tbl[key] = cur_mark
self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark)
if op == 'SET' and not self.bfdAllowed:
self.allow_bfd_protocol(namespace)
self.bfdAllowed = True
sel.removeSelectable(subscribe_bfd_session)

if self.DualToR:
'''dhcp packet mark update'''
while True:
key, op, fvs = subscribe_dhcp_packet_mark.pop()
if not key:
break
self.log_info("dhcp packet mark update : '%s'" % str((key, op, fvs)))

'''initial value is None'''
pre_mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
cur_mark = None if op == 'DEL' else dict(fvs)['mark']
dhcp_packet_mark_tbl[key] = cur_mark
self.update_dhcp_acl_for_mark_change(key, pre_mark, cur_mark)

'''mux cable update'''
while True:
key, op, fvs = subscribe_mux_cable.pop()
if not key:
break
self.log_info("mux cable update : '%s'" % str((key, op, fvs)))

mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
self.update_dhcp_acl(key, op, dict(fvs), mark)
continue

'''mux cable update'''
if db_id == config_db_id:
while True:
key, op, fvs = subscribe_mux_cable.pop()
key, op, fvs = subscribe_vxlan_table.pop()
if not key:
break
self.log_info("mux cable update : '%s'" % str((key, op, fvs)))
if op == 'SET' and not self.VxlanAllowed:
self.allow_vxlan_port(namespace, fvs)
elif op == 'DEL' and self.VxlanAllowed:
self.block_vxlan_port(namespace)

mark = None if key not in dhcp_packet_mark_tbl else dhcp_packet_mark_tbl[key]
self.update_dhcp_acl(key, op, dict(fvs), mark)
continue
ctrl_plane_acl_notification = set()

if db_id == config_db_id:
while True:
key, op, fvs = subscribe_vxlan_table.pop()
if not key:
break
if op == 'SET' and not self.VxlanAllowed:
self.allow_vxlan_port(namespace, fvs)
elif op == 'DEL' and self.VxlanAllowed:
self.block_vxlan_port(namespace)

ctrl_plane_acl_notification = set()

# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
(key, op, fvp) = table.pop()
# Pop of table that does not have data so break
if key == '':
break
# ACL Table notification. We will take Control Plane ACTION for any ACL Table Event
# This can be optimize further but we should not have many acl table set/del events in normal
# scenario
if acl_rule_table_seprator not in key:
ctrl_plane_acl_notification.add(namespace)
# Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane
else:
acl_table = key.split(acl_rule_table_seprator)[0]
if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE:
# Pop data of both Subscriber Table object of namespace that got config db acl table event
for table in config_db_subscriber_table_map[namespace]:
while True:
(key, op, fvp) = table.pop()
# Pop of table that does not have data so break
if key == '':
break
# ACL Table notification. We will take Control Plane ACTION for any ACL Table Event
# This can be optimize further but we should not have many acl table set/del events in normal
# scenario
if acl_rule_table_seprator not in key:
ctrl_plane_acl_notification.add(namespace)

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace]))
self.update_thread[namespace].start()
# Check ACL Rule notification and make sure Rule point to ACL Table which is Controlplane
else:
acl_table = key.split(acl_rule_table_seprator)[0]
if self.config_db_map[namespace].get_table(self.ACL_TABLE)[acl_table]["type"] == self.ACL_TABLE_TYPE_CTRLPLANE:
ctrl_plane_acl_notification.add(namespace)

# Update the Control Plane ACL of the namespace that got config db acl table event
for namespace in ctrl_plane_acl_notification:
with self.lock[namespace]:
if self.num_changes[namespace] == 0:
self.log_info("ACL change detected for namespace '{}'".format(namespace))

# Increment the number of change events we've received for this namespace
self.num_changes[namespace] += 1

# If an update thread is not already spawned for the namespace which we received
# the ACL table update event, spawn one now
if not self.update_thread[namespace]:
self.log_info("Spawning ACL update thread for namepsace '{}' ...".format(namespace))
self.update_thread[namespace] = threading.Thread(target=self.check_and_update_control_plane_acls,
args=(namespace, self.num_changes[namespace], exception_queue))
self.update_thread[namespace].start()
except Exception as e:
ZhaohuiS marked this conversation as resolved.
Show resolved Hide resolved
self.log_error("Exception occured at main thread due to {}".format(repr(e)))
exc_type, exc_value, exc_traceback = sys.exc_info()
msg = traceback.format_exception(exc_type, exc_value, exc_traceback)
for tb_line in msg:
for tb_line_split in tb_line.splitlines():
self.log_error(tb_line_split)
self.log_error("Catch exception in main thread, generating SIGKILL for main thread")
os.kill(os.getpid(), signal.SIGKILL)

# ============================= Functions =============================

Expand Down
5 changes: 4 additions & 1 deletion tests/caclmgrd/caclmgrd_bfd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from unittest import TestCase, mock
from pyfakefs.fake_filesystem_unittest import patchfs

from queue import Queue

from .test_bfd_vectors import CACLMGRD_BFD_TEST_VECTOR
from tests.common.mock_configdb import MockConfigDb
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -51,7 +53,8 @@ def test_caclmgrd_bfd(self, test_name, test_data, fs):
caclmgrd_daemon.bfdAllowed = True
mocked_subprocess.Popen.reset_mock()
caclmgrd_daemon.num_changes[''] = 1
caclmgrd_daemon.check_and_update_control_plane_acls('', 1)
exception_queue = Queue()
caclmgrd_daemon.check_and_update_control_plane_acls('', 1, exception_queue)

#Ensure BFD rules are installed before ip2me rules to avoid traffic loss during update of control plane acl rules
bfd_ipv4_idx = 0
Expand Down
4 changes: 3 additions & 1 deletion tests/caclmgrd/caclmgrd_scale_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .test_scale_vectors import CACLMGRD_SCALE_TEST_VECTOR
from tests.common.mock_configdb import MockConfigDb
from unittest.mock import MagicMock, patch
from queue import Queue

DBCONFIG_PATH = '/var/run/redis/sonic-db/database_config.json'

Expand Down Expand Up @@ -47,5 +48,6 @@ def test_caclmgrd_scale(self, test_name, test_data, fs):

caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd")
caclmgrd_daemon.num_changes[''] = 150
caclmgrd_daemon.check_and_update_control_plane_acls('', 150)
exception_queue = Queue()
caclmgrd_daemon.check_and_update_control_plane_acls('', 150, exception_queue)
mocked_subprocess.Popen.assert_has_calls(test_data["expected_subprocess_calls"], any_order=True)
Loading
Loading