[QoS] Optimize QoS operations of buffer setting and queue information fetching (#2752)

What I did
Optimize QoS operations:

Cache queue information to avoid fetching it from SAI every time
The cache entry is created when a queue's information is fetched for the first time
The SAI API is not called to fetch queue information that already exists in the cache
The cache is cleared for the queues of a port when that port is removed
Apply buffer items (tables: BUFFER_QUEUE, BUFFER_PG, BUFFER_PORT_INGRESS_PROFILE_LIST, BUFFER_PORT_EGRESS_PROFILE_LIST) only if they are updated
Each item in these tables has exactly one attribute, profile or profile_list, and its value is stored in BufferOrch::m_buffer_type_maps. It is therefore enough to compare the new value with the stored one and to apply it to SAI only if they differ.
For the BUFFER_QUEUE table, an item may need to be retried when a PFC storm is detected on the queue. A new set, m_partiallyAppliedQueues, is introduced to handle this case.
In any case, if a SAI API call fails, it is not repeated when the buffer table is set again with the same attribute value, because correcting the configuration is the user's responsibility.
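
The skip-if-unchanged rule and the retry set can be sketched in isolation as follows. This is a minimal illustration, not the orchagent code: BufferQueueApplier, Status, lockedQueues and saiCalls are hypothetical names, a plain std::map stands in for m_buffer_type_maps, and incrementing a counter stands in for the SAI set call.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical stand-ins for the orchagent types; not the real SWSS API.
enum class Status { Success, NeedRetry };

struct BufferQueueApplier
{
    std::map<std::string, std::string> applied;  // key -> last requested profile (role of m_buffer_type_maps)
    std::set<std::string> partiallyApplied;      // keys whose last attempt ended in a retry
    std::set<std::string> lockedQueues;          // queues currently locked, e.g. during a PFC storm
    int saiCalls = 0;                            // number of simulated SAI set calls

    Status set(const std::string &key, const std::string &profile)
    {
        auto it = applied.find(key);
        if (it != applied.end() && it->second == profile)
        {
            if (partiallyApplied.find(key) == partiallyApplied.end())
            {
                return Status::Success;          // unchanged and fully applied: skip SAI
            }
            partiallyApplied.erase(key);         // unchanged but never reached SAI: try again
        }

        applied[key] = profile;                  // record the requested value before touching SAI

        if (lockedQueues.count(key) != 0)
        {
            partiallyApplied.insert(key);        // remember that this key still needs applying
            return Status::NeedRetry;
        }

        ++saiCalls;                              // stand-in for the SAI set call
        return Status::Success;
    }
};
```

Without the extra set, a retried item would find its own value already stored in the map and be skipped, so the profile would never reach SAI.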

Signed-off-by: Stephen Sun stephens@nvidia.com

Why I did it
In theory, both operations should be fast. However, a mutex in sairedis enforces a critical section for all SAI APIs. If another SAI API call is in progress, e.g. fetching counters, the new call has to wait for it to finish, which can take several more milliseconds. This occurs frequently when a large number of buffer PG or queue items are being set, and the accumulated time is significant. In this scenario, two threads run in parallel and compete for the critical section:

The syncd main thread, in which the buffer PG setting API, the queue setting API, or the queue info getting API runs
The FlexCounter thread, in which the counters are fetched

How I verified it
Mock test
Regression test

Details if related
An example of queue information fetching. For each queue, the information is fetched 5 times, which consumes ~0.25 seconds. With the caching logic, it is fetched from SAI only once.

2023-04-20.18:01:00.634562|a|INIT_VIEW
2023-04-20.18:01:00.635586|A|SAI_STATUS_SUCCESS
--
2023-04-20.18:01:43.290205|g|SAI_OBJECT_TYPE_QUEUE:oid:0x15000000000549|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_ALL|SAI_QUEUE_ATTR_INDEX=205
2023-04-20.18:01:43.331625|G|SAI_STATUS_SUCCESS|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_UNICAST|SAI_QUEUE_ATTR_INDEX=4
--
2023-04-20.18:01:46.420931|g|SAI_OBJECT_TYPE_QUEUE:oid:0x15000000000549|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_ALL|SAI_QUEUE_ATTR_INDEX=0
2023-04-20.18:01:46.422113|G|SAI_STATUS_SUCCESS|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_UNICAST|SAI_QUEUE_ATTR_INDEX=4
--
2023-04-20.18:01:56.825879|g|SAI_OBJECT_TYPE_QUEUE:oid:0x15000000000549|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_ALL|SAI_QUEUE_ATTR_INDEX=24
2023-04-20.18:01:56.866720|G|SAI_STATUS_SUCCESS|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_UNICAST|SAI_QUEUE_ATTR_INDEX=4
--
2023-04-20.18:02:37.248679|a|APPLY_VIEW
2023-04-20.18:02:37.249435|A|SAI_STATUS_SUCCESS
--
2023-04-20.18:02:54.824194|g|SAI_OBJECT_TYPE_QUEUE:oid:0x15000000000549|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_ALL|SAI_QUEUE_ATTR_INDEX=205
2023-04-20.18:02:54.866955|G|SAI_STATUS_SUCCESS|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_UNICAST|SAI_QUEUE_ATTR_INDEX=4
--
2023-04-20.18:02:54.932174|g|SAI_OBJECT_TYPE_QUEUE:oid:0x15000000000549|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_ALL|SAI_QUEUE_ATTR_INDEX=205
2023-04-20.18:02:54.965082|G|SAI_STATUS_SUCCESS|SAI_QUEUE_ATTR_TYPE=SAI_QUEUE_TYPE_UNICAST|SAI_QUEUE_ATTR_INDEX=4
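
The effect of the cache on a trace like the one above can be sketched as follows. This is a self-contained illustration under stated assumptions, not the PortsOrch code: QueueInfoCache, fetchFromBackend and backendCalls are hypothetical names, and the backend stub returns made-up values in place of a real SAI get call.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical mirror of the cached fields (queue type and index).
struct QueueInfo
{
    int type;
    std::uint8_t index;
};

class QueueInfoCache
{
public:
    int backendCalls = 0;  // how many times the slow path was taken

    QueueInfo get(std::uint64_t queueId)
    {
        auto it = m_cache.find(queueId);
        if (it != m_cache.end())
        {
            return it->second;                     // cache hit: no backend call
        }
        QueueInfo info = fetchFromBackend(queueId);
        m_cache[queueId] = info;                   // first fetch populates the cache
        return info;
    }

    // Drop the entries of all queues that belong to a removed port.
    void removePort(const std::vector<std::uint64_t> &queueIds)
    {
        for (auto id : queueIds)
        {
            m_cache.erase(id);
        }
    }

private:
    // Stand-in for the SAI get call; the returned values are made up.
    QueueInfo fetchFromBackend(std::uint64_t queueId)
    {
        ++backendCalls;
        return QueueInfo{1, static_cast<std::uint8_t>(queueId & 0x7)};
    }

    std::map<std::uint64_t, QueueInfo> m_cache;
};
```

Calling get() five times for the same queue takes the slow path once. After removePort() the next get() goes to the backend again, which matters if an object ID can be reused for a different queue once the port is recreated.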
stephenxs authored May 22, 2023
1 parent fe8c395 commit cc06b18
Showing 6 changed files with 247 additions and 18 deletions.
41 changes: 41 additions & 0 deletions orchagent/bufferorch.cpp
@@ -839,6 +839,21 @@ task_process_status BufferOrch::processQueue(KeyOpFieldsValuesTuple &tuple)
return task_process_status::task_failed;
}

string old_buffer_profile_name;
if (doesObjectExist(m_buffer_type_maps, APP_BUFFER_QUEUE_TABLE_NAME, key, buffer_profile_field_name, old_buffer_profile_name)
&& (old_buffer_profile_name == buffer_profile_name))
{
if (m_partiallyAppliedQueues.find(key) == m_partiallyAppliedQueues.end())
{
SWSS_LOG_INFO("Skip setting buffer queue %s to %s since it is not changed", key.c_str(), buffer_profile_name.c_str());
return task_process_status::task_success;
}
else
{
m_partiallyAppliedQueues.erase(key);
}
}

SWSS_LOG_NOTICE("Set buffer queue %s to %s", key.c_str(), buffer_profile_name.c_str());

setObjectReference(m_buffer_type_maps, APP_BUFFER_QUEUE_TABLE_NAME, key, buffer_profile_field_name, buffer_profile_name);
@@ -854,6 +869,7 @@ task_process_status BufferOrch::processQueue(KeyOpFieldsValuesTuple &tuple)
sai_buffer_profile = SAI_NULL_OBJECT_ID;
SWSS_LOG_NOTICE("Remove buffer queue %s", key.c_str());
removeObject(m_buffer_type_maps, APP_BUFFER_QUEUE_TABLE_NAME, key);
m_partiallyAppliedQueues.erase(key);
}
else
{
@@ -898,6 +914,7 @@ task_process_status BufferOrch::processQueue(KeyOpFieldsValuesTuple &tuple)
if (port.m_queue_lock[ind])
{
SWSS_LOG_WARN("Queue %zd on port %s is locked, will retry", ind, port_name.c_str());
m_partiallyAppliedQueues.insert(key);
return task_process_status::task_need_retry;
}
queue_id = port.m_queue_ids[ind];
@@ -1038,6 +1055,14 @@ task_process_status BufferOrch::processPriorityGroup(KeyOpFieldsValuesTuple &tup
return task_process_status::task_failed;
}

string old_buffer_profile_name;
if (doesObjectExist(m_buffer_type_maps, APP_BUFFER_PG_TABLE_NAME, key, buffer_profile_field_name, old_buffer_profile_name)
&& (old_buffer_profile_name == buffer_profile_name))
{
SWSS_LOG_INFO("Skip setting buffer priority group %s to %s since it is not changed", key.c_str(), buffer_profile_name.c_str());
return task_process_status::task_success;
}

SWSS_LOG_NOTICE("Set buffer PG %s to %s", key.c_str(), buffer_profile_name.c_str());

setObjectReference(m_buffer_type_maps, APP_BUFFER_PG_TABLE_NAME, key, buffer_profile_field_name, buffer_profile_name);
@@ -1209,6 +1234,14 @@ task_process_status BufferOrch::processIngressBufferProfileList(KeyOpFieldsValue
return task_process_status::task_failed;
}

string old_profile_name_list;
if (doesObjectExist(m_buffer_type_maps, APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, key, buffer_profile_list_field_name, old_profile_name_list)
&& (old_profile_name_list == profile_name_list))
{
SWSS_LOG_INFO("Skip setting buffer ingress profile list %s to %s since it is not changed", key.c_str(), profile_name_list.c_str());
return task_process_status::task_success;
}

setObjectReference(m_buffer_type_maps, APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, key, buffer_profile_list_field_name, profile_name_list);

attr.value.objlist.count = (uint32_t)profile_list.size();
@@ -1280,6 +1313,14 @@ task_process_status BufferOrch::processEgressBufferProfileList(KeyOpFieldsValues
return task_process_status::task_failed;
}

string old_profile_name_list;
if (doesObjectExist(m_buffer_type_maps, APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, key, buffer_profile_list_field_name, old_profile_name_list)
&& (old_profile_name_list == profile_name_list))
{
SWSS_LOG_INFO("Skip setting buffer egress profile list %s to %s since it is not changed", key.c_str(), profile_name_list.c_str());
return task_process_status::task_success;
}

setObjectReference(m_buffer_type_maps, APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, key, buffer_profile_list_field_name, profile_name_list);

attr.value.objlist.count = (uint32_t)profile_list.size();
2 changes: 1 addition & 1 deletion orchagent/bufferorch.h
@@ -72,7 +72,7 @@ class BufferOrch : public Orch
unique_ptr<DBConnector> m_countersDb;

bool m_isBufferPoolWatermarkCounterIdListGenerated = false;

set<string> m_partiallyAppliedQueues;
};
#endif /* SWSS_BUFFORCH_H */

43 changes: 33 additions & 10 deletions orchagent/portsorch.cpp
@@ -2448,19 +2448,36 @@ bool PortsOrch::getQueueTypeAndIndex(sai_object_id_t queue_id, string &type, uin
{
SWSS_LOG_ENTER();

-    sai_attribute_t attr[2];
-    attr[0].id = SAI_QUEUE_ATTR_TYPE;
-    attr[1].id = SAI_QUEUE_ATTR_INDEX;
+    auto const &queueInfoRef = m_queueInfo.find(queue_id);

-    sai_status_t status = sai_queue_api->get_queue_attribute(queue_id, 2, attr);
-    if (status != SAI_STATUS_SUCCESS)
+    sai_attribute_t attr[2];
+    if (queueInfoRef == m_queueInfo.end())
     {
-        SWSS_LOG_ERROR("Failed to get queue type and index for queue %" PRIu64 " rv:%d", queue_id, status);
-        task_process_status handle_status = handleSaiGetStatus(SAI_API_QUEUE, status);
-        if (handle_status != task_process_status::task_success)
+        attr[0].id = SAI_QUEUE_ATTR_TYPE;
+        attr[1].id = SAI_QUEUE_ATTR_INDEX;
+
+        sai_status_t status = sai_queue_api->get_queue_attribute(queue_id, 2, attr);
+        if (status != SAI_STATUS_SUCCESS)
         {
-            return false;
+            SWSS_LOG_ERROR("Failed to get queue type and index for queue %" PRIu64 " rv:%d", queue_id, status);
+            task_process_status handle_status = handleSaiGetStatus(SAI_API_QUEUE, status);
+            if (handle_status != task_process_status::task_success)
+            {
+                return false;
+            }
         }
+
+        SWSS_LOG_INFO("Caching information (index %d type %d) for queue %" PRIx64, attr[1].value.u8, attr[0].value.s32, queue_id);
+
+        m_queueInfo[queue_id].type = static_cast<sai_queue_type_t>(attr[0].value.s32);
+        m_queueInfo[queue_id].index = attr[1].value.u8;
+    }
+    else
+    {
+        attr[0].value.s32 = m_queueInfo[queue_id].type;
+        attr[1].value.u8 = m_queueInfo[queue_id].index;
+
+        SWSS_LOG_INFO("Fetched cached information (index %d type %d) for queue %" PRIx64, attr[1].value.u8, attr[0].value.s32, queue_id);
     }

switch (attr[0].value.s32)
@@ -2478,7 +2495,7 @@ bool PortsOrch::getQueueTypeAndIndex(sai_object_id_t queue_id, string &type, uin
type = "SAI_QUEUE_TYPE_UNICAST_VOQ";
break;
default:
- SWSS_LOG_ERROR("Got unsupported queue type %d for %" PRIu64 " queue", attr[0].value.s32, queue_id);
+ SWSS_LOG_ERROR("Got unsupported queue type %d for %" PRIx64 " queue", attr[0].value.s32, queue_id);
throw runtime_error("Got unsupported queue type");
}
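
The log messages touched in this file switch the queue ID format from PRIu64 to PRIx64. The commit message does not state the reason. One plausible reading, offered here only as an assumption, is that hexadecimal output is easier to match against the oid:0x... values in sairedis recordings. A small sketch of the difference, with formatHex and formatDecimal as hypothetical helper names:

```cpp
#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>

// Format a 64-bit object ID in hexadecimal with a 0x prefix.
std::string formatHex(std::uint64_t oid)
{
    char buf[32];
    std::snprintf(buf, sizeof(buf), "0x%" PRIx64, oid);
    return buf;
}

// Format the same value in decimal, as the PRIu64 specifier does.
std::string formatDecimal(std::uint64_t oid)
{
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%" PRIu64, oid);
    return buf;
}
```

For the queue in the trace in the commit message, formatHex(0x15000000000549) yields 0x15000000000549, the same digits that appear after oid: in the recording.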

@@ -2755,6 +2772,12 @@ sai_status_t PortsOrch::removePort(sai_object_id_t port_id)

removePortSerdesAttribute(port_id);

for (auto queue_id : port.m_queue_ids)
{
SWSS_LOG_INFO("Removing cached information for queue %" PRIx64, queue_id);
m_queueInfo.erase(queue_id);
}

sai_status_t status = sai_port_api->remove_port(port_id);
if (status != SAI_STATUS_SUCCESS)
{
9 changes: 9 additions & 0 deletions orchagent/portsorch.h
@@ -97,6 +97,14 @@ struct VlanMemberUpdate
bool add;
};

struct queueInfo
{
// SAI_QUEUE_ATTR_TYPE
sai_queue_type_t type;
// SAI_QUEUE_ATTR_INDEX
sai_uint8_t index;
};

class PortsOrch : public Orch, public Subject
{
public:
@@ -465,5 +473,6 @@ class PortsOrch : public Orch, public Subject
set<sai_object_id_t> m_macsecEnabledPorts;

std::unordered_set<std::string> generateCounterStats(const string& type, bool gearbox = false);
map<sai_object_id_t, struct queueInfo> m_queueInfo;
};
#endif /* SWSS_PORTSORCH_H */
119 changes: 112 additions & 7 deletions tests/mock_tests/bufferorch_ut.cpp
@@ -19,6 +19,10 @@ namespace bufferorch_test

sai_port_api_t ut_sai_port_api;
sai_port_api_t *pold_sai_port_api;
sai_buffer_api_t ut_sai_buffer_api;
sai_buffer_api_t *pold_sai_buffer_api;
sai_queue_api_t ut_sai_queue_api;
sai_queue_api_t *pold_sai_queue_api;

shared_ptr<swss::DBConnector> m_app_db;
shared_ptr<swss::DBConnector> m_app_state_db;
@@ -51,17 +55,47 @@ namespace bufferorch_test
return pold_sai_port_api->set_port_attribute(port_id, attr);
}

- void _hook_sai_port_api()
uint32_t _ut_stub_set_pg_count;
sai_status_t _ut_stub_sai_set_ingress_priority_group_attribute(
_In_ sai_object_id_t ingress_priority_group_id,
_In_ const sai_attribute_t *attr)
{
_ut_stub_set_pg_count++;
return pold_sai_buffer_api->set_ingress_priority_group_attribute(ingress_priority_group_id, attr);
}

uint32_t _ut_stub_set_queue_count;
sai_status_t _ut_stub_sai_set_queue_attribute(
_In_ sai_object_id_t queue_id,
_In_ const sai_attribute_t *attr)
{
_ut_stub_set_queue_count++;
return pold_sai_queue_api->set_queue_attribute(queue_id, attr);
}

void _hook_sai_apis()
{
ut_sai_port_api = *sai_port_api;
pold_sai_port_api = sai_port_api;
ut_sai_port_api.set_port_attribute = _ut_stub_sai_set_port_attribute;
sai_port_api = &ut_sai_port_api;

ut_sai_buffer_api = *sai_buffer_api;
pold_sai_buffer_api = sai_buffer_api;
ut_sai_buffer_api.set_ingress_priority_group_attribute = _ut_stub_sai_set_ingress_priority_group_attribute;
sai_buffer_api = &ut_sai_buffer_api;

ut_sai_queue_api = *sai_queue_api;
pold_sai_queue_api = sai_queue_api;
ut_sai_queue_api.set_queue_attribute = _ut_stub_sai_set_queue_attribute;
sai_queue_api = &ut_sai_queue_api;
}

- void _unhook_sai_port_api()
+ void _unhook_sai_apis()
{
sai_port_api = pold_sai_port_api;
sai_buffer_api = pold_sai_buffer_api;
sai_queue_api = pold_sai_queue_api;
}

struct BufferOrchTest : public ::testing::Test
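
The hook and unhook helpers above rely on the SAI APIs being tables of function pointers reached through a global pointer. The pattern can be sketched in isolation as follows. This is an illustration with hypothetical names (queue_api_t, hook_apis, unhook_apis, set_queue_count), not the real SAI headers or the test code itself.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical API table with a single entry; real SAI tables have many.
struct queue_api_t
{
    int (*set_queue_attribute)(std::uint64_t queue_id, int value);
};

static int real_set_queue_attribute(std::uint64_t, int) { return 0; }
static queue_api_t real_api = { real_set_queue_attribute };
static queue_api_t *queue_api = &real_api;  // global pointer that callers go through

static queue_api_t ut_api;                  // patched copy of the table
static queue_api_t *pold_api = nullptr;     // saved pointer to the original table
static std::uint32_t set_queue_count = 0;   // calls observed by the stub

// Stub: count the call, then forward it to the original implementation.
static int stub_set_queue_attribute(std::uint64_t queue_id, int value)
{
    ++set_queue_count;
    return pold_api->set_queue_attribute(queue_id, value);
}

void hook_apis()
{
    ut_api = *queue_api;                    // copy every entry of the table
    pold_api = queue_api;                   // remember the original table
    ut_api.set_queue_attribute = stub_set_queue_attribute;
    queue_api = &ut_api;                    // redirect callers to the patched copy
}

void unhook_apis()
{
    queue_api = pold_api;                   // restore the original table
}
```

A test can snapshot set_queue_count, run the code under test, and compare the counter afterwards. An unchanged counter shows that no call reached the API, which is how the tests in this commit check that re-applying the same value is skipped.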
@@ -341,6 +375,7 @@ namespace bufferorch_test

TEST_F(BufferOrchTest, BufferOrchTestBufferPgReferencingObjRemoveThenAdd)
{
_hook_sai_apis();
vector<string> ts;
std::deque<KeyOpFieldsValuesTuple> entries;
Table bufferPgTable = Table(m_app_db.get(), APP_BUFFER_PG_TABLE_NAME);
@@ -398,18 +433,34 @@ namespace bufferorch_test
bufferProfileConsumer->addToSync(entries);
entries.clear();
// Drain BUFFER_PROFILE_TABLE table
auto sai_pg_attr_set_count = _ut_stub_set_pg_count;
static_cast<Orch *>(gBufferOrch)->doTask();
// Make sure the dependency recovers
CheckDependency(APP_BUFFER_PG_TABLE_NAME, "Ethernet0:0", "profile", APP_BUFFER_PROFILE_TABLE_NAME, "ingress_lossy_profile");
ASSERT_EQ(++sai_pg_attr_set_count, _ut_stub_set_pg_count);

// All items have been drained
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

// Try applying the same profile, which should not call SAI API
entries.push_back({"Ethernet0:0", "SET",
{
{"profile", "ingress_lossy_profile"}
}});
bufferPgConsumer->addToSync(entries);
entries.clear();
sai_pg_attr_set_count = _ut_stub_set_pg_count;
static_cast<Orch *>(gBufferOrch)->doTask();
ASSERT_EQ(sai_pg_attr_set_count, _ut_stub_set_pg_count);
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());
_unhook_sai_apis();
}

TEST_F(BufferOrchTest, BufferOrchTestReferencingObjRemoveThenAdd)
{
- _hook_sai_port_api();
+ _hook_sai_apis();
vector<string> ts;
std::deque<KeyOpFieldsValuesTuple> entries;
Table bufferProfileListTable = Table(m_app_db.get(), APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME);
@@ -494,6 +545,29 @@ namespace bufferorch_test
// As a side-effect, all pending notifications should be drained
ASSERT_TRUE(ts.empty());

// Apply a buffer item only if it is changed
_ut_stub_expected_profile_list_type = SAI_PORT_ATTR_QOS_INGRESS_BUFFER_PROFILE_LIST;
_ut_stub_expected_profile_count = 1;
entries.push_back({"Ethernet0", "SET",
{
{"profile_list", "ingress_lossy_profile"}
}});
consumer = dynamic_cast<Consumer *>(gBufferOrch->getExecutor(APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME));
consumer->addToSync(entries);
sai_port_profile_list_create_count = _ut_stub_port_profile_list_add_count;
// Drain BUFFER_PORT_INGRESS_PROFILE_LIST_TABLE table
static_cast<Orch *>(gBufferOrch)->doTask();
ASSERT_EQ(++sai_port_profile_list_create_count, _ut_stub_port_profile_list_add_count);
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

// Try applying it for the second time, which should not call SAI API
consumer->addToSync(entries);
static_cast<Orch *>(gBufferOrch)->doTask();
ASSERT_EQ(sai_port_profile_list_create_count, _ut_stub_port_profile_list_add_count);
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

// To satisfy the coverage requirement
bufferProfileListTable.set("Ethernet0",
{
@@ -505,12 +579,12 @@ namespace bufferorch_test
ASSERT_EQ(ts[0], "BUFFER_PORT_INGRESS_PROFILE_LIST_TABLE:Ethernet0|SET|profile_list:ingress_no_exist_profile");
ts.clear();

- _unhook_sai_port_api();
+ _unhook_sai_apis();
}

TEST_F(BufferOrchTest, BufferOrchTestCreateAndRemoveEgressProfileList)
{
- _hook_sai_port_api();
+ _hook_sai_apis();
vector<string> ts;
std::deque<KeyOpFieldsValuesTuple> entries;
Table bufferPoolTable = Table(m_app_db.get(), APP_BUFFER_POOL_TABLE_NAME);
@@ -553,9 +627,21 @@ namespace bufferorch_test
CheckDependency(APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, "Ethernet0", "profile_list",
APP_BUFFER_PROFILE_TABLE_NAME, "egress_lossless_profile");

// Try applying it for the second time, which should not call SAI API
entries.push_back({"Ethernet0", "SET",
{
{"profile_list", "egress_lossless_profile"}
}});
auto consumer = dynamic_cast<Consumer *>(gBufferOrch->getExecutor(APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME));
consumer->addToSync(entries);
entries.clear();
static_cast<Orch *>(gBufferOrch)->doTask();
ASSERT_EQ(sai_port_profile_list_create_count, _ut_stub_port_profile_list_add_count);
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

// Remove egress port profile list
entries.push_back({"Ethernet0", "DEL", {}});
- auto consumer = dynamic_cast<Consumer *>(gBufferOrch->getExecutor(APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME));
consumer->addToSync(entries);
entries.clear();
// Drain BUFFER_PORT_EGRESS_PROFILE_LIST_TABLE table
@@ -567,6 +653,25 @@ namespace bufferorch_test
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

- _unhook_sai_port_api();
// Queue table
entries.push_back({"Ethernet0:0", "SET",
{
{"profile", "egress_lossless_profile"}
}});
consumer = dynamic_cast<Consumer *>(gBufferOrch->getExecutor(APP_BUFFER_QUEUE_TABLE_NAME));
consumer->addToSync(entries);
auto sai_queue_set_count = _ut_stub_set_queue_count;
static_cast<Orch *>(gBufferOrch)->doTask();
ASSERT_EQ(++sai_queue_set_count, _ut_stub_set_queue_count);
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

consumer->addToSync(entries);
static_cast<Orch *>(gBufferOrch)->doTask();
ASSERT_EQ(sai_queue_set_count, _ut_stub_set_queue_count);
static_cast<Orch *>(gBufferOrch)->dumpPendingTasks(ts);
ASSERT_TRUE(ts.empty());

_unhook_sai_apis();
}
}