Skip to content

Commit

Permalink
okay
Browse files Browse the repository at this point in the history
Signed-off-by: Dante Su <dante.su@broadcom.com>
  • Loading branch information
ds952811 committed Nov 18, 2021
1 parent 11b6850 commit a1d8e10
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 126 deletions.
268 changes: 172 additions & 96 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ def _wrapper_get_transceiver_change_event(timeout):

def _wrapper_get_sfp_type(physical_port):
if platform_chassis:
sfp = platform_chassis.get_sfp(physical_port)
try:
sfp = platform_chassis.get_sfp(physical_port)
except (NotImplementedError, AttributeError):
return None
try:
return sfp.sfp_type
except (NotImplementedError, AttributeError):
Expand Down Expand Up @@ -232,7 +235,7 @@ def beautify_dom_info_dict(dom_info_dict, physical_port):
dom_info_dict['tx2power'] = strip_unit_and_beautify(dom_info_dict['tx2power'], POWER_UNIT)
dom_info_dict['tx3power'] = strip_unit_and_beautify(dom_info_dict['tx3power'], POWER_UNIT)
dom_info_dict['tx4power'] = strip_unit_and_beautify(dom_info_dict['tx4power'], POWER_UNIT)
if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD':
if 'rx5power' in dom_info_dict:
dom_info_dict['rx5power'] = strip_unit_and_beautify(dom_info_dict['rx5power'], POWER_UNIT)
dom_info_dict['rx6power'] = strip_unit_and_beautify(dom_info_dict['rx6power'], POWER_UNIT)
dom_info_dict['rx7power'] = strip_unit_and_beautify(dom_info_dict['rx7power'], POWER_UNIT)
Expand Down Expand Up @@ -438,7 +441,7 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event=
dom_info_cache[physical_port] = dom_info_dict
if dom_info_dict is not None:
beautify_dom_info_dict(dom_info_dict, physical_port)
if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD':
if 'rx5power' in dom_info_dict:
fvs = swsscommon.FieldValuePairs(
[('temperature', dom_info_dict['temperature']),
('voltage', dom_info_dict['voltage']),
Expand Down Expand Up @@ -674,31 +677,16 @@ def get_media_settings_key(physical_port, transceiver_dict):
media_key += '-' + media_compliance_code
if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD':
if media_compliance_code == "passive_copper_media_interface":
if media_len != 0:
media_key += '-' + str(media_len) + 'M'
if len(media_len) != 0:
media_key += '-' + media_len + 'M'
else:
if media_len != 0:
media_key += '-' + str(media_len) + 'M'
if len(media_len) != 0:
media_key += '-' + media_len + 'M'
else:
media_key += '-' + '*'

return [vendor_key, media_key]

def get_media_settings_cmis(transceiver_dict):
CMIS_MEDIA_SETTINGS_KEY = 'CMIS_MEDIA_SETTINGS'

if CMIS_MEDIA_SETTINGS_KEY not in g_dict:
return {}

vendor_name_str = transceiver_dict['manufacturer']
vendor_pn_str = transceiver_dict['model']
vendor_key = (vendor_name_str.strip() + '#' + vendor_pn_str.strip()).upper()

for k, v in g_dict[CMIS_MEDIA_SETTINGS_KEY].items():
if k.upper() == vendor_key:
return v
return {}

def get_media_val_str_from_dict(media_dict):
LANE_STR = 'lane'
LANE_SEPARATOR = ','
Expand Down Expand Up @@ -866,136 +854,224 @@ def is_fast_reboot_enabled():

class CmisManagerTask:

CMIS_STATE_INSERTED = 'INSERTED'
CMIS_STATE_DP_DEINIT = 'DP_DEINIT'
CMIS_STATE_AP_CONF = 'AP_CONFIGURED'
CMIS_STATE_DP_INIT = 'DP_INIT'
CMIS_STATE_DP_TXON = 'DP_TXON'
CMIS_STATE_READY = 'READY'
CMIS_STATE_REMOVED = 'REMOVED'
CMIS_STATE_FAILED = 'FAILED'

def __init__(self, port_mapping):
# Max. number of the worker process/thread
self.worker_max = 5
self.task_workers = []
self.task_stopping_event = multiprocessing.Event()
self.task_queue = multiprocessing.Manager().Queue()
self.task_process = None
self.port_dict = {}
self.port_mapping = copy.deepcopy(port_mapping)
self.isPortInitDone = False
self.isPortConfigDone = False

def log_notice(self, message):
def dbg_print(self, message):
helper_logger.log_notice("CMIS: {}".format(message))

def on_port_config_change(self, port_change_event):
if port_change_event.event_type != port_change_event.PORT_SET:
if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]:
return

lport = port_change_event.port_name
pport = port_change_event.port_index

if lport in ['PortInitDone']:
self.isPortInitDone = True
return

if lport in ['PortConfigDone']:
self.isPortConfigDone = True
return

if not lport.startswith('Ethernet'):
return

if port_change_event.port_dict is None:
speed = "0"
lanes = "0"
lanes = ""
else:
speed = port_change_event.port_dict.get('speed')
lanes = port_change_event.port_dict.get('lanes')
speed = port_change_event.port_dict.get('speed', "0")
lanes = port_change_event.port_dict.get('lanes', "")

if (pport is None) or (speed is None) or (lanes is None):
if pport is None:
return

try:
msg = {
'lport': lport,
'pport': pport,
'speed': int(speed),
'lanes': lanes.split(',')
}
except ValueError as ex:
self.log_notice("{}".format(ex))
# Skip if the port/cage type is not QSFP-DD
ptype = _wrapper_get_sfp_type(pport)
if ptype not in ['QSFP-DD', 'QSFP_DD']:
return

self.task_queue.put(msg)
time.sleep(0.001)
if lport not in self.port_dict:
self.port_dict[lport] = {}

if port_change_event.event_type == port_change_event.PORT_SET:
if pport >= 0:
self.port_dict[lport]['index'] = pport
if port_change_event.port_dict is not None and 'speed' in port_change_event.port_dict:
self.port_dict[lport]['speed'] = port_change_event.port_dict['speed']
if port_change_event.port_dict is not None and 'lanes' in port_change_event.port_dict:
self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes']
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED
else:
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED

def task_worker(self, commander):
self.log_notice("Starting...")
def task_worker(self):
self.dbg_print("Starting...")

if commander:
sel, asic_context = port_mapping.subscribe_port_config_change()
while not self.task_stopping_event.is_set():
# Handle port change event from main thread
port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change)
else:
while not self.task_stopping_event.is_set():
try:
msg = self.task_queue.get(False, 0.1)
except:
msg = None
if msg is None:
time.sleep(1)
# APPL_DB for CONFIG updates, and STATE_DB for insertion/removal
sel, asic_context = port_mapping.subscribe_port_config_change(['APPL_DB', 'STATE_DB'])
while not self.task_stopping_event.is_set():
# Handle port change event from main thread
port_mapping.handle_port_config_change(sel,
asic_context,
self.task_stopping_event,
self.port_mapping,
helper_logger,
self.on_port_config_change,
True)

if not self.isPortConfigDone:
continue

for lport, info in self.port_dict.items():
if self.task_stopping_event.is_set():
break

if lport not in self.port_dict:
continue

lport = msg['lport']
pport = msg['pport']
lanes = msg['lanes']
speed = msg['speed']
if speed == 0 or len(lanes) < 1:
state = self.port_dict[lport].get('cmis_state', self.CMIS_STATE_REMOVED)
if state in [self.CMIS_STATE_FAILED, self.CMIS_STATE_READY, self.CMIS_STATE_REMOVED]:
continue

pport = int(info.get('index', "-1"))
speed = int(info.get('speed', "0"))
lanes = info.get('lanes', "").strip()
if pport < 0 or speed == 0 or len(lanes) < 1:
continue

# Replace the physical lane id with logical lane index
#
# TODO: Add dynamic port breakout support
lanes_new = []
lanes_old = lanes.split(',')
for i in range(len(lanes_old)):
lanes_new.append(i)
host_lanes = lanes_new
host_speed = speed

sfp = platform_chassis.get_sfp(pport)
if not sfp.get_presence():
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED
continue

try:
# Skip if the port/cage type is not QSFP-DD
if sfp.SFP_PORT_TYPE_QSFPDD != sfp.get_port_type():
continue
# Skip if the xcvr type is not QSFP-DD
info = sfp.get_transceiver_info()
if info.get('type_abbrv_name', None) not in ['QSFP-DD', 'QSFP_DD']:
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED
continue

# Skip if the memory type is flat
if info.get('memory_type', 'flat') in ['flat', 'FLAT']:
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY
continue

# Fetch the custom CMIS media setting from media_settings.json
conf = get_media_settings_cmis(info)
if conf is None:
conf = {}
# Enable application selection by default
if conf.get('application_selection', True):
self.log_notice("{}: application update for {}G, {}-lanes".format(
lport, int(speed/1000), len(lanes)))
ret = sfp.set_cmis_application(speed, len(lanes))
else:
ret = None
except (NotImplementedError, AttributeError):
ret = None
self.dbg_print("{}: {}G, {}-lanes, state={}".format(
lport, int(speed/1000), len(host_lanes), state))

if ret is None:
state = "skipped"
elif not ret:
state = "failed"
else:
state = "ok"
if state == self.CMIS_STATE_INSERTED:
(flag, appl) = sfp.get_cmis_application_update(host_lanes, host_speed)
if not flag:
# No application updates
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY
continue
self.port_dict[lport]['cmis_apsel'] = appl
sfp.set_cmis_application_stop(host_lanes)
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT
elif state == self.CMIS_STATE_DP_DEINIT:
if sfp.get_module_state() != 'ModuleReady':
continue
appl = self.port_dict[lport]['cmis_apsel']
sfp.set_cmis_application_apsel(host_lanes, appl)
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF
elif state == self.CMIS_STATE_AP_CONF:
st = sfp.get_cmis_state()['config_state']
done = True
for lane in host_lanes:
name = "ConfigStatusLane{}".format(lane + 1)
if st[name] != 'ConfigSuccess':
done = False
continue
if not done:
continue
sfp.set_cmis_application_start(host_lanes)
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT
elif state == self.CMIS_STATE_DP_INIT:
st = sfp.get_cmis_state()['datapath_state']
done = True
for lane in host_lanes:
name = "DP{}State".format(lane + 1)
if st[name] != 'DataPathInitialized':
done = False
continue
if not done:
continue
sfp.set_cmis_application_txon(host_lanes)
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON
elif state == self.CMIS_STATE_DP_TXON:
st = sfp.get_cmis_state()['datapath_state']
done = True
for lane in host_lanes:
name = "DP{}State".format(lane + 1)
if st[name] != 'DataPathActivated':
done = False
continue
if not done:
continue
state = self.CMIS_STATE_READY
self.dbg_print("{}: {}G, {}-lanes, state={}".format(
lport, int(speed/1000), len(host_lanes), state))
self.port_dict[lport]['cmis_state'] = state

self.log_notice("{}: application update ... {}".format(lport, state))
except (NotImplementedError, AttributeError):
self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED

self.log_notice("Stopped")
self.dbg_print("Stopped")

def task_run(self, y_cable_presence):
if platform_chassis is None:
self.log_notice("Platform chassis is not available, stopping...")
self.dbg_print("Platform chassis is not available, stopping...")
return

has_cmis = False
for sfp in platform_chassis.get_all_sfps():
if sfp.get_port_type() in [sfp.SFP_PORT_TYPE_QSFPDD]:
try:
ptype = sfp.get_port_type()
except (NotImplementedError, ValueError):
ptype = 'Unknown'
if ptype in ['QSFP_DD', 'QSFP-DD']:
has_cmis = True
break

if not has_cmis:
self.log_notice("None of QSFP-DD cage is detected, stopping...")
self.dbg_print("None of QSFP-DD cages are detected, stopping...")
return

for n in range(0, self.worker_max):
p = multiprocessing.Process(target=self.task_worker, args=(n==0,))
self.task_workers.append(p)
p.start()
self.task_process = multiprocessing.Process(target=self.task_worker)
if self.task_process is not None:
self.task_process.start()

def task_stop(self):
self.task_stopping_event.set()
for p in self.task_workers:
p.join()
if self.task_process is not None:
self.task_process.join()


# Thread wrapper class to update dom info periodically
Expand Down
Loading

0 comments on commit a1d8e10

Please sign in to comment.