Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ResponsePublisher] add pipeline support #2511

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ swssconfig/swssplayer
tlm_teamd/tlm_teamd
teamsyncd/teamsyncd
tests/tests
tests/mock_tests/tests_response_publisher
tests/mock_tests/tests_fpmsyncd


Expand Down
5 changes: 5 additions & 0 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,11 @@ void Orch::dumpPendingTasks(vector<string> &ts)
}
}

void Orch::flushResponses()
{
m_publisher.flush();
}

void Orch::logfileReopen()
{
gRecordOfs.close();
Expand Down
5 changes: 5 additions & 0 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ class Orch
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);

void dumpPendingTasks(std::vector<std::string> &ts);

/**
* @brief Flush pending responses
*/
void flushResponses();
protected:
ConsumerMap m_consumerMap;

Expand Down
5 changes: 5 additions & 0 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ void OrchDaemon::flush()
SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
abort();
}

for (auto* orch: m_orchList)
{
orch->flushResponses();
}
}

/* Release the file handle so the log can be rotated */
Expand Down
37 changes: 22 additions & 15 deletions orchagent/response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ void RecordResponse(const std::string &response_channel, const std::string &key,

} // namespace

ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0)
ResponsePublisher::ResponsePublisher(bool buffered) :
m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)),
m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())),
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_db

You are assuming m_pipe initialized after m_db. The sequence is tricky. Suggest move m_pipe to function body. #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_buffered(buffered)
{
}

Expand All @@ -107,17 +110,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
}

std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL";
if (m_notifiers.find(table) == m_notifiers.end())
{
m_notifiers[table] = std::make_unique<swss::NotificationProducer>(&m_db, response_channel);
}
swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered};
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotificationProducer

Is it too expensive to create a new NotificationProducer for each function call? #Closed

Copy link
Contributor Author

@stepanblyschak stepanblyschak Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft I find it cheap, just arguments copy. It might be cheaper then a hash table lookup, depending on hash function and a load factor, so without it it takes a bit more deterministic time.

In my testing old and new implementation show the same result (measured published responses/sec), probably bound to redis IO anyway.


auto intent_attrs_copy = intent_attrs;
// Add error message as the first field-value-pair.
swss::FieldValueTuple err_str("err_str", PrependedComponent(status) + status.message());
intent_attrs_copy.insert(intent_attrs_copy.begin(), err_str);
// Sends the response to the notification channel.
m_notifiers[table]->send(status.codeStr(), key, intent_attrs_copy);
notificationProducer.send(status.codeStr(), key, intent_attrs_copy);
RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr());
}

Expand All @@ -140,17 +140,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
void ResponsePublisher::writeToDB(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
{
if (m_tables.find(table) == m_tables.end())
{
m_tables[table] = std::make_unique<swss::Table>(&m_db, table);
}
swss::Table applStateTable{m_pipe.get(), table, m_buffered};
Copy link
Contributor

@qiluo-msft qiluo-msft Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applStateTable

Is it too expensive to create a new Table for each function call? #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the argument is "arguments copy". Seem the constructor is not just arguments copy. For example: TableBase::TableBase() includes a map find.

Copy link
Contributor Author

@stepanblyschak stepanblyschak Jan 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft So it's one find with an integer key + arguments copy vs 2 finds with string keys.
I did a benchmark to prove my point and got these results:

static void BM_newResponsePublisher(benchmark::State& state) {
    ResponsePublisher publisher{};

    for (auto _ : state) {
        publisher.publish("SOME_TABLE", "SOME_KEY", {}, ReturnCode(SAI_STATUS_SUCCESS));
    }
}

static void BM_oldResponsePublisher(benchmark::State& state) {
    old::ResponsePublisher publisher{};

    for (auto _ : state) {
        publisher.publish("SOME_TABLE", "SOME_KEY", {}, ReturnCode(SAI_STATUS_SUCCESS));
    }
}

Compiled with:

g++ -std=c++14 -O2 -I ../orchagent/ -I /usr/include/swss -I /usr/include/sai/ ../orchagent/response_publisher.cpp ../orchagent/response_publisher_old.cpp main.cpp -o main -lswsscommon -lhiredis -lbenchmark

Full code is here - https://github.com/stepanblyschak/sonic-swss/blob/response-publisher-benchmarks/benchmarks/main.cpp

stepanb@333f1eb97224:/sonic/src/sonic-swss/benchmarks$ ./main 
2023-01-29T17:42:27+00:00
Running ./main
Run on (12 X 1600 MHz CPU s)
CPU Caches:
  L1 Data 32 KiB (x12)
  L1 Instruction 32 KiB (x12)
  L2 Unified 256 KiB (x12)
  L3 Unified 15360 KiB (x2)
Load Average: 0.55, 0.32, 0.30
------------------------------------------------------------------
Benchmark                        Time             CPU   Iterations
------------------------------------------------------------------
BM_newResponsePublisher      61831 ns        46291 ns        14984
BM_oldResponsePublisher      63855 ns        47892 ns        15570

Most of the time it is IO bound of course, but I constantly get a little bit better result with new implementation every time a run the benchmark. So, the caching makes it even a little bit worse and the new implementation is 3.16 % faster. Please note, that both don't use redis pipeline as the new implementation will clearly be a winner by a couple of times.


auto attrs = values;
if (op == SET_COMMAND)
{
if (replace)
{
m_tables[table]->del(key);
applStateTable.del(key);
}
if (!values.size())
{
Expand All @@ -160,9 +157,9 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
// Write to DB only if the key does not exist or non-NULL attributes are
// being written to the entry.
std::vector<swss::FieldValueTuple> fv;
if (!m_tables[table]->get(key, fv))
if (!applStateTable.get(key, fv))
{
m_tables[table]->set(key, attrs);
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
return;
}
Expand All @@ -179,13 +176,23 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
}
if (attrs.size())
{
m_tables[table]->set(key, attrs);
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
}
}
else if (op == DEL_COMMAND)
{
m_tables[table]->del(key);
applStateTable.del(key);
RecordDBWrite(table, key, {}, op);
}
}

void ResponsePublisher::flush()
{
m_pipe->flush();
}

void ResponsePublisher::setBuffered(bool buffered)
{
m_buffered = buffered;
}
24 changes: 18 additions & 6 deletions orchagent/response_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
class ResponsePublisher : public ResponsePublisherInterface
{
public:
explicit ResponsePublisher();
explicit ResponsePublisher(bool buffered = false);

virtual ~ResponsePublisher() = default;

// Intent attributes are the attributes sent in the notification into the
Expand All @@ -42,10 +43,21 @@ class ResponsePublisher : public ResponsePublisherInterface
void writeToDB(const std::string &table, const std::string &key, const std::vector<swss::FieldValueTuple> &values,
const std::string &op, bool replace = false) override;

/**
* @brief Flush pending responses
*/
void flush();

/**
* @brief Set buffering mode
*
* @param buffered Flag whether responses are buffered
*/
void setBuffered(bool buffered);

private:
swss::DBConnector m_db;
// Maps table names to tables.
std::unordered_map<std::string, std::unique_ptr<swss::Table>> m_tables;
// Maps table names to notifiers.
std::unordered_map<std::string, std::unique_ptr<swss::NotificationProducer>> m_notifiers;
std::unique_ptr<swss::DBConnector> m_db;
std::unique_ptr<swss::RedisPipeline> m_pipe;

bool m_buffered{false};
};
21 changes: 19 additions & 2 deletions tests/mock_tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ P4_ORCH_DIR = $(top_srcdir)/orchagent/p4orch

CFLAGS_SAI = -I /usr/include/sai

TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd
TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher

noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd
noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher

LDADD_SAI = -lsaimeta -lsaimetadata -lsaivs -lsairedis

Expand Down Expand Up @@ -182,3 +182,20 @@ tests_fpmsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST
tests_fpmsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_fpmsyncd_INCLUDES)
tests_fpmsyncd_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \
-lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread -lgmock -lgmock_main

## response publisher unit tests

tests_response_publisher_SOURCES = response_publisher/response_publisher_ut.cpp \
$(top_srcdir)/orchagent/response_publisher.cpp \
mock_orchagent_main.cpp \
mock_dbconnector.cpp \
mock_table.cpp \
mock_hiredis.cpp \
mock_redisreply.cpp

tests_response_publisher_INCLUDES = $(tests_INCLUDES)
tests_response_publisher_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI)
tests_response_publisher_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_response_publisher_INCLUDES)
tests_response_publisher_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \
-lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread

6 changes: 5 additions & 1 deletion tests/mock_tests/fake_response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "response_publisher.h"

ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) {}
ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)), m_buffered(buffered) {}

void ResponsePublisher::publish(
const std::string& table, const std::string& key,
Expand All @@ -20,3 +20,7 @@ void ResponsePublisher::writeToDB(
const std::string& table, const std::string& key,
const std::vector<swss::FieldValueTuple>& values, const std::string& op,
bool replace) {}

void ResponsePublisher::flush() {}

void ResponsePublisher::setBuffered(bool buffered) {}
37 changes: 37 additions & 0 deletions tests/mock_tests/response_publisher/response_publisher_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include "response_publisher.h"

#include <gtest/gtest.h>

bool gResponsePublisherRecord{false};
bool gResponsePublisherLogRotate{false};
std::ofstream gResponsePublisherRecordOfs;
std::string gResponsePublisherRecordFile;

using namespace swss;

TEST(ResponsePublisher, TestPublish)
{
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};

publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
ASSERT_EQ(value, "value");
}

TEST(ResponsePublisher, TestPublishBuffered)
{
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};

publisher.setBuffered(true);

publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
publisher.flush();
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
ASSERT_EQ(value, "value");
}