Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-use Pub/Sub codes #3389

Merged
merged 3 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 4 additions & 157 deletions src/main/java/redis/clients/jedis/BinaryJedisPubSub.java
Original file line number Diff line number Diff line change
@@ -1,162 +1,9 @@
package redis.clients.jedis;

import static redis.clients.jedis.Protocol.ResponseKeyword.*;
public abstract class BinaryJedisPubSub extends JedisPubSubBase<byte[]> {

import java.util.Arrays;
import java.util.List;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

public abstract class BinaryJedisPubSub {
private int subscribedChannels = 0;
private Connection client;

public void onMessage(byte[] channel, byte[] message) {
}

public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
}

public void onSubscribe(byte[] channel, int subscribedChannels) {
}

public void onUnsubscribe(byte[] channel, int subscribedChannels) {
}

public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
}

public void onPSubscribe(byte[] pattern, int subscribedChannels) {
}

public void onPong(byte[] pattern) {
}

public void unsubscribe() {
client.sendCommand(Command.UNSUBSCRIBE);
client.flush();
}

public void unsubscribe(byte[]... channels) {
client.sendCommand(Command.UNSUBSCRIBE, channels);
client.flush();
}

public void subscribe(byte[]... channels) {
client.sendCommand(Command.SUBSCRIBE, channels);
client.flush();
}

public void psubscribe(byte[]... patterns) {
client.sendCommand(Command.PSUBSCRIBE, patterns);
client.flush();
}

public void punsubscribe() {
client.sendCommand(Command.PUNSUBSCRIBE);
client.flush();
}

public void punsubscribe(byte[]... patterns) {
client.sendCommand(Command.PUNSUBSCRIBE, patterns);
client.flush();
}

public void ping() {
client.sendCommand(Command.PING);
client.flush();
}

public void ping(byte[] argument) {
client.sendCommand(Command.PING, argument);
client.flush();
}

public boolean isSubscribed() {
return subscribedChannels > 0;
}

public void proceedWithPatterns(Connection client, byte[]... patterns) {
this.client = client;
this.client.setTimeoutInfinite();
try {
psubscribe(patterns);
process();
} finally {
this.client.rollbackTimeout();
}
}

public void proceed(Connection client, byte[]... channels) {
this.client = client;
this.client.setTimeoutInfinite();
try {
subscribe(channels);
process();
} finally {
this.client.rollbackTimeout();
}
}

private void process() {
do {
Object reply = client.getUnflushedObject();

if (reply instanceof List) {
List<Object> listReply = (List<Object>) reply;
final Object firstObj = listReply.get(0);
if (!(firstObj instanceof byte[])) {
throw new JedisException("Unknown message type: " + firstObj);
}
final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bchannel = (byte[]) listReply.get(1);
onSubscribe(bchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bchannel = (byte[]) listReply.get(1);
onUnsubscribe(bchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) listReply.get(1);
final byte[] bmesg = (byte[]) listReply.get(2);
onMessage(bchannel, bmesg);
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
final byte[] bpattern = (byte[]) listReply.get(1);
final byte[] bchannel = (byte[]) listReply.get(2);
final byte[] bmesg = (byte[]) listReply.get(3);
onPMessage(bpattern, bchannel, bmesg);
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bpattern = (byte[]) listReply.get(1);
onPSubscribe(bpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bpattern = (byte[]) listReply.get(1);
onPUnsubscribe(bpattern, subscribedChannels);
} else if (Arrays.equals(PONG.getRaw(), resp)) {
final byte[] bpattern = (byte[]) listReply.get(1);
onPong(bpattern);
} else {
throw new JedisException("Unknown message type: " + firstObj);
}
} else if (reply instanceof byte[]) {
byte[] resp = (byte[]) reply;
String str = SafeEncoder.encode(resp);
if ("PONG".equalsIgnoreCase(str)) {
onPong(null);
} else {
onPong(resp);
}
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
}

public int getSubscribedChannels() {
return subscribedChannels;
@Override
protected final byte[] encode(byte[] raw) {
return raw;
}
}
201 changes: 4 additions & 197 deletions src/main/java/redis/clients/jedis/JedisPubSub.java
Original file line number Diff line number Diff line change
@@ -1,204 +1,11 @@
package redis.clients.jedis;

import static redis.clients.jedis.Protocol.ResponseKeyword.*;

import java.util.Arrays;
import java.util.List;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

public abstract class JedisPubSub {

private static final String JEDIS_SUBSCRIPTION_MESSAGE = "JedisPubSub is not subscribed to a Jedis instance.";
private int subscribedChannels = 0;
private volatile Connection client;

public void onMessage(String channel, String message) {
}

public void onPMessage(String pattern, String channel, String message) {
}

public void onSubscribe(String channel, int subscribedChannels) {
}

public void onUnsubscribe(String channel, int subscribedChannels) {
}

public void onPUnsubscribe(String pattern, int subscribedChannels) {
}

public void onPSubscribe(String pattern, int subscribedChannels) {
}

public void onPong(String pattern) {
}

public void unsubscribe() {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.UNSUBSCRIBE);
client.flush();
}

public void unsubscribe(String... channels) {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.UNSUBSCRIBE, channels);
client.flush();
}

public void subscribe(String... channels) {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.SUBSCRIBE, channels);
client.flush();
}

public void psubscribe(String... patterns) {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.PSUBSCRIBE, patterns);
client.flush();
}

public void punsubscribe() {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.PUNSUBSCRIBE);
client.flush();
}

public void punsubscribe(String... patterns) {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.PUNSUBSCRIBE, patterns);
client.flush();
}

public void ping() {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.PING);
client.flush();
}

public void ping(String argument) {
if (client == null) {
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
}
client.sendCommand(Command.PING, argument);
client.flush();
}

public boolean isSubscribed() {
return subscribedChannels > 0;
}

public void proceedWithPatterns(Connection client, String... patterns) {
this.client = client;
this.client.setTimeoutInfinite();
try {
psubscribe(patterns);
process();
} finally {
this.client.rollbackTimeout();
}
}

public void proceed(Connection client, String... channels) {
this.client = client;
this.client.setTimeoutInfinite();
try {
subscribe(channels);
process();
} finally {
this.client.rollbackTimeout();
}
}

// private void process(Client client) {
private void process() {

do {
Object reply = client.getUnflushedObject();

if (reply instanceof List) {
List<Object> listReply = (List<Object>) reply;
final Object firstObj = listReply.get(0);
if (!(firstObj instanceof byte[])) {
throw new JedisException("Unknown message type: " + firstObj);
}
final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bchannel = (byte[]) listReply.get(1);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bchannel = (byte[]) listReply.get(1);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) listReply.get(1);
final byte[] bmesg = (byte[]) listReply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
final byte[] bpattern = (byte[]) listReply.get(1);
final byte[] bchannel = (byte[]) listReply.get(2);
final byte[] bmesg = (byte[]) listReply.get(3);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bpattern = (byte[]) listReply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bpattern = (byte[]) listReply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPUnsubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PONG.getRaw(), resp)) {
final byte[] bpattern = (byte[]) listReply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPong(strpattern);
} else {
throw new JedisException("Unknown message type: " + firstObj);
}
} else if (reply instanceof byte[]) {
byte[] resp = (byte[]) reply;
String str = SafeEncoder.encode(resp);
if ("PONG".equalsIgnoreCase(str)) {
onPong(null);
} else {
onPong(str);
}
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
}
public abstract class JedisPubSub extends JedisPubSubBase<String> {

public int getSubscribedChannels() {
return subscribedChannels;
@Override
protected final String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
}
Loading