From 7911af4dac40319f9acfc648cfe24ff6a1bd8f1b Mon Sep 17 00:00:00 2001 From: dsorokin Date: Thu, 26 Oct 2017 09:57:41 +0200 Subject: [PATCH] ARUHA-1255 Fixes after review --- .../subscription/zk/ZkSubscriptionImpl.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java index 58727ced4a..529919eadb 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java @@ -8,14 +8,13 @@ import org.zalando.nakadi.exceptions.NakadiRuntimeException; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; public abstract class ZkSubscriptionImpl implements ZkSubscription, Watcher { protected final CuratorFramework curatorFramework; protected final String key; - private final AtomicReference listener; - private final AtomicReference> data = new AtomicReference<>(); + private volatile Runnable listener; + private volatile ExceptionOrData data; private final Function converter; private static class ExceptionOrData { @@ -45,29 +44,30 @@ public ZkSubscriptionImpl( final Runnable listener, final Function converter, final String key) { - this.listener = new AtomicReference<>(listener); + this.listener = listener; this.curatorFramework = curatorFramework; this.key = key; this.converter = converter; + this.data = null; } @Override public ReturnType getData() throws NakadiRuntimeException { - if (data.get() == null) { // If there is new value pending + if (data == null) { // If there is new value pending try { // create listener only in case if subscription is still active. - final ZkType zkData = query(null != listener.get()); - data.set(new ExceptionOrData<>(converter.apply(zkData))); + final ZkType zkData = query(null != listener); + data = new ExceptionOrData<>(converter.apply(zkData)); } catch (NakadiRuntimeException ex) { - data.set(new ExceptionOrData<>(ex)); + data = new ExceptionOrData<>(ex); } } - return data.get().get(); + return data.get(); } @Override public void close() { - listener.set(null); + listener = null; } protected abstract ZkType query(boolean createListener) throws NakadiRuntimeException; @@ -77,8 +77,8 @@ public void process(final WatchedEvent event) { // on this call one actually notifies that data has changed and waits for refresh call. // The reason for that is that sometimes it is not possible to query data from zk while being called from // notification callback. - data.set(null); - final Runnable toNotify = listener.get(); + data = null; + final Runnable toNotify = listener; // In case if subscription is still active - notify if (null != toNotify) { toNotify.run();