diff --git a/orchagent/Makefile.am b/orchagent/Makefile.am index 248c190f11..68aa474552 100644 --- a/orchagent/Makefile.am +++ b/orchagent/Makefile.am @@ -18,6 +18,7 @@ dist_swss_DATA = \ pfc_detect_barefoot.lua \ pfc_detect_nephos.lua \ pfc_detect_cisco-8000.lua \ + pfc_detect_vs.lua \ pfc_restore.lua \ pfc_restore_cisco-8000.lua \ port_rates.lua \ diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index ff1d8afb18..0341f69ea9 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -448,7 +448,7 @@ bool OrchDaemon::init() CFG_PFC_WD_TABLE_NAME }; - if (platform == MLNX_PLATFORM_SUBSTRING) + if ((platform == MLNX_PLATFORM_SUBSTRING) || (platform == VS_PLATFORM_SUBSTRING)) { static const vector portStatIds = diff --git a/orchagent/pfc_detect_vs.lua b/orchagent/pfc_detect_vs.lua new file mode 100644 index 0000000000..e805ad9cff --- /dev/null +++ b/orchagent/pfc_detect_vs.lua @@ -0,0 +1,108 @@ +-- KEYS - queue IDs +-- ARGV[1] - counters db index +-- ARGV[2] - counters table name +-- ARGV[3] - poll time interval (milliseconds) +-- return queue Ids that satisfy criteria + +local counters_db = ARGV[1] +local counters_table_name = ARGV[2] +local poll_time = tonumber(ARGV[3]) * 1000 + +local rets = {} + +redis.call('SELECT', counters_db) + +-- Iterate through each queue +local n = table.getn(KEYS) +for i = n, 1, -1 do + local counter_keys = redis.call('HKEYS', counters_table_name .. ':' .. KEYS[i]) + local counter_num = 0 + local old_counter_num = 0 + local is_deadlock = false + local pfc_wd_status = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_STATUS') + local pfc_wd_action = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_ACTION') + + local big_red_switch_mode = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'BIG_RED_SWITCH_MODE') + if not big_red_switch_mode and (pfc_wd_status == 'operational' or pfc_wd_action == 'alert') then + local detection_time = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_DETECTION_TIME') + if detection_time then + detection_time = tonumber(detection_time) + local time_left = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_DETECTION_TIME_LEFT') + if not time_left then + time_left = detection_time + else + time_left = tonumber(time_left) + end + + local queue_index = redis.call('HGET', 'COUNTERS_QUEUE_INDEX_MAP', KEYS[i]) + local port_id = redis.call('HGET', 'COUNTERS_QUEUE_PORT_MAP', KEYS[i]) + -- If there is no entry in COUNTERS_QUEUE_INDEX_MAP or COUNTERS_QUEUE_PORT_MAP then + -- it means KEYS[i] queue is inserted into FLEX COUNTER DB but the corresponding + -- maps haven't been updated yet. + if queue_index and port_id then + local pfc_rx_pkt_key = 'SAI_PORT_STAT_PFC_' .. queue_index .. '_RX_PKTS' + local pfc_duration_key = 'SAI_PORT_STAT_PFC_' .. queue_index .. '_RX_PAUSE_DURATION_US' + + -- Get all counters + local occupancy_bytes = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_CURR_OCCUPANCY_BYTES') + local packets = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_PACKETS') + local pfc_rx_packets = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key) + local pfc_duration = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_duration_key) + + if occupancy_bytes and packets and pfc_rx_packets and pfc_duration then + occupancy_bytes = tonumber(occupancy_bytes) + packets = tonumber(packets) + pfc_rx_packets = tonumber(pfc_rx_packets) + pfc_duration = tonumber(pfc_duration) + + local packets_last = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_PACKETS_last') + local pfc_rx_packets_last = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last') + local pfc_duration_last = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last') + -- DEBUG CODE START. Uncomment to enable + local debug_storm = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'DEBUG_STORM') + -- DEBUG CODE END. + + -- If this is not a first run, then we have last values available + if packets_last and pfc_rx_packets_last and pfc_duration_last then + packets_last = tonumber(packets_last) + pfc_rx_packets_last = tonumber(pfc_rx_packets_last) + pfc_duration_last = tonumber(pfc_duration_last) + local storm_condition = (pfc_duration - pfc_duration_last) > (poll_time * 0.8) + + -- Check actual condition of queue being in PFC storm + if (occupancy_bytes > 0 and packets - packets_last == 0 and pfc_rx_packets - pfc_rx_packets_last > 0) or + -- DEBUG CODE START. Uncomment to enable + (debug_storm == "enabled") or + -- DEBUG CODE END. + (occupancy_bytes == 0 and packets - packets_last == 0 and storm_condition) then + if time_left <= poll_time then + redis.call('HDEL', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last') + redis.call('HDEL', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","storm"]') + is_deadlock = true + time_left = detection_time + else + time_left = time_left - poll_time + end + else + if pfc_wd_action == 'alert' and pfc_wd_status ~= 'operational' then + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]') + end + time_left = detection_time + end + end + + -- Save values for next run + redis.call('HSET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_PACKETS_last', packets) + redis.call('HSET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_DETECTION_TIME_LEFT', time_left) + if is_deadlock == false then + redis.call('HSET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last', pfc_rx_packets) + redis.call('HSET', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last', pfc_duration) + end + end + end + end + end +end + +return rets diff --git a/tests/test_pfcwd.py b/tests/test_pfcwd.py index c569bc8a43..78cd851574 100644 --- a/tests/test_pfcwd.py +++ b/tests/test_pfcwd.py @@ -77,6 +77,222 @@ def test_PfcWdAclCreationDeletion(self, dvs, dvs_acl, testlog): finally: dvs_acl.remove_acl_table(PFCWD_TABLE_NAME) + + +class TestPfcwdFunc(object): + @pytest.fixture + def setup_teardown_test(self, dvs): + self.get_db_handle(dvs) + + self.test_ports = ["Ethernet0"] + + self.setup_test(dvs) + self.get_port_oids() + self.get_queue_oids() + + yield + + self.teardown_test(dvs) + + def setup_test(self, dvs): + # get original cable len for test ports + fvs = self.config_db.get_entry("CABLE_LENGTH", "AZURE") + self.orig_cable_len = dict() + for port in self.test_ports: + self.orig_cable_len[port] = fvs[port] + # set cable len to non zero value. if port is down, default cable len is 0 + self.set_cable_len(port, "5m") + # startup port + dvs.runcmd("config interface startup {}".format(port)) + + # enable pfcwd + self.set_flex_counter_status("PFCWD", "enable") + # enable queue so that queue oids are generated + self.set_flex_counter_status("QUEUE", "enable") + + def teardown_test(self, dvs): + # disable pfcwd + self.set_flex_counter_status("PFCWD", "disable") + # disable queue + self.set_flex_counter_status("QUEUE", "disable") + + for port in self.test_ports: + if self.orig_cable_len: + self.set_cable_len(port, self.orig_cable_len[port]) + # shutdown port + dvs.runcmd("config interface shutdown {}".format(port)) + + def get_db_handle(self, dvs): + self.app_db = dvs.get_app_db() + self.asic_db = dvs.get_asic_db() + self.config_db = dvs.get_config_db() + self.counters_db = dvs.get_counters_db() + + def set_flex_counter_status(self, key, state): + fvs = {'FLEX_COUNTER_STATUS': state} + self.config_db.update_entry("FLEX_COUNTER_TABLE", key, fvs) + time.sleep(1) + + def get_queue_oids(self): + self.queue_oids = self.counters_db.get_entry("COUNTERS_QUEUE_NAME_MAP", "") + + def get_port_oids(self): + self.port_oids = self.counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "") + + def _get_bitmask(self, queues): + mask = 0 + if queues is not None: + for queue in queues: + mask = mask | 1 << queue + + return str(mask) + + def set_ports_pfc(self, status='enable', pfc_queues=[3,4]): + for port in self.test_ports: + if 'enable' in status: + fvs = {'pfc_enable': ",".join([str(q) for q in pfc_queues])} + self.config_db.create_entry("PORT_QOS_MAP", port, fvs) + else: + self.config_db.delete_entry("PORT_QOS_MAP", port) + + def set_cable_len(self, port_name, cable_len): + fvs = {port_name: cable_len} + self.config_db.update_entry("CABLE_LEN", "AZURE", fvs) + + def start_pfcwd_on_ports(self, poll_interval="200", detection_time="200", restoration_time="200", action="drop"): + pfcwd_info = {"POLL_INTERVAL": poll_interval} + self.config_db.update_entry("PFC_WD", "GLOBAL", pfcwd_info) + + pfcwd_info = {"action": action, + "detection_time" : detection_time, + "restoration_time": restoration_time + } + for port in self.test_ports: + self.config_db.update_entry("PFC_WD", port, pfcwd_info) + + def stop_pfcwd_on_ports(self): + for port in self.test_ports: + self.config_db.delete_entry("PFC_WD", port) + + def verify_ports_pfc(self, queues=None): + mask = self._get_bitmask(queues) + fvs = {"SAI_PORT_ATTR_PRIORITY_FLOW_CONTROL" : mask} + for port in self.test_ports: + self.asic_db.wait_for_field_match("ASIC_STATE:SAI_OBJECT_TYPE_PORT", self.port_oids[port], fvs) + + def verify_pfcwd_state(self, queues, state="stormed"): + fvs = {"PFC_WD_STATUS": state} + for port in self.test_ports: + for queue in queues: + queue_name = port + ":" + str(queue) + self.counters_db.wait_for_field_match("COUNTERS", self.queue_oids[queue_name], fvs) + + def verify_pfcwd_counters(self, queues, restore="0"): + fvs = {"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED" : "1", + "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED" : restore + } + for port in self.test_ports: + for queue in queues: + queue_name = port + ":" + str(queue) + self.counters_db.wait_for_field_match("COUNTERS", self.queue_oids[queue_name], fvs) + + def reset_pfcwd_counters(self, queues): + fvs = {"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED" : "0", + "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED" : "0" + } + for port in self.test_ports: + for queue in queues: + queue_name = port + ":" + str(queue) + self.counters_db.update_entry("COUNTERS", self.queue_oids[queue_name], fvs) + + def set_storm_state(self, queues, state="enabled"): + fvs = {"DEBUG_STORM": state} + for port in self.test_ports: + for queue in queues: + queue_name = port + ":" + str(queue) + self.counters_db.update_entry("COUNTERS", self.queue_oids[queue_name], fvs) + + def test_pfcwd_single_queue(self, dvs, setup_teardown_test): + try: + # enable PFC on queues + test_queues = [3, 4] + self.set_ports_pfc(pfc_queues=test_queues) + + # verify in asic db + self.verify_ports_pfc(test_queues) + + # start pfcwd + self.start_pfcwd_on_ports() + + # start pfc storm + storm_queue = [3] + self.set_storm_state(storm_queue) + + # verify pfcwd is triggered + self.verify_pfcwd_state(storm_queue) + + # verify pfcwd counters + self.verify_pfcwd_counters(storm_queue) + + # verify if queue is disabled + self.verify_ports_pfc(queues=[4]) + + # stop storm + self.set_storm_state(storm_queue, state="disabled") + + # verify pfcwd state is restored + self.verify_pfcwd_state(storm_queue, state="operational") + + # verify pfcwd counters + self.verify_pfcwd_counters(storm_queue, restore="1") + + # verify if queue is enabled + self.verify_ports_pfc(test_queues) + + finally: + self.reset_pfcwd_counters(storm_queue) + self.stop_pfcwd_on_ports() + + def test_pfcwd_multi_queue(self, dvs, setup_teardown_test): + try: + # enable PFC on queues + test_queues = [3, 4] + self.set_ports_pfc(pfc_queues=test_queues) + + # verify in asic db + self.verify_ports_pfc(test_queues) + + # start pfcwd + self.start_pfcwd_on_ports() + + # start pfc storm + self.set_storm_state(test_queues) + + # verify pfcwd is triggered + self.verify_pfcwd_state(test_queues) + + # verify pfcwd counters + self.verify_pfcwd_counters(test_queues) + + # verify if queue is disabled. Expected mask is 0 + self.verify_ports_pfc() + + # stop storm + self.set_storm_state(test_queues, state="disabled") + + # verify pfcwd state is restored + self.verify_pfcwd_state(test_queues, state="operational") + + # verify pfcwd counters + self.verify_pfcwd_counters(test_queues, restore="1") + + # verify if queue is enabled + self.verify_ports_pfc(test_queues) + + finally: + self.reset_pfcwd_counters(test_queues) + self.stop_pfcwd_on_ports() + # # Add Dummy always-pass test at end as workaroud # for issue when Flaky fail on final test it invokes module tear-down before retrying