Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine publish notifications #271

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/consumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st
// if the set is empty, return an empty kco object
if (ctx0->type == REDIS_REPLY_NIL)
{
// Discard one message in channel, even if there is nothing to pop
// This may be the case that producer called clear()
discard(1);
return;
}

Expand Down Expand Up @@ -90,6 +93,8 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st
kfvOp(kco) = SET_COMMAND;
}
}
// Discard messages in channel, one per kco
discard(vkco.size());
}

}
7 changes: 6 additions & 1 deletion common/consumertable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBa

RedisReply r(dequeueReply());
long long int len = r.getReply<long long int>();
//Key, Value and OP are in one list, they are processed in one shot
// Key, Value and OP are in one list, they are processed in one shot
setQueueLength(len/3);
}

Expand All @@ -57,6 +57,9 @@ void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &pref
// if the set is empty, return an empty kco object
if (r.getContext()->type == REDIS_REPLY_NIL)
{
// Discard one message in channel, even if there is nothing to pop
// This may be the case that producer called clear()
discard(1);
return;
}

Expand Down Expand Up @@ -91,6 +94,8 @@ void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &pref
values.push_back(e);
}
}
// Discard messages in channel, one per kco
discard(vkco.size());
}

}
18 changes: 18 additions & 0 deletions common/consumertablebase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,22 @@ void ConsumerTableBase::pop(std::string &key, std::string &op, std::vector<Field
m_buffer.pop_front();
}

bool ConsumerTableBase::hasData()
{
return RedisSelect::hasData() || !m_buffer.empty();
}

// hasCachedData is depreciated, and this is a safe placeholder
// We will use hasData() to indicate that there is pending data
// ref: Select::selecat() implementation
bool ConsumerTableBase::hasCachedData()
{
return true;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The base class has the same implementation. It doesn't need to be implemented here.


// Override with an empty body, and subclasses will explicitly discard messages in the channel
void ConsumerTableBase::updateAfterRead()
{
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why do we need this?
The base class already has an empty body


}
8 changes: 8 additions & 0 deletions common/consumertablebase.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <deque>
#include "table.h"
#include "selectable.h"

Expand All @@ -21,6 +22,13 @@ class ConsumerTableBase: public TableConsumable, public RedisTransactioner
void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);

bool empty() const { return m_buffer.empty(); };

bool hasData() override;

bool hasCachedData() override;

void updateAfterRead() override;

protected:

std::deque<KeyOpFieldsValuesTuple> m_buffer;
Expand Down
6 changes: 4 additions & 2 deletions common/producer_state_table_apply_view.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ KEYS:
]]
local arg_start = 2
for i = 1, ARGV[arg_start] do
redis.call('SADD', KEYS[2], ARGV[arg_start + i])
local added = redis.call('SADD', KEYS[2], ARGV[arg_start + i])
if added > 0 then
redis.call('PUBLISH', KEYS[1], ARGV[1])
end
end
arg_start = arg_start + ARGV[arg_start] + 1
for i = 1, ARGV[arg_start] do
Expand All @@ -36,4 +39,3 @@ for j = 4, #KEYS do
end
arg_start = arg_start + 2 * ARGV[arg_start] + 1
end
redis.call('PUBLISH', KEYS[1], ARGV[1])
2 changes: 1 addition & 1 deletion common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
" if added > 0 then \n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);
Expand Down
35 changes: 34 additions & 1 deletion common/redisselect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ void RedisSelect::readData()
freeReplyObject(reply);
m_queueLength++;

reply = nullptr;
readRemainingData();
}

/* Read remaining messages in Redis buffer nonblockingly */
void RedisSelect::readRemainingData()
{
redisReply *reply = nullptr;
int status;
do
{
Expand Down Expand Up @@ -66,6 +72,33 @@ void RedisSelect::updateAfterRead()
m_queueLength--;
}

/* Discard messages in the channel */
void RedisSelect::discard(long long int n)
{
readRemainingData();

/*
* We will discard at least one message, to prevent any mistakenly infinite loop
* in the select-pop(s) pattern
*/
if (n <= 0)
n = 1;
Copy link
Contributor

@jipanyang jipanyang Apr 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what scenario n <= 0?

#Resolved

Copy link
Contributor Author

@qiluo-msft qiluo-msft Apr 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to prevent any logic error outside the scope of RedisSelect. Considering the case if there are redundant notifications but there is no data popped, caller may call discard(0), and here the function consumes one message in the queue if there is any. discard(negative) is not for protection purpose and should be not in a true case.


In reply to: 276515380 [](ancestors = 276515380)


if (n > m_queueLength)
{
/* If we have less messages, discard them all
* This is no big harm since all the late messages will be selected and nothing popped
* when the channel is not completely busy.
*/
m_queueLength = 0;
}
else
{
/* Otherwise discard as requested by n */
m_queueLength -= n;
}
}

/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void RedisSelect::subscribe(DBConnector* db, const std::string &channelName)
{
Expand Down
5 changes: 5 additions & 0 deletions common/redisselect.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class RedisSelect : public Selectable
protected:
std::unique_ptr<DBConnector> m_subscribe;
long long int m_queueLength;

void discard(long long int n);
kcudnik marked this conversation as resolved.
Show resolved Hide resolved

private:
void readRemainingData();
};

}
6 changes: 1 addition & 5 deletions common/select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,5 @@ int Select::select(Selectable **c, int timeout)

}

bool Select::isQueueEmpty()
{
return m_ready.empty();
}
};

};
2 changes: 1 addition & 1 deletion common/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Select
};

int select(Selectable **c, int timeout = -1);
bool isQueueEmpty();

private:
struct cmp
{
Expand Down
2 changes: 1 addition & 1 deletion common/subscriberstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ bool SubscriberStateTable::hasData()

bool SubscriberStateTable::hasCachedData()
{
return m_buffer.size() > 1 || m_keyspace_event_buffer.size() > 1;
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's default implementation in the base class. We could remove whole method.

}

void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
Expand Down
4 changes: 3 additions & 1 deletion pyext/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ SWIG_SOURCES = swsscommon.i
pkgpython_PYTHON = swsscommon.py __init__.py
pkgpyexec_LTLIBRARIES = _swsscommon.la

COMMONHEADERS:=$(wildcard ../common/*.h)

_swsscommon_la_SOURCES = swsscommon_wrap.cpp
_swsscommon_la_CPPFLAGS = -std=c++11 -I../common -I/usr/include/python$(PYTHON_VERSION)
_swsscommon_la_LDFLAGS = -module
_swsscommon_la_LIBADD = ../common/libswsscommon.la -lpython$(PYTHON_VERSION)

swsscommon_wrap.cpp: $(SWIG_SOURCES)
swsscommon_wrap.cpp: $(SWIG_SOURCES) $(COMMONHEADERS)
$(SWIG) -c++ -python -I../common -o $@ $<

CLEANFILES = swsscommon_wrap.cpp
60 changes: 59 additions & 1 deletion tests/redis_piped_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ TEST(ConsumerStateTable, async_singlethread)
EXPECT_EQ(p.count(), 0);
RedisReply r2(&db, queryCommand.c_str(), REDIS_REPLY_ARRAY);
EXPECT_EQ(r2.getContext()->elements, (size_t)0);

int numberOfNotification = 0;
while ((ret = cs.select(&selectcs, 1000)) == Select::OBJECT)
{
Expand Down Expand Up @@ -581,3 +581,61 @@ TEST(ConsumerStateTable, async_multitable)
cout << endl << "Done." << endl;
}

TEST(ConsumerStateTable, async_set_set_pops)
{
clearDB();

/* Prepare producer */
int index = 0;
string tableName = "UT_REDIS_THREAD_" + to_string(index);
DBConnector db(TEST_DB, "localhost", 6379, 0);
RedisPipeline pipeline(&db);
ProducerStateTable p(&pipeline, tableName, true);
string key1 = "TheKey1";
string key2 = "TheKey2";
int maxNumOfFields = 2;

/* First set operation */
{
vector<FieldValueTuple> fields;
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
p.set(key1, fields);
}

/* Second set operation */
{
vector<FieldValueTuple> fields;
for (int j = maxNumOfFields; j < maxNumOfFields * 2; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
p.set(key2, fields);
}
p.flush();

/* Prepare consumer */
ConsumerStateTable c(&db, tableName);
Select cs;
Selectable *selectcs;
cs.addSelectable(&c);

/* First pops operation will be two kco */
{
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
std::deque<KeyOpFieldsValuesTuple> vkco;
c.pops(vkco);
EXPECT_EQ(vkco.size(), 2L);
}

/* Second select operation will timeout since no kco or channel message left */
{
int ret = cs.select(&selectcs, 1000);
EXPECT_EQ(ret, Select::TIMEOUT);
}
}
90 changes: 0 additions & 90 deletions tests/redis_subscriber_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ using namespace swss;
static const string dbhost = "localhost";
static const int dbport = 6379;
static const string testTableName = "UT_REDIS_TABLE";
static const string testTableName2 = "UT_REDIS_TABLE2";

static inline int getMaxFields(int i)
{
Expand Down Expand Up @@ -431,92 +430,3 @@ TEST(SubscriberStateTable, one_producer_multiple_subscriber)
}
cout << endl << "Done." << endl;
}

TEST(SubscriberStateTable, cachedData)
{
clearDB();

/* Prepare init data */
int index = 0;
int maxNumOfFields = 2;

DBConnector db(TEST_DB, dbhost, dbport, 0);
Table p(&db, testTableName);
string key1 = "TheKey1";
/* Set operation */
{
vector<FieldValueTuple> fields;
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(index, j), value(index, j));
fields.push_back(t);
}
p.set(key1, fields);
}

Table p2(&db, testTableName2);
string key2 = "TheKey2";
/* Set operation */
{
vector<FieldValueTuple> fields;
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(index, j), value(index, j));
fields.push_back(t);
}
p2.set(key2, fields);
}

/* Prepare subscriber */
SubscriberStateTable c1(&db, testTableName);
SubscriberStateTable c2(&db, testTableName2);
Select cs;
Selectable *selectcs;
cs.addSelectable(&c1);
cs.addSelectable(&c2);

/* Pop operation and check CachedSelectable */
{
string key = key1;
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
if (selectcs == &c1)
{
c1.pop(kco);
}
else
{
c2.pop(kco);
key = key2;
}

EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "SET");

/* There is one cached selectable left */
bool r = cs.isQueueEmpty();
EXPECT_FALSE(r);

ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
if (key == key1)
{
EXPECT_TRUE(selectcs == &c2);
key = key2;
c2.pop(kco);
}
else
{
EXPECT_TRUE(selectcs == &c1);
key = key1;
c1.pop(kco);
}

EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "SET");
/* No cached selectable left */
r = cs.isQueueEmpty();
EXPECT_TRUE(r);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this test was removed?