From e77aa82afbb8572e9c72dc86610f8784e8f5a23c Mon Sep 17 00:00:00 2001 From: junchao Date: Thu, 22 Sep 2022 15:01:33 +0800 Subject: [PATCH] [Mellanox] Use sdk sysfs instead of ethtool --- .../mlnx-platform-api/sonic_platform/sfp.py | 342 ++++++++++-------- .../tests/input_platform/cmis_page0 | Bin 0 -> 256 bytes .../tests/input_platform/sff8472_page0 | Bin 0 -> 256 bytes .../tests/input_platform/sff8636_page0 | Bin 0 -> 256 bytes .../mlnx-platform-api/tests/test_sfp.py | 151 ++++++-- 5 files changed, 319 insertions(+), 174 deletions(-) create mode 100644 platform/mellanox/mlnx-platform-api/tests/input_platform/cmis_page0 create mode 100644 platform/mellanox/mlnx-platform-api/tests/input_platform/sff8472_page0 create mode 100644 platform/mellanox/mlnx-platform-api/tests/input_platform/sff8636_page0 diff --git a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py index bc87fb8cd4df..2ca37fcefdfc 100644 --- a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py +++ b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py @@ -23,6 +23,7 @@ ############################################################################# try: + import ctypes import subprocess import os from sonic_py_common.logger import Logger @@ -126,11 +127,42 @@ SFP_STATUS_INSERTED = '1' # SFP constants -SFP_PAGE_SIZE = 256 -SFP_UPPER_PAGE_OFFSET = 128 -SFP_VENDOR_PAGE_START = 640 - -BYTES_IN_DWORD = 4 +SFP_PAGE_SIZE = 256 # page size of page0h +SFP_UPPER_PAGE_OFFSET = 128 # page size of other pages + +# SFP sysfs path constants +SFP_PAGE0_PATH = '0/i2c-0x50/data' +SFP_A2H_PAGE0_PATH = '0/i2c-0x51/data' +SFP_EEPROM_ROOT_TEMPLATE = '/sys/module/sx_core/asic0/module{}/eeprom/pages' + +# SFP type constants +SFP_TYPE_CMIS = 'cmis' +SFP_TYPE_SFF8472 = 'sff8472' +SFP_TYPE_SFF8636 = 'sff8636' + +# SFP stderr +SFP_EEPROM_NOT_AVAILABLE = 'Input/output error' + +# SFP EEPROM limited bytes +limited_eeprom = { + SFP_TYPE_CMIS: { + 'write': { + 0: [26, (31, 36), (126, 127)], + 16: [(0, 128)] + } + }, + SFP_TYPE_SFF8472: { + 'write': { + 0: [110, (114, 115), 118, 127] + } + }, + SFP_TYPE_SFF8636: { + 'write': { + 0: [(86, 88), 93, (98, 99), (100, 106), 127], + 3: [(230, 241), (242, 251)] + } + } +} # Global logger class instance logger = Logger() @@ -157,77 +189,6 @@ def deinitialize_sdk_handle(sdk_handle): logger.log_warning("Sdk handle is none") return False -class MlxregManager: - def __init__(self, mst_pci_device, slot_id, sdk_index): - self.mst_pci_device = mst_pci_device - self.slot_id = slot_id - self.sdk_index = sdk_index - - def construct_dword(self, write_buffer): - if len(write_buffer) == 0: - return None - - used_bytes_in_dword = len(write_buffer) % BYTES_IN_DWORD - - res = "dword[0]=0x" - for idx, x in enumerate(write_buffer): - word = hex(x)[2:] - - if (idx > 0) and (idx % BYTES_IN_DWORD) == 0: - res += ",dword[{}]=0x".format(str((idx + 1)//BYTES_IN_DWORD)) - res += word.zfill(2) - - if used_bytes_in_dword > 0: - res += (BYTES_IN_DWORD - used_bytes_in_dword) * "00" - return res - - def write_mlxreg_eeprom(self, num_bytes, dword, device_address, page): - if not dword: - return False - - try: - cmd = ["mlxreg", "-d", "", "--reg_name", "MCIA", "--indexes", "", "--set", "", "-y"] - cmd[2] = "/dev/mst/" + self.mst_pci_device - cmd[6] = "slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0".format(self.slot_id, self.sdk_index, device_address, page, num_bytes) - cmd[8] = dword - subprocess.check_call(cmd, universal_newlines=True, stdout=subprocess.DEVNULL) - except subprocess.CalledProcessError as e: - logger.log_error("Error! Unable to write data dword={} for {} port, page {} offset {}, rc = {}, err msg: {}".format(dword, self.sdk_index, page, device_address, e.returncode, e.output)) - return False - return True - - def read_mlxred_eeprom(self, offset, page, num_bytes): - try: - - cmd = ["mlxreg", "-d", "", "--reg_name", "MCIA", "--indexes", "", "--get"] - cmd[2] = "/dev/mst/" + self.mst_pci_device - cmd[6] = "slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0".format(self.slot_id, self.sdk_index, offset, page, num_bytes) - result = subprocess.check_output(cmd, universal_newlines=True) - except subprocess.CalledProcessError as e: - logger.log_error("Error! Unable to read data for {} port, page {} offset {}, rc = {}, err msg: {}".format(self.sdk_index, page, offset, e.returncode, e.output)) - return None - return result - - def parse_mlxreg_read_output(self, read_output, num_bytes): - if not read_output: - return None - - res = "" - dword_num = num_bytes // BYTES_IN_DWORD - used_bytes_in_dword = num_bytes % BYTES_IN_DWORD - arr = [value for value in read_output.split('\n') if value[0:5] == "dword"] - for i in range(dword_num): - dword = arr[i].split()[2] - res += dword[2:] - - if used_bytes_in_dword > 0: - # Cut needed info and insert into final hex string - # Example: 3 bytes : 0x12345600 - # ^ ^ - dword = arr[dword_num].split()[2] - res += dword[2 : 2 + used_bytes_in_dword * 2] - - return bytearray.fromhex(res) if res else None class SdkHandleContext(object): def __init__(self): @@ -312,6 +273,7 @@ def __init__(self, sfp_index, sfp_type=None, slot_id=0, linecard_port_count=0, l self.slot_id = slot_id self.mst_pci_device = self.get_mst_pci_device() + self._sfp_type_str = None # get MST PCI device name def get_mst_pci_device(self): @@ -337,6 +299,7 @@ def reinit(self): Re-initialize this SFP object when a new SFP inserted :return: """ + self._sfp_type_str = None self.refresh_xcvr_api() def get_presence(self): @@ -346,36 +309,9 @@ def get_presence(self): Returns: bool: True if device is present, False if not """ - eeprom_raw = self.read_eeprom(0, 1) - + eeprom_raw = self._read_eeprom(0, 1, log_on_error=False) return eeprom_raw is not None - # Read out any bytes from any offset - def _read_eeprom_specific_bytes(self, offset, num_bytes): - if offset + num_bytes > SFP_VENDOR_PAGE_START: - logger.log_error("Error mismatch between page size and bytes to read (offset: {} num_bytes: {}) ".format(offset, num_bytes)) - return None - - eeprom_raw = [] - ethtool_cmd = ["ethtool", "-m", "", "hex", "on", "offset", "", "length", ""] - ethtool_cmd[2] = "sfp" + str(self.index) - ethtool_cmd[6] = str(offset) - ethtool_cmd[8] = str(num_bytes) - try: - output = subprocess.check_output(ethtool_cmd, - universal_newlines=True) - output_lines = output.splitlines() - first_line_raw = output_lines[0] - if "Offset" in first_line_raw: - for line in output_lines[2:]: - line_split = line.split() - eeprom_raw = eeprom_raw + line_split[1:] - except subprocess.CalledProcessError as e: - return None - - eeprom_raw = list(map(lambda h: int(h, base=16), eeprom_raw)) - return bytearray(eeprom_raw) - # read eeprom specfic bytes beginning from offset with size as num_bytes def read_eeprom(self, offset, num_bytes): """ @@ -383,43 +319,37 @@ def read_eeprom(self, offset, num_bytes): Returns: bytearray, if raw sequence of bytes are read correctly from the offset of size num_bytes None, if the read_eeprom fails - Example: - mlxreg -d /dev/mst/mt52100_pciconf0 --reg_name MCIA --indexes slot_index=0,module=1,device_address=148,page_number=0,i2c_device_address=0x50,size=16,bank_number=0 -g - Sending access register... - Field Name | Data - =================================== - status | 0x00000000 - slot_index | 0x00000000 - module | 0x00000001 - l | 0x00000000 - device_address | 0x00000094 - page_number | 0x00000000 - i2c_device_address | 0x00000050 - size | 0x00000010 - bank_number | 0x00000000 - dword[0] | 0x43726564 - dword[1] | 0x6f202020 - dword[2] | 0x20202020 - dword[3] | 0x20202020 - dword[4] | 0x00000000 - dword[5] | 0x00000000 - .... - 16 bytes to read from dword -> 0x437265646f2020202020202020202020 -> Credo """ - # recalculate offset and page. Use 'ethtool' if there is no need to read vendor pages - if offset < SFP_VENDOR_PAGE_START: - return self._read_eeprom_specific_bytes(offset, num_bytes) - else: - page = (offset - SFP_PAGE_SIZE) // SFP_UPPER_PAGE_OFFSET + 1 - # calculate offset per page - device_address = (offset - SFP_PAGE_SIZE) % SFP_UPPER_PAGE_OFFSET + SFP_UPPER_PAGE_OFFSET + return self._read_eeprom(offset, num_bytes) + + def _read_eeprom(self, offset, num_bytes, log_on_error=True): + """Read eeprom specfic bytes beginning from a random offset with size as num_bytes + + Args: + offset (int): read offset + num_bytes (int): read size + log_on_error (bool, optional): whether log error when exception occurs. Defaults to True. + + Returns: + bytearray: the content of EEPROM + """ + _, page, page_offset = self._get_page_and_page_offset(offset) + if not page: + return None - if not self.mst_pci_device: + try: + with open(page, mode='rb', buffering=0) as f: + f.seek(page_offset) + content = f.read(num_bytes) + if ctypes.get_errno() != 0: + raise IOError(f'errno = {os.strerror(ctypes.get_errno())}') + except (OSError, IOError) as e: + if log_on_error: + logger.log_error(f'Failed to read sfp={self.sdk_index} EEPROM page={page}, page_offset={page_offset}, \ + size={num_bytes}, offset={offset}, error = {e}') return None - mlxreg_mngr = MlxregManager(self.mst_pci_device, self.slot_id, self.sdk_index) - read_output = mlxreg_mngr.read_mlxred_eeprom(device_address, page, num_bytes) - return mlxreg_mngr.parse_mlxreg_read_output(read_output, num_bytes) + return bytearray(content) # write eeprom specfic bytes beginning from offset with size as num_bytes def write_eeprom(self, offset, num_bytes, write_buffer): @@ -435,21 +365,28 @@ def write_eeprom(self, offset, num_bytes, write_buffer): logger.log_error("Error mismatch between buffer length and number of bytes to be written") return False - # recalculate offset and page - if offset < SFP_PAGE_SIZE: - page = 0 - device_address = offset - else: - page = (offset - SFP_PAGE_SIZE) // SFP_UPPER_PAGE_OFFSET + 1 - # calculate offset per page - device_address = (offset - SFP_PAGE_SIZE) % SFP_UPPER_PAGE_OFFSET + SFP_UPPER_PAGE_OFFSET - - if not self.mst_pci_device: + page_num, page, page_offset = self._get_page_and_page_offset(offset) + if not page: return False - mlxreg_mngr = MlxregManager(self.mst_pci_device, self.slot_id, self.sdk_index) - dword = mlxreg_mngr.construct_dword(write_buffer) - return mlxreg_mngr.write_mlxreg_eeprom(num_bytes, dword, device_address, page) + try: + if self._is_write_protected(page_num, page_offset, num_bytes): + # write limited eeprom is not supported + raise IOError('write limited bytes') + + with open(page, mode='r+b', buffering=0) as f: + f.seek(page_offset) + ret = f.write(write_buffer[0:num_bytes]) + if ret != num_bytes: + raise IOError(f'write return code = {ret}') + if ctypes.get_errno() != 0: + raise IOError(f'errno = {os.strerror(ctypes.get_errno())}') + except (OSError, IOError) as e: + data = ''.join('{:02x}'.format(x) for x in write_buffer) + logger.log_error(f'Failed to write EEPROM data sfp={self.sdk_index} EEPROM page={page}, page_offset={page_offset}, size={num_bytes}, \ + offset={offset}, data = {data}, error = {e}') + return False + return True @classmethod def mgmt_phy_mod_pwr_attr_get(cls, power_attr_type, sdk_handle, sdk_index, slot_id): @@ -759,6 +696,109 @@ def get_error_description(self): error_description = "Unknow SFP module status ({})".format(oper_status) return error_description + def _get_eeprom_path(self): + return SFP_EEPROM_ROOT_TEMPLATE.format(self.sdk_index) + + def _get_page_and_page_offset(self, overall_offset): + """Get EEPROM page and page offset according to overall offset + + Args: + overall_offset (int): Overall read offset + + Returns: + tuple: (, , ) + """ + eeprom_path = self._get_eeprom_path() + if not os.path.exists(eeprom_path): + logger.log_error(f'EEPROM file path for sfp {self.sdk_index} does not exist') + return None, None, None + + if overall_offset < SFP_PAGE_SIZE: + return 0, os.path.join(eeprom_path, SFP_PAGE0_PATH), overall_offset + + if self._get_sfp_type_str(eeprom_path) == SFP_TYPE_SFF8472: + page1h_start = SFP_PAGE_SIZE * 2 + if overall_offset < page1h_start: + return -1, os.path.join(eeprom_path, SFP_A2H_PAGE0_PATH), overall_offset - SFP_PAGE_SIZE + else: + page1h_start = SFP_PAGE_SIZE + + page_num = (overall_offset - page1h_start) // SFP_UPPER_PAGE_OFFSET + 1 + page = f'{page_num}/data' + offset = (overall_offset - page1h_start) % SFP_UPPER_PAGE_OFFSET + return page_num, os.path.join(eeprom_path, page), offset + + def _get_sfp_type_str(self, eeprom_path): + """Get SFP type by reading first byte of EEPROM + + Args: + eeprom_path (str): EEPROM path + + Returns: + str: SFP type in string + """ + if self._sfp_type_str is None: + page = os.path.join(eeprom_path, SFP_PAGE0_PATH) + try: + with open(page, mode='rb', buffering=0) as f: + id_byte_raw = bytearray(f.read(1)) + id = id_byte_raw[0] + if id == 0x18 or id == 0x19 or id == 0x1e: + self._sfp_type_str = SFP_TYPE_CMIS + elif id == 0x11 or id == 0x0D: + # in sonic-platform-common, 0x0D is treated as sff8436, + # but it shared the same implementation on Nvidia platforms, + # so, we treat it as sff8636 here. + self._sfp_type_str = SFP_TYPE_SFF8636 + elif id == 0x03: + self._sfp_type_str = SFP_TYPE_SFF8472 + else: + logger.log_error(f'Unsupported sfp type {id}') + except (OSError, IOError) as e: + # SFP_EEPROM_NOT_AVAILABLE usually indicates SFP is not present, no need + # print such error information to log + if SFP_EEPROM_NOT_AVAILABLE not in str(e): + logger.log_error(f'Failed to get SFP type, index={self.sdk_index}, error={e}') + return None + return self._sfp_type_str + + def _is_write_protected(self, page, page_offset, num_bytes): + """Check if the EEPROM read/write operation hit limitation bytes + + Args: + page (str): EEPROM page path + page_offset (int): EEPROM page offset + num_bytes (int): read/write size + + Returns: + bool: True if the limited bytes is hit + """ + eeprom_path = self._get_eeprom_path() + limited_data = limited_eeprom.get(self._get_sfp_type_str(eeprom_path)) + if not limited_data: + return False + + access_type = 'write' + limited_data = limited_data.get(access_type) + if not limited_data: + return False + + limited_ranges = limited_data.get(page) + if not limited_ranges: + return False + + access_begin = page_offset + access_end = page_offset + num_bytes - 1 + for limited_range in limited_ranges: + if isinstance(limited_range, int): + if access_begin <= limited_range <= access_end: + return True + else: # tuple + if not (access_end < limited_range[0] or access_begin > limited_range[1]): + return True + + return False + def get_rx_los(self): """Accessing rx los is not supproted, return all False @@ -786,7 +826,7 @@ def get_xcvr_api(self): """ if self._xcvr_api is None: self.refresh_xcvr_api() - if self._xcvr_api is not None: + if self._xcvr_api is not None: self._xcvr_api.get_rx_los = self.get_rx_los self._xcvr_api.get_tx_fault = self.get_tx_fault return self._xcvr_api diff --git a/platform/mellanox/mlnx-platform-api/tests/input_platform/cmis_page0 b/platform/mellanox/mlnx-platform-api/tests/input_platform/cmis_page0 new file mode 100644 index 0000000000000000000000000000000000000000..623dcfe5f3f1b1a07b9628c7b3a9a6228ea72c90 GIT binary patch literal 256 zcmb0zXkccbHei-z>|m5(bP<$hR0@>>(h`h<|HXl{7?5TJVuXbfzNtAmiFx@I3Q)kn zbkf&3z|hRhKsVgLz|qy%K*7<-H^j)m&@wF8z{JAP2&mA=z`)eNLO}sx4g({rG7Bra dC{Ton3oe|;U~FJ)Y~XETY+~SH0T)JM001&>7$pDz literal 0 HcmV?d00001 diff --git a/platform/mellanox/mlnx-platform-api/tests/input_platform/sff8472_page0 b/platform/mellanox/mlnx-platform-api/tests/input_platform/sff8472_page0 new file mode 100644 index 0000000000000000000000000000000000000000..edda806c8abaee1b4c64921eaa720b0bc3690f28 GIT binary patch literal 256 zcmZQ(QDy)E4h9Cc|6mp~gKuh1PGVkug#r{vFfg6;b#pX#G&Rt5HZU*-2|EHwpmJ9U zt8@n65JL-N6SoipV+%`5pq!zFfrXK&fm1_tKlmOwe6LNfys1qB9%^aMjA3kx$NL(?#4r!bg3 JaN2-@0RY7;6hZ(1 literal 0 HcmV?d00001 diff --git a/platform/mellanox/mlnx-platform-api/tests/test_sfp.py b/platform/mellanox/mlnx-platform-api/tests/test_sfp.py index b72a5f3ed4aa..2a79b39308b5 100644 --- a/platform/mellanox/mlnx-platform-api/tests/test_sfp.py +++ b/platform/mellanox/mlnx-platform-api/tests/test_sfp.py @@ -14,7 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import ctypes import os +import pytest +import shutil import sys if sys.version_info.major == 3: from unittest import mock @@ -27,8 +30,6 @@ from sonic_platform.sfp import SFP, SX_PORT_MODULE_STATUS_INITIALIZING, SX_PORT_MODULE_STATUS_PLUGGED, SX_PORT_MODULE_STATUS_UNPLUGGED, SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR, SX_PORT_MODULE_STATUS_PLUGGED_DISABLED from sonic_platform.chassis import Chassis -from sonic_platform.sfp import MlxregManager -from tests.input_platform import output_sfp class TestSfp: @@ -86,31 +87,61 @@ def test_sfp_get_error_status(self, mock_get_error_code): assert description == expected_description - @mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value="pciconf")) - @mock.patch('sonic_platform.sfp.MlxregManager.write_mlxreg_eeprom', mock.MagicMock(return_value=True)) - def test_sfp_write_eeprom(self): - mlxreg_mngr = MlxregManager("", 0, 0) - write_buffer = bytearray([1,2,3,4]) - offset = 793 - + @mock.patch('sonic_platform.sfp.SFP._get_page_and_page_offset') + @mock.patch('sonic_platform.sfp.SFP._is_write_protected') + def test_sfp_write_eeprom(self, mock_limited_eeprom, mock_get_page): + sfp = SFP(0) + assert not sfp.write_eeprom(0, 1, bytearray()) + + mock_get_page.return_value = (None, None, None) + assert not sfp.write_eeprom(0, 1, bytearray([1])) + + mock_get_page.return_value = (0, '/tmp/mock_page', 0) + mock_limited_eeprom.return_value = True + assert not sfp.write_eeprom(0, 1, bytearray([1])) + + mock_limited_eeprom.return_value = False + mo = mock.mock_open() + print('after mock open') + with mock.patch('sonic_platform.sfp.open', mo): + handle = mo() + handle.write.return_value = 1 + assert sfp.write_eeprom(0, 1, bytearray([1])) + + handle.seek.assert_called_once_with(0) + handle.write.assert_called_once_with(bytearray([1])) + handle.write.return_value = -1 + assert not sfp.write_eeprom(0, 1, bytearray([1])) + + handle.write.return_value = 1 + ctypes.set_errno(1) + assert not sfp.write_eeprom(0, 1, bytearray([1])) + ctypes.set_errno(0) + + handle.write.side_effect = OSError('') + assert not sfp.write_eeprom(0, 1, bytearray([1])) + + @mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value = None)) + @mock.patch('sonic_platform.sfp.SFP._get_page_and_page_offset') + def test_sfp_read_eeprom(self, mock_get_page): sfp = SFP(0) - sfp.write_eeprom(offset, 4, write_buffer) - MlxregManager.write_mlxreg_eeprom.assert_called_with(4, output_sfp.write_eeprom_dword1, 153, 5) + mock_get_page.return_value = (None, None, None) + assert sfp.read_eeprom(0, 1) is None - offset = 641 - write_buffer = bytearray([1,2,3,4,5,6]) - sfp.write_eeprom(offset, 6, write_buffer) - MlxregManager.write_mlxreg_eeprom.assert_called_with(6, output_sfp.write_eeprom_dword2, 129, 4) + mock_get_page.return_value = (0, '/tmp/mock_page', 0) + mo = mock.mock_open() + with mock.patch('sonic_platform.sfp.open', mo): + handle = mo() + handle.read.return_value = b'\x00' + assert sfp.read_eeprom(0, 1) == bytearray([0]) + handle.seek.assert_called_once_with(0) - @mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value="pciconf")) - @mock.patch('sonic_platform.sfp.MlxregManager.read_mlxred_eeprom', mock.MagicMock(return_value=output_sfp.read_eeprom_output)) - def test_sfp_read_eeprom(self): - mlxreg_mngr = MlxregManager("", 0, 0) - offset = 644 + ctypes.set_errno(1) + assert sfp.read_eeprom(0, 1) is None + ctypes.set_errno(0) - sfp = SFP(0) - assert output_sfp.y_cable_part_number == sfp.read_eeprom(offset, 16).decode() - MlxregManager.read_mlxred_eeprom.assert_called_with(132, 4, 16) + handle.read.side_effect = OSError('') + assert sfp.read_eeprom(0, 1) is None @mock.patch('sonic_platform.sfp.SFP._fetch_port_status') def test_is_port_admin_status_up(self, mock_port_status): @@ -120,6 +151,80 @@ def test_is_port_admin_status_up(self, mock_port_status): mock_port_status.return_value = (0, False) assert not SFP.is_port_admin_status_up(None, None) + @mock.patch('sonic_platform.sfp.SFP._get_eeprom_path', mock.MagicMock(return_value = None)) + @mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str') + def test_is_write_protected(self, mock_get_type_str): + sfp = SFP(0) + mock_get_type_str.return_value = 'cmis' + assert sfp._is_write_protected(page=0, page_offset=26, num_bytes=1) + assert not sfp._is_write_protected(page=0, page_offset=27, num_bytes=1) + + # not exist page + assert not sfp._is_write_protected(page=3, page_offset=0, num_bytes=1) + + # invalid sfp type str + mock_get_type_str.return_value = 'invalid' + assert not sfp._is_write_protected(page=0, page_offset=0, num_bytes=1) + + def test_get_sfp_type_str(self): + sfp = SFP(0) + expect_sfp_types = ['cmis', 'sff8636', 'sff8472'] + mock_eeprom_path = '/tmp/mock_eeprom' + mock_dir = '/tmp/mock_eeprom/0/i2c-0x50' + os.makedirs(os.path.join(mock_dir), exist_ok=True) + for expect_sfp_type in expect_sfp_types: + source_eeprom_file = os.path.join(test_path, 'input_platform', expect_sfp_type + '_page0') + shutil.copy(source_eeprom_file, os.path.join(mock_dir, 'data')) + assert sfp._get_sfp_type_str(mock_eeprom_path) == expect_sfp_type + sfp._sfp_type_str = None + + os.system('rm -rf {}'.format(mock_eeprom_path)) + assert sfp._get_sfp_type_str('invalid') is None + + @mock.patch('os.path.exists') + @mock.patch('sonic_platform.sfp.SFP._get_eeprom_path') + @mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str') + def test_get_page_and_page_offset(self, mock_get_type_str, mock_eeprom_path, mock_path_exists): + sfp = SFP(0) + mock_path_exists.return_value = False + page_num, page, page_offset = sfp._get_page_and_page_offset(0) + assert page_num is None + assert page is None + assert page_offset is None + + mock_path_exists.return_value = True + mock_eeprom_path.return_value = '/tmp' + page_num, page, page_offset = sfp._get_page_and_page_offset(255) + assert page_num == 0 + assert page == '/tmp/0/i2c-0x50/data' + assert page_offset is 255 + + mock_get_type_str.return_value = 'cmis' + page_num, page, page_offset = sfp._get_page_and_page_offset(256) + assert page_num == 1 + assert page == '/tmp/1/data' + assert page_offset is 0 + + mock_get_type_str.return_value = 'sff8472' + page_num, page, page_offset = sfp._get_page_and_page_offset(511) + assert page_num == -1 + assert page == '/tmp/0/i2c-0x51/data' + assert page_offset is 255 + + page_num, page, page_offset = sfp._get_page_and_page_offset(512) + assert page_num == 1 + assert page == '/tmp/1/data' + assert page_offset is 0 + + @mock.patch('sonic_platform.sfp.SFP._read_eeprom') + def test_get_presence(self, mock_read_eeprom): + sfp = SFP(0) + mock_read_eeprom.return_value = None + assert not sfp.get_presence() + + mock_read_eeprom.return_value = bytearray([1]) + assert sfp.get_presence() + @mock.patch('sonic_platform.sfp.SFP.get_xcvr_api') def test_dummy_apis(self, mock_get_xcvr_api): mock_api = mock.MagicMock()