Skip to content

Commit

Permalink
Fix subscribe and unsubscribe command and pattern mode (#174)
Browse files Browse the repository at this point in the history
* Fix subscribe and psubscribe command reply, the return number is the sum of subscription and psubscription
* Fix unpsubscribe command operates on subscription data instead of psubscription
* The unsubscribe and punsubscribe commands can operates on more than one channel and pattern
* The unsubscribe and punsubscribe commands should have reply
* Fix punsubscribe/unsubscribe command reply
  • Loading branch information
ShooterIT authored Feb 5, 2021
1 parent dd07178 commit e657d58
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 27 deletions.
49 changes: 31 additions & 18 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3277,57 +3277,70 @@ class CommandPublish : public Commander {
}
};

void SubscribeCommmandReply(std::string *output, std::string name, std::string sub_name, int num) {
output->append(Redis::MultiLen(3));
output->append(Redis::BulkString(name));
output->append(sub_name.empty() ? Redis::NilString() : Redis::BulkString(sub_name));
output->append(Redis::Integer(num));
}

class CommandSubscribe : public Commander {
public:
CommandSubscribe() : Commander("subcribe", -2, false) {}
CommandSubscribe() : Commander("subscribe", -2, false) {}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
output->append(Redis::MultiLen(3));
output->append(Redis::BulkString("subscribe"));
output->append(Redis::BulkString(args_[i]));
output->append(Redis::Integer(conn->SubscriptionsCount()));
SubscribeCommmandReply(output, "subscribe", args_[i],
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
}
return Status::OK();
}
};

class CommandUnSubscribe : public Commander {
public:
CommandUnSubscribe() : Commander("unsubcribe", -1, false) {}
CommandUnSubscribe() : Commander("unsubscribe", -1, false) {}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (args_.size() > 1) {
conn->UnSubscribeChannel(args_[1]);
if (args_.size() == 1) {
conn->UnSubscribeAll(std::bind(SubscribeCommmandReply, output, "unsubscribe",
std::placeholders::_1, std::placeholders::_2));
} else {
conn->UnSubscribeAll();
for (unsigned i = 1; i < args_.size(); i++) {
conn->UnSubscribeChannel(args_[i]);
SubscribeCommmandReply(output, "unsubscribe", args_[i],
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
}
}
return Status::OK();
}
};

class CommandPSubscribe : public Commander {
public:
CommandPSubscribe() : Commander("psubcribe", -2, false) {}
CommandPSubscribe() : Commander("psubscribe", -2, false) {}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
output->append(Redis::MultiLen(3));
output->append(Redis::BulkString("psubscribe"));
output->append(Redis::BulkString(args_[i]));
output->append(Redis::Integer(conn->PSubscriptionsCount()));
SubscribeCommmandReply(output, "psubscribe", args_[i],
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
}
return Status::OK();
}
};

class CommandPUnSubscribe : public Commander {
public:
CommandPUnSubscribe() : Commander("punsubcribe", -1, false) {}
CommandPUnSubscribe() : Commander("punsubscribe", -1, false) {}
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (args_.size() > 1) {
conn->PUnSubscribeChannel(args_[1]);
if (args_.size() == 1) {
conn->PUnSubscribeAll(std::bind(SubscribeCommmandReply, output, "punsubscribe",
std::placeholders::_1, std::placeholders::_2));
} else {
conn->PUnSubscribeAll();
for (unsigned i = 1; i < args_.size(); i++) {
conn->PUnSubscribeChannel(args_[i]);
SubscribeCommmandReply(output, "punsubscribe", args_[i],
conn->SubscriptionsCount() + conn->PSubscriptionsCount());
}
}
return Status::OK();
}
Expand Down
33 changes: 26 additions & 7 deletions src/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,19 @@ void Connection::UnSubscribeChannel(const std::string &channel) {
}
}

void Connection::UnSubscribeAll() {
if (subscribe_channels_.empty()) return;
void Connection::UnSubscribeAll(unsubscribe_callback reply) {
if (subscribe_channels_.empty()) {
if (reply != nullptr) reply("", subcribe_patterns_.size());
return;
}
int removed = 0;
for (const auto &chan : subscribe_channels_) {
owner_->svr_->UnSubscribeChannel(chan, this);
removed++;
if (reply != nullptr) {
reply(chan, static_cast<int>(subscribe_channels_.size() -
removed + subcribe_patterns_.size()));
}
}
subscribe_channels_.clear();
}
Expand All @@ -185,20 +194,30 @@ void Connection::PSubscribeChannel(const std::string &pattern) {
}

void Connection::PUnSubscribeChannel(const std::string &pattern) {
auto iter = subscribe_channels_.begin();
for (; iter != subscribe_channels_.end(); iter++) {
auto iter = subcribe_patterns_.begin();
for (; iter != subcribe_patterns_.end(); iter++) {
if (*iter == pattern) {
subscribe_channels_.erase(iter);
subcribe_patterns_.erase(iter);
owner_->svr_->PUnSubscribeChannel(pattern, this);
return;
}
}
}

void Connection::PUnSubscribeAll() {
if (subcribe_patterns_.empty()) return;
void Connection::PUnSubscribeAll(unsubscribe_callback reply) {
if (subcribe_patterns_.empty()) {
if (reply != nullptr) reply("", subscribe_channels_.size());
return;
}

int removed = 0;
for (const auto &pattern : subcribe_patterns_) {
owner_->svr_->PUnSubscribeChannel(pattern, this);
removed++;
if (reply != nullptr) {
reply(pattern, static_cast<int>(subcribe_patterns_.size() -
removed + subscribe_channels_.size()));
}
}
subcribe_patterns_.clear();
}
Expand Down
5 changes: 3 additions & 2 deletions src/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ class Connection {
void SendFile(int fd);
std::string ToString();

typedef std::function<void(std::string, int)> unsubscribe_callback;
void SubscribeChannel(const std::string &channel);
void UnSubscribeChannel(const std::string &channel);
void UnSubscribeAll();
void UnSubscribeAll(unsubscribe_callback reply = nullptr);
int SubscriptionsCount();
void PSubscribeChannel(const std::string &pattern);
void PUnSubscribeChannel(const std::string &pattern);
void PUnSubscribeAll();
void PUnSubscribeAll(unsubscribe_callback reply = nullptr);
int PSubscriptionsCount();

uint64_t GetAge();
Expand Down

0 comments on commit e657d58

Please sign in to comment.