diff --git a/src/server.cc b/src/server.cc index 5eb229cc473..06cdb9fd48d 100644 --- a/src/server.cc +++ b/src/server.cc @@ -249,11 +249,7 @@ void Server::FeedMonitorConns(Redis::Connection *conn, const std::vector to_publish_conn_ctxs; @@ -263,16 +259,41 @@ int Server::PublishMessage(const std::string &channel, const std::string &msg) { to_publish_conn_ctxs.emplace_back(*conn_ctx); } } + + // The patterns variable records the pattern of connections + std::vector patterns; + std::vector to_publish_patterns_conn_ctxs; for (const auto &iter : pubsub_patterns_) { if (Util::StringMatch(iter.first, channel, 0)) { for (const auto &conn_ctx : iter.second) { - to_publish_conn_ctxs.emplace_back(*conn_ctx); + to_publish_patterns_conn_ctxs.emplace_back(*conn_ctx); + patterns.emplace_back(iter.first); } } } pubsub_channels_mu_.unlock(); + + std::string channel_reply; + channel_reply.append(Redis::MultiLen(3)); + channel_reply.append(Redis::BulkString("message")); + channel_reply.append(Redis::BulkString(channel)); + channel_reply.append(Redis::BulkString(msg)); for (const auto &conn_ctx : to_publish_conn_ctxs) { - auto s = conn_ctx.owner->Reply(conn_ctx.fd, reply); + auto s = conn_ctx.owner->Reply(conn_ctx.fd, channel_reply); + if (s.IsOK()) { + cnt++; + } + } + + // We should publish corresponding pattern and message for connections + for (const auto &conn_ctx : to_publish_patterns_conn_ctxs) { + std::string pattern_reply; + pattern_reply.append(Redis::MultiLen(4)); + pattern_reply.append(Redis::BulkString("pmessage")); + pattern_reply.append(Redis::BulkString(patterns[index++])); + pattern_reply.append(Redis::BulkString(channel)); + pattern_reply.append(Redis::BulkString(msg)); + auto s = conn_ctx.owner->Reply(conn_ctx.fd, pattern_reply); if (s.IsOK()) { cnt++; } diff --git a/tests/functional/pub_sub_test.py b/tests/functional/pub_sub_test.py index ee6bf7427c0..54088acaaf6 100644 --- a/tests/functional/pub_sub_test.py +++ b/tests/functional/pub_sub_test.py @@ -23,7 +23,7 @@ def psubscribe(pattern, master=True): p.psubscribe(pattern) for item in p.listen(): - if item['type'] == "message": + if item['type'] == "pmessage": assert (item['data'] == "a") p.punsubscribe() break