Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Upgrade curator, use NodeCache recipe
Browse files Browse the repository at this point in the history
  • Loading branch information
lucky committed Nov 29, 2016
1 parent 4ac3ec2 commit f6da383
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 50 deletions.
11 changes: 10 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@
</developers>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down Expand Up @@ -82,7 +91,7 @@
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.6.0</version>
<version>2.11.1</version>
</dependency>

<dependency>
Expand Down
69 changes: 20 additions & 49 deletions src/main/java/com/librato/rollout/zk/RolloutZKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,20 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.librato.rollout.RolloutClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -35,39 +29,13 @@ public class RolloutZKClient implements RolloutClient {
private final AtomicReference<Map<String, Entry>> features = new AtomicReference<Map<String, Entry>>();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final CuratorFramework framework;
private final String rolloutPath;
private final CuratorListener listener;
private final NodeCache nodeCache;

public RolloutZKClient(final CuratorFramework framework, final String rolloutPath) {
Preconditions.checkNotNull(framework, "CuratorFramework cannot be null");
Preconditions.checkArgument(rolloutPath != null && !rolloutPath.isEmpty(), "rolloutPath cannot be null or blank");
this.framework = framework;
this.rolloutPath = rolloutPath;
this.listener = new CuratorListener() {
@SuppressWarnings("ThrowFromFinallyBlock")
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
if (framework.getState() != CuratorFrameworkState.STARTED ||
!isStarted.get() ||
event.getType() != CuratorEventType.WATCHED ||
!rolloutPath.equals(event.getPath())) {
return;
}
try {
getAndSet();
} catch (Exception ex) {
log.error("Error on event update", ex);
} finally {
// Set the watch just in case it was simply bad data
try {
setWatch();
} catch (Exception e) {
log.error("Could not set watch", e);
throw Throwables.propagate(e);
}
}
}
};
this.nodeCache = new NodeCache(framework, rolloutPath);
}

@Override
Expand Down Expand Up @@ -113,27 +81,30 @@ public void start() throws Exception {
if (framework.getState() != CuratorFrameworkState.STARTED) {
throw new RuntimeException("CuratorFramework is not started!");
}
// We initialize the value here to mitigate the race condition of watching the path and attempting to get a value
// from the map
getAndSet();
framework.getCuratorListenable().addListener(listener);
setWatch();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
build();
}
});
nodeCache.start(true);
build();
}

@Override
public void stop() {
if (!isStarted.compareAndSet(true, false)) {
throw new RuntimeException("Service already stopped or never started!");
}
framework.getCuratorListenable().removeListener(listener);
}

private void setWatch() throws Exception {
framework.getData().watched().inBackground().forPath(rolloutPath);
try {
nodeCache.close();
} catch (IOException ex) {
log.warn("Error when closing NodeCache", ex);
}
}

private void getAndSet() throws Exception {
features.set(ImmutableMap.copyOf(parseData(framework.getData().forPath(rolloutPath))));
private void build() throws IOException {
features.set(ImmutableMap.copyOf(parseData(nodeCache.getCurrentData().getData())));
}

private Map<String, Entry> parseData(byte[] data) throws IOException {
Expand Down

0 comments on commit f6da383

Please sign in to comment.