From f6da38387aef4f53097bcb70ce9e6a9f3af0f847 Mon Sep 17 00:00:00 2001 From: Jared Harper Date: Tue, 29 Nov 2016 11:06:56 -0800 Subject: [PATCH] Upgrade curator, use NodeCache recipe --- pom.xml | 11 ++- .../librato/rollout/zk/RolloutZKClient.java | 69 ++++++------------- 2 files changed, 30 insertions(+), 50 deletions(-) diff --git a/pom.xml b/pom.xml index 7946e19..1e6eb75 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,15 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.6 + 1.6 + + org.apache.maven.plugins maven-source-plugin @@ -82,7 +91,7 @@ org.apache.curator curator-recipes - 2.6.0 + 2.11.1 diff --git a/src/main/java/com/librato/rollout/zk/RolloutZKClient.java b/src/main/java/com/librato/rollout/zk/RolloutZKClient.java index eb7ed74..8ca1046 100644 --- a/src/main/java/com/librato/rollout/zk/RolloutZKClient.java +++ b/src/main/java/com/librato/rollout/zk/RolloutZKClient.java @@ -2,26 +2,20 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.librato.rollout.RolloutClient; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -35,39 +29,13 @@ public class RolloutZKClient implements RolloutClient { private final AtomicReference> features = new AtomicReference>(); private final AtomicBoolean isStarted = new AtomicBoolean(false); private final CuratorFramework framework; - private final String rolloutPath; - private final CuratorListener listener; + private final NodeCache nodeCache; public RolloutZKClient(final CuratorFramework framework, final String rolloutPath) { Preconditions.checkNotNull(framework, "CuratorFramework cannot be null"); Preconditions.checkArgument(rolloutPath != null && !rolloutPath.isEmpty(), "rolloutPath cannot be null or blank"); this.framework = framework; - this.rolloutPath = rolloutPath; - this.listener = new CuratorListener() { - @SuppressWarnings("ThrowFromFinallyBlock") - @Override - public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { - if (framework.getState() != CuratorFrameworkState.STARTED || - !isStarted.get() || - event.getType() != CuratorEventType.WATCHED || - !rolloutPath.equals(event.getPath())) { - return; - } - try { - getAndSet(); - } catch (Exception ex) { - log.error("Error on event update", ex); - } finally { - // Set the watch just in case it was simply bad data - try { - setWatch(); - } catch (Exception e) { - log.error("Could not set watch", e); - throw Throwables.propagate(e); - } - } - } - }; + this.nodeCache = new NodeCache(framework, rolloutPath); } @Override @@ -113,11 +81,14 @@ public void start() throws Exception { if (framework.getState() != CuratorFrameworkState.STARTED) { throw new RuntimeException("CuratorFramework is not started!"); } - // We initialize the value here to mitigate the race condition of watching the path and attempting to get a value - // from the map - getAndSet(); - framework.getCuratorListenable().addListener(listener); - setWatch(); + nodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + build(); + } + }); + nodeCache.start(true); + build(); } @Override @@ -125,15 +96,15 @@ public void stop() { if (!isStarted.compareAndSet(true, false)) { throw new RuntimeException("Service already stopped or never started!"); } - framework.getCuratorListenable().removeListener(listener); - } - - private void setWatch() throws Exception { - framework.getData().watched().inBackground().forPath(rolloutPath); + try { + nodeCache.close(); + } catch (IOException ex) { + log.warn("Error when closing NodeCache", ex); + } } - private void getAndSet() throws Exception { - features.set(ImmutableMap.copyOf(parseData(framework.getData().forPath(rolloutPath)))); + private void build() throws IOException { + features.set(ImmutableMap.copyOf(parseData(nodeCache.getCurrentData().getData()))); } private Map parseData(byte[] data) throws IOException {