diff --git a/sonic-xcvrd/pytest.ini b/sonic-xcvrd/pytest.ini new file mode 100644 index 000000000000..6a40b69cce87 --- /dev/null +++ b/sonic-xcvrd/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = --cov=xcvrd --cov-report html diff --git a/sonic-xcvrd/setup.cfg b/sonic-xcvrd/setup.cfg new file mode 100644 index 000000000000..b7e478982ccf --- /dev/null +++ b/sonic-xcvrd/setup.cfg @@ -0,0 +1,2 @@ +[aliases] +test=pytest diff --git a/sonic-xcvrd/setup.py b/sonic-xcvrd/setup.py index 8ed5c38c2aca..27a1ab3bd905 100644 --- a/sonic-xcvrd/setup.py +++ b/sonic-xcvrd/setup.py @@ -24,6 +24,10 @@ setup_requires=[ 'wheel' ], + tests_require=[ + 'pytest', + 'pytest-cov', + ], classifiers=[ 'Development Status :: 4 - Beta', 'Environment :: No Input/Output (Daemon)', diff --git a/sonic-xcvrd/tests/__init__.py b/sonic-xcvrd/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sonic-xcvrd/tests/mock_platform.py b/sonic-xcvrd/tests/mock_platform.py new file mode 100644 index 000000000000..168de9754b0f --- /dev/null +++ b/sonic-xcvrd/tests/mock_platform.py @@ -0,0 +1,262 @@ +class MockDevice: + def __init__(self): + self.name = None + self.presence = True + self.model = 'FAN Model' + self.serial = 'Fan Serial' + + def get_name(self): + return self.name + + def get_presence(self): + return self.presence + + def get_model(self): + return self.model + + def get_serial(self): + return self.serial + + def get_position_in_parent(self): + return 1 + + def is_replaceable(self): + return True + + def get_status(self): + return True + + +class MockFan(MockDevice): + STATUS_LED_COLOR_RED = 'red' + STATUS_LED_COLOR_GREEN = 'green' + + def __init__(self): + MockDevice.__init__(self) + self.speed = 20 + self.speed_tolerance = 20 + self.target_speed = 20 + self.status = True + self.direction = 'intake' + self.led_status = 'red' + + def get_speed(self): + return self.speed + + def get_speed_tolerance(self): + return self.speed_tolerance + + def get_target_speed(self): + return self.target_speed + + def get_status(self): + return self.status + + def get_direction(self): + return self.direction + + def get_status_led(self): + return self.led_status + + def set_status_led(self, value): + self.led_status = value + + def make_under_speed(self): + self.speed = 1 + self.target_speed = 2 + self.speed_tolerance = 0 + + def make_over_speed(self): + self.speed = 2 + self.target_speed = 1 + self.speed_tolerance = 0 + + def make_normal_speed(self): + self.speed = 1 + self.target_speed = 1 + self.speed_tolerance = 0 + + +class MockErrorFan(MockFan): + def get_speed(self): + raise Exception('Fail to get speed') + + +class MockPsu(MockDevice): + def __init__(self): + MockDevice.__init__(self) + self.fan_list = [] + + def get_all_fans(self): + return self.fan_list + + +class MockFanDrawer(MockDevice): + def __init__(self, index): + MockDevice.__init__(self) + self.name = 'FanDrawer {}'.format(index) + self.fan_list = [] + self.led_status = 'red' + + def get_name(self): + return self.name + + def get_all_fans(self): + return self.fan_list + + def get_status_led(self): + return self.led_status + + def set_status_led(self, value): + self.led_status = value + + +class MockThermal(MockDevice): + def __init__(self, index=None): + MockDevice.__init__(self) + self.name = None + self.name = 'Thermal {}'.format(index) if index != None else None + self.temperature = 2 + self.minimum_temperature = 1 + self.maximum_temperature = 5 + self.high_threshold = 3 + self.low_threshold = 1 + self.high_critical_threshold = 4 + self.low_critical_threshold = 0 + + def get_name(self): + return self.name + + def get_temperature(self): + return self.temperature + + def get_minimum_recorded(self): + return self.minimum_temperature + + def get_maximum_recorded(self): + return self.maximum_temperature + + def get_high_threshold(self): + return self.high_threshold + + def get_low_threshold(self): + return self.low_threshold + + def get_high_critical_threshold(self): + return self.high_critical_threshold + + def get_low_critical_threshold(self): + return self.low_critical_threshold + + def make_over_temper(self): + self.high_threshold = 2 + self.temperature = 3 + self.low_threshold = 1 + + def make_under_temper(self): + self.high_threshold = 3 + self.temperature = 1 + self.low_threshold = 2 + + def make_normal_temper(self): + self.high_threshold = 3 + self.temperature = 2 + self.low_threshold = 1 + + +class MockErrorThermal(MockThermal): + def get_temperature(self): + raise Exception('Fail to get temperature') + + +class MockChassis: + def __init__(self): + self.fan_list = [] + self.psu_list = [] + self.thermal_list = [] + self.fan_drawer_list = [] + self.sfp_list = [] + self.is_chassis_system = False + + def get_all_fans(self): + return self.fan_list + + def get_all_psus(self): + return self.psu_list + + def get_all_thermals(self): + return self.thermal_list + + def get_all_fan_drawers(self): + return self.fan_drawer_list + + def get_all_sfps(self): + return self.sfp_list + + def get_num_thermals(self): + return len(self.thermal_list) + + def make_absence_fan(self): + fan = MockFan() + fan.presence = False + fan_drawer = MockFanDrawer(len(self.fan_drawer_list)) + fan_drawer.fan_list.append(fan) + self.fan_list.append(fan) + self.fan_drawer_list.append(fan_drawer) + + def make_fault_fan(self): + fan = MockFan() + fan.status = False + fan_drawer = MockFanDrawer(len(self.fan_drawer_list)) + fan_drawer.fan_list.append(fan) + self.fan_list.append(fan) + self.fan_drawer_list.append(fan_drawer) + + def make_under_speed_fan(self): + fan = MockFan() + fan.make_under_speed() + fan_drawer = MockFanDrawer(len(self.fan_drawer_list)) + fan_drawer.fan_list.append(fan) + self.fan_list.append(fan) + self.fan_drawer_list.append(fan_drawer) + + def make_over_speed_fan(self): + fan = MockFan() + fan.make_over_speed() + fan_drawer = MockFanDrawer(len(self.fan_drawer_list)) + fan_drawer.fan_list.append(fan) + self.fan_list.append(fan) + self.fan_drawer_list.append(fan_drawer) + + def make_error_fan(self): + fan = MockErrorFan() + fan_drawer = MockFanDrawer(len(self.fan_drawer_list)) + fan_drawer.fan_list.append(fan) + self.fan_list.append(fan) + self.fan_drawer_list.append(fan_drawer) + + def make_over_temper_thermal(self): + thermal = MockThermal() + thermal.make_over_temper() + self.thermal_list.append(thermal) + + def make_under_temper_thermal(self): + thermal = MockThermal() + thermal.make_under_temper() + self.thermal_list.append(thermal) + + def make_error_thermal(self): + thermal = MockErrorThermal() + self.thermal_list.append(thermal) + + def is_modular_chassis(self): + return self.is_chassis_system + + def set_modular_chassis(self, is_true): + self.is_chassis_system = is_true + + def set_my_slot(self, my_slot): + self.my_slot = my_slot + + def get_my_slot(self): + return self.my_slot diff --git a/sonic-xcvrd/tests/mock_swsscommon.py b/sonic-xcvrd/tests/mock_swsscommon.py new file mode 100644 index 000000000000..5794efcbcc51 --- /dev/null +++ b/sonic-xcvrd/tests/mock_swsscommon.py @@ -0,0 +1,31 @@ +STATE_DB = '' +CHASSIS_STATE_DB = '' + + +class Table: + def __init__(self, db, table_name): + self.table_name = table_name + self.mock_dict = {} + + def _del(self, key): + if key in self.mock_dict: + del self.mock_dict[key] + pass + + def set(self, key, fvs): + self.mock_dict[key] = fvs + pass + + def get(self, key): + if key in self.mock_dict: + return self.mock_dict[key] + return None + + def get_size(self): + return (len(self.mock_dict)) + + +class FieldValuePairs: + def __init__(self, fvs): + self.fv_dict = dict(fvs) + pass diff --git a/sonic-xcvrd/tests/t0-sample-port-config.ini b/sonic-xcvrd/tests/t0-sample-port-config.ini new file mode 100644 index 000000000000..ed03fa85cbbd --- /dev/null +++ b/sonic-xcvrd/tests/t0-sample-port-config.ini @@ -0,0 +1,14 @@ +# name lanes alias +Ethernet0 29,30,31,32 fortyGigE0/0 +Ethernet4 25,26,27,28 fortyGigE0/4 +Ethernet8 37,38,39,40 fortyGigE0/8 +Ethernet12 33,34,35,36 fortyGigE0/12 +Ethernet16 41,42,43,44 fortyGigE0/16 +Ethernet20 45,46,47,48 fortyGigE0/20 +Ethernet24 5,6,7,8 fortyGigE0/24 +Ethernet28 1,2,3,4 fortyGigE0/28 +Ethernet32 9,10,11,12 fortyGigE0/32 +Ethernet36 13,14,15,16 fortyGigE0/36 +Ethernet40 21,22,23,24 fortyGigE0/40 +Ethernet44 17,18,19,20 fortyGigE0/44 +Ethernet48 49,50,51,52 fortyGigE0/48 diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py new file mode 100644 index 000000000000..c9b3ffbb3e7a --- /dev/null +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -0,0 +1,217 @@ +import os +import sys +import subprocess + +import pytest +import unittest +from imp import load_source +if sys.version_info >= (3, 3): + from unittest.mock import MagicMock +else: + from mock import MagicMock, patch + +from sonic_py_common import daemon_base +from swsscommon import swsscommon +from .mock_swsscommon import Table + + +daemon_base.db_connect = MagicMock() +swsscommon.Table = MagicMock() +swsscommon.ProducerStateTable = MagicMock() +sys.modules['sonic_y_cable'] = MagicMock() +sys.modules['sonic_y_cable.y_cable'] = MagicMock() + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, "xcvrd") +helper_file_path = os.path.join(scripts_path, "xcvrd_utilities"+"/y_cable_helper.py") +sys.path.insert(0, modules_path) + +os.environ["XCVRD_UNIT_TESTING"] = "1" +load_source('y_cable_helper', scripts_path + '/xcvrd_utilities/y_cable_helper.py') +from y_cable_helper import * +from xcvrd.xcvrd import * + + +class TestXcvrdScript(object): + + def test_xcvrd_helper_class_run(self): + Y_cable_task = YCableTableUpdateTask() + + @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_info', MagicMock(return_value={'temperature': '22.75', + 'voltage': '0.5', + 'rx1power': '0.7', + 'rx2power': '0.7', + 'rx3power': '0.7', + 'rx4power': '0.7', + 'rx5power': '0.7', + 'rx6power': '0.7', + 'rx7power': '0.7', + 'rx8power': '0.7', + 'tx1bias': '0.7', + 'tx2bias': '0.7', + 'tx3bias': '0.7', + 'tx4bias': '0.7', + 'tx5bias': '0.7', + 'tx6bias': '0.7', + 'tx7bias': '0.7', + 'tx8bias': '0.7', + 'tx1power': '0.7', + 'tx2power': '0.7', + 'tx3power': '0.7', + 'tx4power': '0.7', + 'tx5power': '0.7', + 'tx6power': '0.7', + 'tx7power': '0.7', + 'tx8power': '0.7', })) + def test_post_port_dom_info_to_db(self): + logical_port_name = "Ethernet0" + stop_event = threading.Event() + dom_tbl = Table("state_db", "dom_info_tbl") + post_port_dom_info_to_db(logical_port_name, dom_tbl, stop_event) + + @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) + def test_del_port_sfp_dom_info_from_db(self): + logical_port_name = "Ethernet0" + stop_event = threading.Event() + dom_tbl = Table("state_db", "dom_info_tbl") + init_tbl = Table("state_db", "init_info_tbl") + del_port_sfp_dom_info_from_db(logical_port_name, init_tbl, dom_tbl) + + @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_threshold_info', MagicMock(return_value={'temphighalarm': '22.75', + 'temphighwarning': '0.5', + 'templowalarm': '0.7', + 'templowwarning': '0.7', + 'vcchighalarm': '0.7', + 'vcchighwarning': '0.7', + 'vcclowalarm': '0.7', + 'vcclowwarning': '0.7', + 'txpowerhighalarm': '0.7', + 'txpowerlowalarm': '0.7', + 'txpowerhighwarning': '0.7', + 'txpowerlowwarning': '0.7', + 'rxpowerhighalarm': '0.7', + 'rxpowerlowalarm': '0.7', + 'rxpowerhighwarning': '0.7', + 'rxpowerlowwarning': '0.7', + 'txbiashighalarm': '0.7', + 'txbiaslowalarm': '0.7', + 'txbiashighwarning': '0.7', + 'txbiaslowwarning': '0.7', })) + def test_post_port_dom_threshold_info_to_db(self): + logical_port_name = "Ethernet0" + stop_event = threading.Event() + dom_tbl = Table("state_db", "dom_info_tbl") + post_port_dom_threshold_info_to_db(logical_port_name, dom_tbl, stop_event) + + @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75', + 'hardware_rev': '0.5', + 'serial': '0.7', + 'manufacturer': '0.7', + 'model': '0.7', + 'vendor_oui': '0.7', + 'vendor_date': '0.7', + 'connector': '0.7', + 'encoding': '0.7', + 'ext_identifier': '0.7', + 'ext_rateselect_compliance': '0.7', + 'cable_type': '0.7', + 'cable_length': '0.7', + 'specification_compliance': '0.7', + 'nominal_bit_rate': '0.7', + 'application_advertisement': '0.7', + 'is_replaceable': '0.7', })) + def test_post_port_sfp_info_to_db(self): + logical_port_name = "Ethernet0" + stop_event = threading.Event() + dom_tbl = Table("state_db", "dom_info_tbl") + transceiver_dict = {} + post_port_sfp_info_to_db(logical_port_name, dom_tbl, transceiver_dict, stop_event) + + @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75', + 'hardware_rev': '0.5', + 'serial': '0.7', + 'manufacturer': '0.7', + 'model': '0.7', + 'vendor_oui': '0.7', + 'vendor_date': '0.7', + 'connector': '0.7', + 'encoding': '0.7', + 'ext_identifier': '0.7', + 'ext_rateselect_compliance': '0.7', + 'cable_type': '0.7', + 'cable_length': '0.7', + 'specification_compliance': '0.7', + 'nominal_bit_rate': '0.7', + 'application_advertisement': '0.7', + 'is_replaceable': '0.7', })) + @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_threshold_info', MagicMock(return_value={'temphighalarm': '22.75', + 'temphighwarning': '0.5', + 'templowalarm': '0.7', + 'templowwarning': '0.7', + 'vcchighalarm': '0.7', + 'vcchighwarning': '0.7', + 'vcclowalarm': '0.7', + 'vcclowwarning': '0.7', + 'txpowerhighalarm': '0.7', + 'txpowerlowalarm': '0.7', + 'txpowerhighwarning': '0.7', + 'txpowerlowwarning': '0.7', + 'rxpowerhighalarm': '0.7', + 'rxpowerlowalarm': '0.7', + 'rxpowerhighwarning': '0.7', + 'rxpowerlowwarning': '0.7', + 'txbiashighalarm': '0.7', + 'txbiaslowalarm': '0.7', + 'txbiashighwarning': '0.7', + 'txbiaslowwarning': '0.7', })) + @patch('xcvrd.xcvrd._wrapper_get_transceiver_dom_info', MagicMock(return_value={'temperature': '22.75', + 'voltage': '0.5', + 'rx1power': '0.7', + 'rx2power': '0.7', + 'rx3power': '0.7', + 'rx4power': '0.7', + 'rx5power': '0.7', + 'rx6power': '0.7', + 'rx7power': '0.7', + 'rx8power': '0.7', + 'tx1bias': '0.7', + 'tx2bias': '0.7', + 'tx3bias': '0.7', + 'tx4bias': '0.7', + 'tx5bias': '0.7', + 'tx6bias': '0.7', + 'tx7bias': '0.7', + 'tx8bias': '0.7', + 'tx1power': '0.7', + 'tx2power': '0.7', + 'tx3power': '0.7', + 'tx4power': '0.7', + 'tx5power': '0.7', + 'tx6power': '0.7', + 'tx7power': '0.7', + 'tx8power': '0.7', })) + def test_post_port_sfp_dom_info_to_db(self): + logical_port_name = "Ethernet0" + stop_event = threading.Event() + post_port_sfp_dom_info_to_db(True, stop_event) + + @patch('xcvrd.xcvrd.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) + @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) + @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) + def test_init_port_sfp_status_tbl(self): + stop_event = threading.Event() + init_port_sfp_status_tbl(stop_event)