Skip to content

Commit

Permalink
[pfcwd] Add vs test infrastructure (#2077)
Browse files Browse the repository at this point in the history
Currently vs platform support is missing in pfcwd which is preventing us from adding unit tests in this area. This PR adds vs platform support for pfcwd and unit test for basic functionality

What I did
Define pfcwd port and queue attributes for vs platform and a vs platform detection lua
Added unit tests for basic functionality

How I verified it
New unit testcases added have passed

nejo@nejo-linux:~/SONiC/sonic-swss/tests$ sudo pytest --dvsname=vs test_pfcwd.py
================================================= test session starts ==================================================
platform linux -- Python 3.6.9, pytest-6.0.2, py-1.9.0, pluggy-0.13.1
rootdir: /home/nejo/SONiC/sonic-swss/tests
plugins: flaky-3.7.0
collected 4 items                                                                                                      

test_pfcwd.py ....                                                                                               [100%]

============================================= 4 passed in 90.12s (0:01:30) =============================================

Signed-off-by: Neetha John <nejo@microsoft.com>
  • Loading branch information
neethajohn authored Jan 11, 2022
1 parent b96ee54 commit 8fd6e48
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 1 deletion.
1 change: 1 addition & 0 deletions orchagent/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dist_swss_DATA = \
pfc_detect_barefoot.lua \
pfc_detect_nephos.lua \
pfc_detect_cisco-8000.lua \
pfc_detect_vs.lua \
pfc_restore.lua \
pfc_restore_cisco-8000.lua \
port_rates.lua \
Expand Down
2 changes: 1 addition & 1 deletion orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ bool OrchDaemon::init()
CFG_PFC_WD_TABLE_NAME
};

if (platform == MLNX_PLATFORM_SUBSTRING)
if ((platform == MLNX_PLATFORM_SUBSTRING) || (platform == VS_PLATFORM_SUBSTRING))
{

static const vector<sai_port_stat_t> portStatIds =
Expand Down
108 changes: 108 additions & 0 deletions orchagent/pfc_detect_vs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
-- KEYS - queue IDs
-- ARGV[1] - counters db index
-- ARGV[2] - counters table name
-- ARGV[3] - poll time interval (milliseconds)
-- return queue Ids that satisfy criteria

local counters_db = ARGV[1]
local counters_table_name = ARGV[2]
local poll_time = tonumber(ARGV[3]) * 1000

local rets = {}

redis.call('SELECT', counters_db)

-- Iterate through each queue
local n = table.getn(KEYS)
for i = n, 1, -1 do
local counter_keys = redis.call('HKEYS', counters_table_name .. ':' .. KEYS[i])
local counter_num = 0
local old_counter_num = 0
local is_deadlock = false
local pfc_wd_status = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_STATUS')
local pfc_wd_action = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_ACTION')

local big_red_switch_mode = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'BIG_RED_SWITCH_MODE')
if not big_red_switch_mode and (pfc_wd_status == 'operational' or pfc_wd_action == 'alert') then
local detection_time = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_DETECTION_TIME')
if detection_time then
detection_time = tonumber(detection_time)
local time_left = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_DETECTION_TIME_LEFT')
if not time_left then
time_left = detection_time
else
time_left = tonumber(time_left)
end

local queue_index = redis.call('HGET', 'COUNTERS_QUEUE_INDEX_MAP', KEYS[i])
local port_id = redis.call('HGET', 'COUNTERS_QUEUE_PORT_MAP', KEYS[i])
-- If there is no entry in COUNTERS_QUEUE_INDEX_MAP or COUNTERS_QUEUE_PORT_MAP then
-- it means KEYS[i] queue is inserted into FLEX COUNTER DB but the corresponding
-- maps haven't been updated yet.
if queue_index and port_id then
local pfc_rx_pkt_key = 'SAI_PORT_STAT_PFC_' .. queue_index .. '_RX_PKTS'
local pfc_duration_key = 'SAI_PORT_STAT_PFC_' .. queue_index .. '_RX_PAUSE_DURATION_US'

-- Get all counters
local occupancy_bytes = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_CURR_OCCUPANCY_BYTES')
local packets = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_PACKETS')
local pfc_rx_packets = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key)
local pfc_duration = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_duration_key)

if occupancy_bytes and packets and pfc_rx_packets and pfc_duration then
occupancy_bytes = tonumber(occupancy_bytes)
packets = tonumber(packets)
pfc_rx_packets = tonumber(pfc_rx_packets)
pfc_duration = tonumber(pfc_duration)

local packets_last = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_PACKETS_last')
local pfc_rx_packets_last = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last')
local pfc_duration_last = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last')
-- DEBUG CODE START. Uncomment to enable
local debug_storm = redis.call('HGET', counters_table_name .. ':' .. KEYS[i], 'DEBUG_STORM')
-- DEBUG CODE END.

-- If this is not a first run, then we have last values available
if packets_last and pfc_rx_packets_last and pfc_duration_last then
packets_last = tonumber(packets_last)
pfc_rx_packets_last = tonumber(pfc_rx_packets_last)
pfc_duration_last = tonumber(pfc_duration_last)
local storm_condition = (pfc_duration - pfc_duration_last) > (poll_time * 0.8)

-- Check actual condition of queue being in PFC storm
if (occupancy_bytes > 0 and packets - packets_last == 0 and pfc_rx_packets - pfc_rx_packets_last > 0) or
-- DEBUG CODE START. Uncomment to enable
(debug_storm == "enabled") or
-- DEBUG CODE END.
(occupancy_bytes == 0 and packets - packets_last == 0 and storm_condition) then
if time_left <= poll_time then
redis.call('HDEL', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last')
redis.call('HDEL', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last')
redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","storm"]')
is_deadlock = true
time_left = detection_time
else
time_left = time_left - poll_time
end
else
if pfc_wd_action == 'alert' and pfc_wd_status ~= 'operational' then
redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]')
end
time_left = detection_time
end
end

-- Save values for next run
redis.call('HSET', counters_table_name .. ':' .. KEYS[i], 'SAI_QUEUE_STAT_PACKETS_last', packets)
redis.call('HSET', counters_table_name .. ':' .. KEYS[i], 'PFC_WD_DETECTION_TIME_LEFT', time_left)
if is_deadlock == false then
redis.call('HSET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last', pfc_rx_packets)
redis.call('HSET', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last', pfc_duration)
end
end
end
end
end
end

return rets
216 changes: 216 additions & 0 deletions tests/test_pfcwd.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,222 @@ def test_PfcWdAclCreationDeletion(self, dvs, dvs_acl, testlog):

finally:
dvs_acl.remove_acl_table(PFCWD_TABLE_NAME)


class TestPfcwdFunc(object):
@pytest.fixture
def setup_teardown_test(self, dvs):
self.get_db_handle(dvs)

self.test_ports = ["Ethernet0"]

self.setup_test(dvs)
self.get_port_oids()
self.get_queue_oids()

yield

self.teardown_test(dvs)

def setup_test(self, dvs):
# get original cable len for test ports
fvs = self.config_db.get_entry("CABLE_LENGTH", "AZURE")
self.orig_cable_len = dict()
for port in self.test_ports:
self.orig_cable_len[port] = fvs[port]
# set cable len to non zero value. if port is down, default cable len is 0
self.set_cable_len(port, "5m")
# startup port
dvs.runcmd("config interface startup {}".format(port))

# enable pfcwd
self.set_flex_counter_status("PFCWD", "enable")
# enable queue so that queue oids are generated
self.set_flex_counter_status("QUEUE", "enable")

def teardown_test(self, dvs):
# disable pfcwd
self.set_flex_counter_status("PFCWD", "disable")
# disable queue
self.set_flex_counter_status("QUEUE", "disable")

for port in self.test_ports:
if self.orig_cable_len:
self.set_cable_len(port, self.orig_cable_len[port])
# shutdown port
dvs.runcmd("config interface shutdown {}".format(port))

def get_db_handle(self, dvs):
self.app_db = dvs.get_app_db()
self.asic_db = dvs.get_asic_db()
self.config_db = dvs.get_config_db()
self.counters_db = dvs.get_counters_db()

def set_flex_counter_status(self, key, state):
fvs = {'FLEX_COUNTER_STATUS': state}
self.config_db.update_entry("FLEX_COUNTER_TABLE", key, fvs)
time.sleep(1)

def get_queue_oids(self):
self.queue_oids = self.counters_db.get_entry("COUNTERS_QUEUE_NAME_MAP", "")

def get_port_oids(self):
self.port_oids = self.counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "")

def _get_bitmask(self, queues):
mask = 0
if queues is not None:
for queue in queues:
mask = mask | 1 << queue

return str(mask)

def set_ports_pfc(self, status='enable', pfc_queues=[3,4]):
for port in self.test_ports:
if 'enable' in status:
fvs = {'pfc_enable': ",".join([str(q) for q in pfc_queues])}
self.config_db.create_entry("PORT_QOS_MAP", port, fvs)
else:
self.config_db.delete_entry("PORT_QOS_MAP", port)

def set_cable_len(self, port_name, cable_len):
fvs = {port_name: cable_len}
self.config_db.update_entry("CABLE_LEN", "AZURE", fvs)

def start_pfcwd_on_ports(self, poll_interval="200", detection_time="200", restoration_time="200", action="drop"):
pfcwd_info = {"POLL_INTERVAL": poll_interval}
self.config_db.update_entry("PFC_WD", "GLOBAL", pfcwd_info)

pfcwd_info = {"action": action,
"detection_time" : detection_time,
"restoration_time": restoration_time
}
for port in self.test_ports:
self.config_db.update_entry("PFC_WD", port, pfcwd_info)

def stop_pfcwd_on_ports(self):
for port in self.test_ports:
self.config_db.delete_entry("PFC_WD", port)

def verify_ports_pfc(self, queues=None):
mask = self._get_bitmask(queues)
fvs = {"SAI_PORT_ATTR_PRIORITY_FLOW_CONTROL" : mask}
for port in self.test_ports:
self.asic_db.wait_for_field_match("ASIC_STATE:SAI_OBJECT_TYPE_PORT", self.port_oids[port], fvs)

def verify_pfcwd_state(self, queues, state="stormed"):
fvs = {"PFC_WD_STATUS": state}
for port in self.test_ports:
for queue in queues:
queue_name = port + ":" + str(queue)
self.counters_db.wait_for_field_match("COUNTERS", self.queue_oids[queue_name], fvs)

def verify_pfcwd_counters(self, queues, restore="0"):
fvs = {"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED" : "1",
"PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED" : restore
}
for port in self.test_ports:
for queue in queues:
queue_name = port + ":" + str(queue)
self.counters_db.wait_for_field_match("COUNTERS", self.queue_oids[queue_name], fvs)

def reset_pfcwd_counters(self, queues):
fvs = {"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED" : "0",
"PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED" : "0"
}
for port in self.test_ports:
for queue in queues:
queue_name = port + ":" + str(queue)
self.counters_db.update_entry("COUNTERS", self.queue_oids[queue_name], fvs)

def set_storm_state(self, queues, state="enabled"):
fvs = {"DEBUG_STORM": state}
for port in self.test_ports:
for queue in queues:
queue_name = port + ":" + str(queue)
self.counters_db.update_entry("COUNTERS", self.queue_oids[queue_name], fvs)

def test_pfcwd_single_queue(self, dvs, setup_teardown_test):
try:
# enable PFC on queues
test_queues = [3, 4]
self.set_ports_pfc(pfc_queues=test_queues)

# verify in asic db
self.verify_ports_pfc(test_queues)

# start pfcwd
self.start_pfcwd_on_ports()

# start pfc storm
storm_queue = [3]
self.set_storm_state(storm_queue)

# verify pfcwd is triggered
self.verify_pfcwd_state(storm_queue)

# verify pfcwd counters
self.verify_pfcwd_counters(storm_queue)

# verify if queue is disabled
self.verify_ports_pfc(queues=[4])

# stop storm
self.set_storm_state(storm_queue, state="disabled")

# verify pfcwd state is restored
self.verify_pfcwd_state(storm_queue, state="operational")

# verify pfcwd counters
self.verify_pfcwd_counters(storm_queue, restore="1")

# verify if queue is enabled
self.verify_ports_pfc(test_queues)

finally:
self.reset_pfcwd_counters(storm_queue)
self.stop_pfcwd_on_ports()

def test_pfcwd_multi_queue(self, dvs, setup_teardown_test):
try:
# enable PFC on queues
test_queues = [3, 4]
self.set_ports_pfc(pfc_queues=test_queues)

# verify in asic db
self.verify_ports_pfc(test_queues)

# start pfcwd
self.start_pfcwd_on_ports()

# start pfc storm
self.set_storm_state(test_queues)

# verify pfcwd is triggered
self.verify_pfcwd_state(test_queues)

# verify pfcwd counters
self.verify_pfcwd_counters(test_queues)

# verify if queue is disabled. Expected mask is 0
self.verify_ports_pfc()

# stop storm
self.set_storm_state(test_queues, state="disabled")

# verify pfcwd state is restored
self.verify_pfcwd_state(test_queues, state="operational")

# verify pfcwd counters
self.verify_pfcwd_counters(test_queues, restore="1")

# verify if queue is enabled
self.verify_ports_pfc(test_queues)

finally:
self.reset_pfcwd_counters(test_queues)
self.stop_pfcwd_on_ports()

#
# Add Dummy always-pass test at end as workaroud
# for issue when Flaky fail on final test it invokes module tear-down before retrying
Expand Down

0 comments on commit 8fd6e48

Please sign in to comment.