Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Sentinel mode with UnifiedJedis #3240

Merged
merged 2 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package redis.clients.jedis;

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {

/**
* This constructor is here for easier transition from {@link JedisSentinelPool#JedisSentinelPool(
* java.lang.String, java.util.Set, redis.clients.jedis.JedisClientConfig, redis.clients.jedis.JedisClientConfig)}.
*
* @deprecated Use {@link #JedisSentineled(java.lang.String, redis.clients.jedis.JedisClientConfig,
* java.util.Set, redis.clients.jedis.JedisClientConfig)}.
*/
@Deprecated
// Legacy
public JedisSentineled(String masterName, Set<HostAndPort> sentinels,
final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, sentinels, sentinelClientConfig);
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(new SentineledConnectionProvider(masterName, masterClientConfig, sentinels, sentinelClientConfig));
}

/**
* This constructor is here for easier transition from {@link JedisSentinelPool#JedisSentinelPool(
* java.lang.String, java.util.Set, org.apache.commons.pool2.impl.GenericObjectPoolConfig,
* redis.clients.jedis.JedisClientConfig, redis.clients.jedis.JedisClientConfig)}.
*
* @deprecated Use {@link #JedisSentineled(java.lang.String, redis.clients.jedis.JedisClientConfig,
* org.apache.commons.pool2.impl.GenericObjectPoolConfig, java.util.Set, redis.clients.jedis.JedisClientConfig)}.
*/
@Deprecated
// Legacy
public JedisSentineled(String masterName, Set<HostAndPort> sentinels,
final GenericObjectPoolConfig<Connection> poolConfig, final JedisClientConfig masterClientConfig,
final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig);
}

public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(new SentineledConnectionProvider(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig));
}

public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider) {
super(sentineledConnectionProvider);
}

public HostAndPort getCurrentMaster() {
return ((SentineledConnectionProvider) provider).getCurrentMaster();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package redis.clients.jedis.providers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.Connection;
import redis.clients.jedis.ConnectionPool;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.IOUtils;

public class SentineledConnectionProvider implements ConnectionProvider {

private static final Logger LOG = LoggerFactory.getLogger(SentineledConnectionProvider.class);

protected static final long DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS = 5000;

private volatile HostAndPort currentMaster;

private volatile ConnectionPool pool;

private final String masterName;

private final JedisClientConfig masterClientConfig;

private final GenericObjectPoolConfig<Connection> masterPoolConfig;

protected final Collection<SentinelListener> sentinelListeners = new ArrayList<>();

private final JedisClientConfig sentinelClientConfig;

private final long subscribeRetryWaitTimeMillis;

private final Object initPoolLock = new Object();

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, /*poolConfig*/ null, sentinels, sentinelClientConfig);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig,
DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
}

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig<Connection> poolConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {

this.masterName = masterName;
this.masterClientConfig = masterClientConfig;
this.masterPoolConfig = poolConfig;

this.sentinelClientConfig = sentinelClientConfig;
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;

HostAndPort master = initSentinels(sentinels);
initMaster(master);
}

@Override
public Connection getConnection() {
return pool.getResource();
}

@Override
public Connection getConnection(CommandArguments args) {
return pool.getResource();
}

@Override
public void close() {
sentinelListeners.forEach(SentinelListener::shutdown);

pool.close();
}

public HostAndPort getCurrentMaster() {
return currentMaster;
}

private void initMaster(HostAndPort master) {
synchronized (initPoolLock) {
if (!master.equals(currentMaster)) {
currentMaster = master;

ConnectionPool newPool = masterPoolConfig != null
? new ConnectionPool(currentMaster, masterClientConfig, masterPoolConfig)
: new ConnectionPool(currentMaster, masterClientConfig);

ConnectionPool existingPool = pool;
pool = newPool;
LOG.info("Created connection pool to master at {}.", master);

if (existingPool != null) {
// although we clear the pool, we still have to check the returned object in getResource,
// this call only clears idle instances, not borrowed instances
// existingPool.clear(); // necessary??
existingPool.close();
}
}
}
}

private HostAndPort initSentinels(Set<HostAndPort> sentinels) {

HostAndPort master = null;
boolean sentinelAvailable = false;

LOG.debug("Trying to find master from available sentinels...");

for (HostAndPort sentinel : sentinels) {

LOG.debug("Connecting to Sentinel {}...", sentinel);

try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {

List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

// connected to sentinel...
sentinelAvailable = true;

if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Sentinel {} is not monitoring master {}.", sentinel, masterName);
continue;
}

master = toHostAndPort(masterAddr);
LOG.debug("Redis master reported at {}.", master);
break;
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
LOG.warn("Could not get master address from {}.", sentinel, e);
}
}

if (master == null) {
if (sentinelAvailable) {
// can connect to sentinel, but master name seems to not monitored
throw new JedisException(
"Can connect to sentinel, but " + masterName + " seems to be not monitored.");
} else {
throw new JedisConnectionException(
"All sentinels down, cannot determine where " + masterName + " is running.");
}
}

LOG.info("Redis master running at {}. Starting sentinel listeners...", master);

for (HostAndPort sentinel : sentinels) {

SentinelListener listener = new SentinelListener(sentinel);
// whether SentinelListener threads are alive or not, process can be stopped
listener.setDaemon(true);
sentinelListeners.add(listener);
listener.start();
}

return master;
}

/**
* Must be of size 2.
*/
private static HostAndPort toHostAndPort(List<String> masterAddr) {
return toHostAndPort(masterAddr.get(0), masterAddr.get(1));
}

private static HostAndPort toHostAndPort(String hostStr, String portStr) {
return new HostAndPort(hostStr, Integer.parseInt(portStr));
}

protected class SentinelListener extends Thread {

protected final HostAndPort node;
protected volatile Jedis sentinelJedis;
protected AtomicBoolean running = new AtomicBoolean(false);

public SentinelListener(HostAndPort node) {
super(String.format("%s-SentinelListener-[%s]", masterName, node.toString()));
this.node = node;
}

@Override
public void run() {

running.set(true);

while (running.get()) {

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

sentinelJedis = new Jedis(node, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = sentinelJedis.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master {} address. Sentinel: {}.", masterName, node);
} else {
initMaster(toHostAndPort(masterAddr));
}

sentinelJedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", node, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(switchMasterMsg[3], switchMasterMsg[4]));
} else {
LOG.debug(
"Ignoring message on +switch-master for master {}. Our master is {}.",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
node, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to sentinel {}. Sleeping {}ms and retrying.", node,
subscribeRetryWaitTimeMillis, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException se) {
LOG.error("Sleep interrupted.", se);
}
} else {
LOG.debug("Unsubscribing from sentinel {}.", node);
}
} finally {
IOUtils.closeQuietly(sentinelJedis);
}
}
}

// must not throw exception
public void shutdown() {
try {
LOG.debug("Shutting down listener on {}.", node);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (sentinelJedis != null) {
sentinelJedis.close();
}
} catch (RuntimeException e) {
LOG.error("Error while shutting down.", e);
}
}
}
}
Loading