Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
ARUHA-1255 Fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Oct 26, 2017
1 parent 588acb3 commit 7911af4
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import org.zalando.nakadi.exceptions.NakadiRuntimeException;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public abstract class ZkSubscriptionImpl<ReturnType, ZkType> implements ZkSubscription<ReturnType>, Watcher {
protected final CuratorFramework curatorFramework;
protected final String key;
private final AtomicReference<Runnable> listener;
private final AtomicReference<ExceptionOrData<ReturnType>> data = new AtomicReference<>();
private volatile Runnable listener;
private volatile ExceptionOrData<ReturnType> data;
private final Function<ZkType, ReturnType> converter;

private static class ExceptionOrData<T> {
Expand Down Expand Up @@ -45,29 +44,30 @@ public ZkSubscriptionImpl(
final Runnable listener,
final Function<ZkType, ReturnType> converter,
final String key) {
this.listener = new AtomicReference<>(listener);
this.listener = listener;
this.curatorFramework = curatorFramework;
this.key = key;
this.converter = converter;
this.data = null;
}

@Override
public ReturnType getData() throws NakadiRuntimeException {
if (data.get() == null) { // If there is new value pending
if (data == null) { // If there is new value pending
try {
// create listener only in case if subscription is still active.
final ZkType zkData = query(null != listener.get());
data.set(new ExceptionOrData<>(converter.apply(zkData)));
final ZkType zkData = query(null != listener);
data = new ExceptionOrData<>(converter.apply(zkData));
} catch (NakadiRuntimeException ex) {
data.set(new ExceptionOrData<>(ex));
data = new ExceptionOrData<>(ex);
}
}
return data.get().get();
return data.get();
}

@Override
public void close() {
listener.set(null);
listener = null;
}

protected abstract ZkType query(boolean createListener) throws NakadiRuntimeException;
Expand All @@ -77,8 +77,8 @@ public void process(final WatchedEvent event) {
// on this call one actually notifies that data has changed and waits for refresh call.
// The reason for that is that sometimes it is not possible to query data from zk while being called from
// notification callback.
data.set(null);
final Runnable toNotify = listener.get();
data = null;
final Runnable toNotify = listener;
// In case if subscription is still active - notify
if (null != toNotify) {
toNotify.run();
Expand Down

0 comments on commit 7911af4

Please sign in to comment.