From 9dbbcd30b2490117bd005967a96eeabe10bbbb99 Mon Sep 17 00:00:00 2001 From: Qi Luo Date: Thu, 22 Jul 2021 02:18:12 -0700 Subject: [PATCH] Fix DBInterface blocking operations (#505) Fixes https://github.com/Azure/sonic-buildimage/issues/8202 - Fix the pubsub channel timeout unit - Reset the context err before the next redisGetReply; otherwise later redisGetReply calls return early when there is no data and burn a CPU core in a loop. --- common/dbinterface.cpp | 3 ++- common/dbinterface.h | 30 +++++++++++++++--------------- tests/test_redis_ut.py | 20 +++++++++++++++++++- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/common/dbinterface.cpp b/common/dbinterface.cpp index efd35a4e4..53e6ccd31 100644 --- a/common/dbinterface.cpp +++ b/common/dbinterface.cpp @@ -247,6 +247,7 @@ bool DBInterface::_unavailable_data_handler(const std::string& dbName, const cha auto& channel = keyspace_notification_channels.at(dbName); auto ctx = channel->getContext(); redisReply *reply; + ctx->err = REDIS_OK; // Stop later redisGetReply early return on no data after first redisReply timeout int rc = redisGetReply(ctx, reinterpret_cast(&reply)); if (rc == REDIS_ERR && ctx->err == REDIS_ERR_IO && errno == EAGAIN) { @@ -293,7 +294,7 @@ void DBInterface::_subscribe_keyspace_notification(const std::string& dbName) pubsub->psubscribe(KEYSPACE_PATTERN); // Set the timeout of the pubsub channel, so future redisGetReply will be impacted - struct timeval tv = { 0, (suseconds_t)(1000 * PUB_SUB_NOTIFICATION_TIMEOUT) }; + struct timeval tv = { PUB_SUB_NOTIFICATION_TIMEOUT, 0 }; int rc = redisSetTimeout(pubsub->getContext(), tv); if (rc != REDIS_OK) { diff --git a/common/dbinterface.h b/common/dbinterface.h index a5ac9e30b..ccf114a07 100644 --- a/common/dbinterface.h +++ b/common/dbinterface.h @@ -48,20 +48,6 @@ class DBInterface DBConnector& get_redis_client(const std::string& dbName); void set_redis_kwargs(std::string unix_socket_path, std::string host, int port); -private: - template - T blockable(FUNC f, const 
std::string& dbName, bool blocking = false); - // Unsubscribe the chosent client from keyspace event notifications - void _unsubscribe_keyspace_notification(const std::string& dbName); - bool _unavailable_data_handler(const std::string& dbName, const char *data); - // Subscribe the chosent client to keyspace event notifications - void _subscribe_keyspace_notification(const std::string& dbName); - // In the event Redis is unavailable, close existing connections, and try again. - void _connection_error_handler(const std::string& dbName); - void _onetime_connect(int dbId, const std::string& dbName); - // Keep reconnecting to Database 'dbId' until success - void _persistent_connect(int dbId, const std::string& dbName); - static const int BLOCKING_ATTEMPT_ERROR_THRESHOLD = 10; static const int BLOCKING_ATTEMPT_SUPPRESSION = BLOCKING_ATTEMPT_ERROR_THRESHOLD + 5; @@ -72,11 +58,25 @@ class DBInterface static const int DATA_RETRIEVAL_WAIT_TIME = 3; // Time to wait for any given message to arrive via pub-sub. - static constexpr double PUB_SUB_NOTIFICATION_TIMEOUT = 10.0; // seconds + static const int PUB_SUB_NOTIFICATION_TIMEOUT = 10; // seconds // Maximum allowable time to wait on a specific pub-sub notification. static constexpr double PUB_SUB_MAXIMUM_DATA_WAIT = 60.0; // seconds +private: + template + T blockable(FUNC f, const std::string& dbName, bool blocking = false); + // Unsubscribe the chosent client from keyspace event notifications + void _unsubscribe_keyspace_notification(const std::string& dbName); + bool _unavailable_data_handler(const std::string& dbName, const char *data); + // Subscribe the chosent client to keyspace event notifications + void _subscribe_keyspace_notification(const std::string& dbName); + // In the event Redis is unavailable, close existing connections, and try again. 
+ void _connection_error_handler(const std::string& dbName); + void _onetime_connect(int dbId, const std::string& dbName); + // Keep reconnecting to Database 'dbId' until success + void _persistent_connect(int dbId, const std::string& dbName); + // Pub-sub keyspace pattern static constexpr const char *KEYSPACE_PATTERN = "__key*__:*"; diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index b2199d4fd..0006dd44e 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -183,6 +183,15 @@ def generator_SelectMemoryLeak(): assert not cases +def thread_coming_data(): + print("Start thread: thread_coming_data") + db = SonicV2Connector(use_unix_socket_path=True) + db.connect("TEST_DB") + time.sleep(DBInterface.PUB_SUB_NOTIFICATION_TIMEOUT * 2) + db.set("TEST_DB", "key0_coming", "field1", "value2") + print("Leave thread: thread_coming_data") + + def test_DBInterface(): dbintf = DBInterface() dbintf.set_redis_kwargs("", "127.0.0.1", 6379) @@ -276,13 +285,22 @@ def test_DBInterface(): with pytest.raises(TypeError): fvs.update(fvs, fvs) - # Test blocking + # Test blocking reading existing data in Redis fvs = db.get_all("TEST_DB", "key0", blocking=True) assert "field1" in fvs assert fvs["field1"] == "value2" assert fvs.get("field1", "default") == "value2" assert fvs.get("nonfield", "default") == "default" + # Test blocking reading coming data in Redis + thread = Thread(target=thread_coming_data) + thread.start() + fvs = db.get_all("TEST_DB", "key0_coming", blocking=True) + assert "field1" in fvs + assert fvs["field1"] == "value2" + assert fvs.get("field1", "default") == "value2" + assert fvs.get("nonfield", "default") == "default" + # Test hmset fvs = {"field1": "value3", "field2": "value4"} db.hmset("TEST_DB", "key5", fvs)