From 267cd0d068766faab8f877859e819fcb330e275e Mon Sep 17 00:00:00 2001 From: siloneco Date: Wed, 23 Mar 2022 21:24:36 +0900 Subject: [PATCH] :bug: Fixed a bug that occurred when changing leaders. --- pom.xml | 2 +- src/main/java/net/azisaba/kuvel/Kuvel.java | 4 ++ .../kuvel/redis/RedisConnectionLeader.java | 47 +++++++++++++++++++ .../net/azisaba/kuvel/redis/RedisKeys.java | 4 +- .../azisaba/kuvel/redis/RedisSubscriber.java | 12 +++-- 5 files changed, 64 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 98e0bbf2..b97e9520 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.azisaba Kuvel - 1.0.0 + 1.0.1 jar ${project.artifactId} diff --git a/src/main/java/net/azisaba/kuvel/Kuvel.java b/src/main/java/net/azisaba/kuvel/Kuvel.java index 8af9c2fd..37361a55 100644 --- a/src/main/java/net/azisaba/kuvel/Kuvel.java +++ b/src/main/java/net/azisaba/kuvel/Kuvel.java @@ -72,8 +72,11 @@ public void onProxyInitialization(ProxyInitializeEvent event) { kuvelConfig.getProxyGroupName()); proxyIdProvider.runTask(proxy, this); + logger.info("This proxy's id is: " + proxyIdProvider.getId()); + redisConnectionLeader = new RedisConnectionLeader( + this, kuvelConfig.getRedisConnectionData().createJedisPool(), kuvelConfig.getProxyGroupName(), proxyIdProvider.getId()); @@ -82,6 +85,7 @@ public void onProxyInitialization(ProxyInitializeEvent event) { if (redisConnectionLeader.isLeader()) { logger.info("This proxy is selected as leader."); } + kuvelServiceHandler.setAndRunLoadBalancerDiscovery( new RedisLoadBalancerDiscovery( client, diff --git a/src/main/java/net/azisaba/kuvel/redis/RedisConnectionLeader.java b/src/main/java/net/azisaba/kuvel/redis/RedisConnectionLeader.java index fd1bffc2..00432483 100644 --- a/src/main/java/net/azisaba/kuvel/redis/RedisConnectionLeader.java +++ b/src/main/java/net/azisaba/kuvel/redis/RedisConnectionLeader.java @@ -2,12 +2,16 @@ import java.util.Objects; import lombok.RequiredArgsConstructor; +import net.azisaba.kuvel.Kuvel; +import net.azisaba.kuvel.discovery.impl.RedisLoadBalancerDiscovery; +import net.azisaba.kuvel.discovery.impl.RedisServerDiscovery; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @RequiredArgsConstructor public class RedisConnectionLeader { + private final Kuvel plugin; private final JedisPool jedisPool; private final String groupName; private final String proxyId; @@ -31,6 +35,10 @@ public boolean trySwitch() { jedis.expire(key, 600); leader = true; leaderExpireAt = System.currentTimeMillis() + (600 * 1000); + + plugin.getLogger().info("This proxy was selected as a new leader."); + jedis.publish(RedisKeys.LEADER_CHANGED_NOTIFY_PREFIX.getKey() + groupName, proxyId); + runDiscoveryTask(); return true; } else { String currentLeader = jedis.get(RedisKeys.LEADER_PREFIX.getKey() + groupName); @@ -38,6 +46,11 @@ public boolean trySwitch() { leader = true; return true; } + + if (leader) { + stopDiscoveryTask(); + } + leader = false; return false; } } @@ -62,6 +75,7 @@ public void leaveLeader() { } jedis.del(RedisKeys.LEADER_PREFIX.getKey() + groupName); + jedis.publish(RedisKeys.LEADER_LEAVE_NOTIFY_PREFIX.getKey() + groupName, proxyId); } } @@ -92,4 +106,37 @@ public void publishDeletedServer(String podUid) { jedis.publish(RedisKeys.POD_DELETED_NOTIFY_PREFIX.getKey() + groupName, podUid); } } + + private void runDiscoveryTask() { + if (plugin.getKuvelConfig().getRedisConnectionData() == null) { + return; + } + + plugin + .getKuvelServiceHandler() + .setAndRunLoadBalancerDiscovery( + new RedisLoadBalancerDiscovery( + plugin.getClient(), + plugin, + plugin.getKuvelConfig().getRedisConnectionData().createJedisPool(), + plugin.getKuvelConfig().getProxyGroupName(), + this, + plugin.getKuvelServiceHandler())); + + plugin + .getKuvelServiceHandler() + .setAndRunServerDiscovery( + new RedisServerDiscovery( + plugin.getClient(), + plugin, + plugin.getKuvelConfig().getRedisConnectionData().createJedisPool(), + plugin.getKuvelConfig().getProxyGroupName(), + this, + plugin.getKuvelServiceHandler())); + } + + private void stopDiscoveryTask() { + plugin.getKuvelServiceHandler().setAndRunLoadBalancerDiscovery(null); + plugin.getKuvelServiceHandler().setAndRunServerDiscovery(null); + } } diff --git a/src/main/java/net/azisaba/kuvel/redis/RedisKeys.java b/src/main/java/net/azisaba/kuvel/redis/RedisKeys.java index 90290eb8..b1ea5c1b 100644 --- a/src/main/java/net/azisaba/kuvel/redis/RedisKeys.java +++ b/src/main/java/net/azisaba/kuvel/redis/RedisKeys.java @@ -14,7 +14,9 @@ public enum RedisKeys { POD_ADDED_NOTIFY_PREFIX("kuvel:notify:add:pod:"), LOAD_BALANCER_ADDED_NOTIFY_PREFIX("kuvel:notify:add:lb:"), POD_DELETED_NOTIFY_PREFIX("kuvel:notify:del:pod:"), - LOAD_BALANCER_DELETED_NOTIFY_PREFIX("kuvel:notify:del:lb:"); + LOAD_BALANCER_DELETED_NOTIFY_PREFIX("kuvel:notify:del:lb:"), + LEADER_LEAVE_NOTIFY_PREFIX("kuvel:notify:leader-leave:"), + LEADER_CHANGED_NOTIFY_PREFIX("kuvel:notify:leader-changed:"); private final String key; diff --git a/src/main/java/net/azisaba/kuvel/redis/RedisSubscriber.java b/src/main/java/net/azisaba/kuvel/redis/RedisSubscriber.java index 3cd8a391..ddbf2330 100644 --- a/src/main/java/net/azisaba/kuvel/redis/RedisSubscriber.java +++ b/src/main/java/net/azisaba/kuvel/redis/RedisSubscriber.java @@ -32,12 +32,18 @@ public void subscribe() { new JedisPubSub() { @Override public void onPMessage(String pattern, String channel, String message) { - if (redisConnectionLeader.isLeader()) { + String receivedGroupName = channel.split(":")[channel.split(":").length - 1]; + if (!receivedGroupName.equalsIgnoreCase(groupName)) { return; } - String receivedGroupName = channel.split(":")[channel.split(":").length - 1]; - if (!receivedGroupName.equalsIgnoreCase(groupName)) { + if (channel.startsWith(RedisKeys.LEADER_CHANGED_NOTIFY_PREFIX.getKey()) + || channel.startsWith(RedisKeys.LEADER_LEAVE_NOTIFY_PREFIX.getKey())) { + redisConnectionLeader.trySwitch(); + return; + } + + if (redisConnectionLeader.isLeader()) { return; }