From 78f1d055397519e5537bc0f8d97d10351fc12d40 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 12:05:10 +0200 Subject: [PATCH 1/8] ARUHA-1255 Use different approach for zk listeners implementation --- .../subscription/StreamingContext.java | 36 +++-- .../subscription/state/StartingState.java | 7 +- .../zk/AbstractZkSubscriptionClient.java | 12 +- .../service/subscription/zk/ZkSubscr.java | 11 ++ .../service/subscription/zk/ZkSubscrImpl.java | 128 ++++++++++++++++++ .../subscription/zk/ZkSubscriptionClient.java | 2 +- 6 files changed, 172 insertions(+), 24 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java create mode 100644 src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 8aaf752a71..1ea4935364 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -21,12 +21,13 @@ import org.zalando.nakadi.service.subscription.state.DummyState; import org.zalando.nakadi.service.subscription.state.StartingState; import org.zalando.nakadi.service.subscription.state.State; -import org.zalando.nakadi.service.subscription.zk.ZKSubscription; +import org.zalando.nakadi.service.subscription.zk.ZkSubscr; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -60,7 +61,7 @@ public class StreamingContext implements SubscriptionStreamer { private final EventTypeChangeListener eventTypeChangeListener; private State currentState = new DummyState(); - private ZKSubscription clientListChanges; + private ZkSubscr> sessionListSubscription; private Closeable authorizationCheckSubscription; private final Logger log; @@ -184,20 +185,20 @@ public void switchState(final State newState) { }); } - public void registerSession() { + public void registerSession() throws Exception { log.info("Registering session {}", session); // Install rebalance hook on client list change. - clientListChanges = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance)); + sessionListSubscription = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance)); zkClient.registerSession(session); } public void unregisterSession() { log.info("Unregistering session {}", session); - if (null != clientListChanges) { + if (null != sessionListSubscription) { try { - clientListChanges.cancel(); + sessionListSubscription.close(); } finally { - this.clientListChanges = null; + this.sessionListSubscription = null; zkClient.unregisterSession(session); } } @@ -234,14 +235,19 @@ public ObjectMapper getObjectMapper() { } private void rebalance() { - if (null != clientListChanges) { - clientListChanges.refresh(); - zkClient.runLocked(() -> { - final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions()); - if (changeset.length > 0) { - zkClient.updatePartitionsConfiguration(changeset); - } - }); + if (null != sessionListSubscription) { + try { + // This call is needed to renew subscription for session list changes. + sessionListSubscription.getData(); + zkClient.runLocked(() -> { + final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions()); + if (changeset.length > 0) { + zkClient.updatePartitionsConfiguration(changeset); + } + }); + } catch (Exception e) { + switchState(new CleanupState(e)); + } } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java index 6b6cfb0a70..dc1259c0e9 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StartingState.java @@ -78,7 +78,12 @@ protected Response.StatusType getStatus() { return; } - getContext().registerSession(); + try { + getContext().registerSession(); + } catch (Exception ex) { + switchState(new CleanupState(ex)); + return; + } try { getOut().onInitialized(getSessionId()); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 47e31ecaf0..3a4fc95d09 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -264,7 +264,7 @@ public ZKSubscription subscribeForOffsetChanges(final EventTypePartition key, fi public final void resetCursors(final List cursors, final long timeout) throws OperationTimeoutException, ZookeeperException, OperationInterruptedException, RequestInProgressException { - ZKSubscription sessionsListener = null; + ZkSubscr> sessionsListener = null; boolean resetWasAlreadyInitiated = false; try { // close subscription connections @@ -281,13 +281,11 @@ public final void resetCursors(final List cursor final long finishAt = System.currentTimeMillis() + timeout; while (finishAt > System.currentTimeMillis()) { if (sessionsChanged.compareAndSet(true, false)) { - if (getCurator().getChildren().forPath(getSubscriptionPath("/sessions")).isEmpty()) { + if (sessionsListener.getData().isEmpty()) { forceCommitOffsets(cursors); return; } - sessionsListener.refresh(); } - synchronized (sessionsChanged) { sessionsChanged.wait(100); } @@ -305,7 +303,7 @@ public final void resetCursors(final List cursor throw new ZookeeperException("Unexpected problem occurred when resetting cursors", e); } finally { if (sessionsListener != null) { - sessionsListener.cancel(); + sessionsListener.close(); } try { @@ -321,9 +319,9 @@ public final void resetCursors(final List cursor } @Override - public final ZKSubscription subscribeForSessionListChanges(final Runnable listener) { + public final ZkSubscr> subscribeForSessionListChanges(final Runnable listener) throws Exception { getLog().info("subscribeForSessionListChanges: " + listener.hashCode()); - return ChangeListener.forChildren(getCurator(), getSubscriptionPath("/sessions"), listener); + return new ZkSubscrImpl.ZkSubscrChildrenImpl(getCurator(), listener, getSubscriptionPath("/sessions")); } @Override diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java new file mode 100644 index 0000000000..bbcc570d4c --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java @@ -0,0 +1,11 @@ +package org.zalando.nakadi.service.subscription.zk; + +import java.io.Closeable; + +public interface ZkSubscr extends Closeable { + + T getData() throws Exception; + + @Override + void close(); +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java new file mode 100644 index 0000000000..6af762700f --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java @@ -0,0 +1,128 @@ +package org.zalando.nakadi.service.subscription.zk; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +public abstract class ZkSubscrImpl implements ZkSubscr, Watcher { + protected final CuratorFramework curatorFramework; + protected final String key; + private final AtomicReference listener; + private final AtomicReference> data = new AtomicReference<>(); + private final Function converter; + + private static class ExceptionOrData { + private final Exception ex; + private final T data; + + ExceptionOrData(final Exception ex) { + this.ex = ex; + this.data = null; + } + + ExceptionOrData(final T data) { + this.data = data; + this.ex = null; + } + + public T get() throws Exception { + if (null != ex) { + throw ex; + } + return data; + } + } + + public ZkSubscrImpl( + final CuratorFramework curatorFramework, + final Runnable listener, + final Function converter, + final String key) { + this.listener = new AtomicReference<>(listener); + this.curatorFramework = curatorFramework; + this.key = key; + this.converter = converter; + } + + @Override + public ReturnType getData() throws Exception { + if (data.get() == null) { // If there is new value pending + try { + // create listener only in case if subscription is still active. + final ZkType zkData = query(null != listener.get()); + data.set(new ExceptionOrData<>(converter.apply(zkData))); + } catch (Exception ex) { + data.set(new ExceptionOrData<>(ex)); + } + } + return data.get().get(); + } + + @Override + public void close() { + listener.set(null); + } + + protected abstract ZkType query(boolean createListener) throws Exception; + + @Override + public void process(final WatchedEvent event) { + // on this call one actually notifies that data has changed and waits for refresh call. + // The reason for that is that sometimes it is not possible to query data from zk while being called from + // notification callback. + data.set(null); + final Runnable toNotify = listener.get(); + // In case if subscription is still active - notify + if (null != toNotify) { + toNotify.run(); + } + } + + public static class ZkSubscrValueImpl extends ZkSubscrImpl { + + public ZkSubscrValueImpl( + final CuratorFramework curatorFramework, + final Runnable listener, + final Function converter, + final String key) throws Exception { + super(curatorFramework, listener, converter, key); + // The very first call is used to initialize listener + query(true); + } + + @Override + protected byte[] query(final boolean setListener) throws Exception { + final GetDataBuilder builder = curatorFramework.getData(); + if (setListener) { + builder.usingWatcher(this); + } + return builder.forPath(key); + } + } + + public static class ZkSubscrChildrenImpl extends ZkSubscrImpl, List> { + + public ZkSubscrChildrenImpl( + final CuratorFramework curatorFramework, + final Runnable listener, + final String key) throws Exception { + super(curatorFramework, listener, Function.identity(), key); + query(true); + } + + @Override + protected List query(final boolean setListener) throws Exception { + final GetChildrenBuilder builder = curatorFramework.getChildren(); + if (setListener) { + builder.usingWatcher(this); + } + return builder.forPath(key); + } + } +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index b8a4b75329..9f528913a5 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -75,7 +75,7 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * * @param listener method to call on any change of client list. */ - ZKSubscription subscribeForSessionListChanges(Runnable listener); + ZkSubscr> subscribeForSessionListChanges(Runnable listener) throws Exception; /** * Subscribe for topology changes. From 10b09f618bc88489fb100cb4170cd57e601ab662 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 13:22:31 +0200 Subject: [PATCH 2/8] ARUHA-1255 Use different zk listener for topology changes --- .../subscription/StreamingContext.java | 20 ++--- .../subscription/state/ClosingState.java | 23 ++++-- .../subscription/state/StreamingState.java | 32 ++++---- .../zk/AbstractZkSubscriptionClient.java | 6 -- .../zk/NewZkSubscriptionClient.java | 73 ++++++------------- .../service/subscription/zk/ZkSubscr.java | 4 +- .../service/subscription/zk/ZkSubscrImpl.java | 37 ++++++---- .../subscription/zk/ZkSubscriptionClient.java | 47 +++++++++++- .../state/StreamingStateTest.java | 15 ++-- 9 files changed, 143 insertions(+), 114 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 1ea4935364..0698b6bc54 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -236,18 +236,14 @@ public ObjectMapper getObjectMapper() { private void rebalance() { if (null != sessionListSubscription) { - try { - // This call is needed to renew subscription for session list changes. - sessionListSubscription.getData(); - zkClient.runLocked(() -> { - final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions()); - if (changeset.length > 0) { - zkClient.updatePartitionsConfiguration(changeset); - } - }); - } catch (Exception e) { - switchState(new CleanupState(e)); - } + // This call is needed to renew subscription for session list changes. + sessionListSubscription.getData(); + zkClient.runLocked(() -> { + final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions()); + if (changeset.length > 0) { + zkClient.updatePartitionsConfiguration(changeset); + } + }); } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java index cb2193c7c3..3936975489 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java @@ -6,6 +6,8 @@ import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.ZKSubscription; +import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import java.util.Collection; import java.util.Collections; @@ -23,7 +25,7 @@ class ClosingState extends State { private final LongSupplier lastCommitSupplier; private Map uncommittedOffsets; private final Map listeners = new HashMap<>(); - private ZKSubscription topologyListener; + private ZkSubscr topologyListener; ClosingState(final Supplier> uncommittedOffsetsSupplier, final LongSupplier lastCommitSupplier) { @@ -41,7 +43,7 @@ public void onExit() { } finally { if (null != topologyListener) { try { - topologyListener.cancel(); + topologyListener.close(); } finally { topologyListener = null; } @@ -56,7 +58,12 @@ public void onEnter() { uncommittedOffsets = uncommittedOffsetsSupplier.get(); if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) { scheduleTask(() -> switchState(new CleanupState()), timeToWaitMillis, TimeUnit.MILLISECONDS); - topologyListener = getZk().subscribeForTopologyChanges(() -> addTask(this::onTopologyChanged)); + try { + topologyListener = getZk().subscribeForTopologyChanges(() -> addTask(this::onTopologyChanged)); + } catch (final Exception e) { + switchState(new CleanupState(e)); + return; + } reactOnTopologyChange(); } else { switchState(new CleanupState()); @@ -68,19 +75,19 @@ private void onTopologyChanged() { throw new IllegalStateException( "topologyListener should not be null when calling onTopologyChanged method"); } - topologyListener.refresh(); reactOnTopologyChange(); } - private void reactOnTopologyChange() { + private void reactOnTopologyChange() throws NakadiRuntimeException { + final ZkSubscriptionClient.Topology topology = topologyListener.getData(); // Collect current partitions state from Zk final Map partitions = new HashMap<>(); - getZk().runLocked(() -> Stream.of(getZk().listPartitions()) + Stream.of(topology.getPartitions()) .filter(p -> getSessionId().equals(p.getSession())) - .forEach(p -> partitions.put(p.getKey(), p))); + .forEach(p -> partitions.put(p.getKey(), p)); + // Select which partitions need to be freed from this session - // Ithere final Set freeRightNow = new HashSet<>(); final Set addListeners = new HashSet<>(); for (final Partition p : partitions.values()) { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index de495184ae..92a3a3769e 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -18,6 +18,8 @@ import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.ZKSubscription; +import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.view.SubscriptionCursor; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -43,7 +45,7 @@ class StreamingState extends State { // The reasons for that if there are two partitions (p0, p1) and p0 is reassigned, if p1 is working // correctly, and p0 is not receiving any updates - reassignment won't complete. private final Map releasingPartitions = new HashMap<>(); - private ZKSubscription topologyChangeSubscription; + private ZkSubscr topologyChangeSubscription; private EventConsumer.ReassignableEventConsumer eventConsumer; private boolean pollPaused; private long committedEvents; @@ -74,7 +76,8 @@ public void onEnter() { this.eventConsumer = getContext().getTimelineService().createEventConsumer(null); // Subscribe for topology changes. - this.topologyChangeSubscription = getZk().subscribeForTopologyChanges(() -> addTask(this::topologyChanged)); + this.topologyChangeSubscription = + getZk().subscribeForTopologyChanges(() -> addTask(this::reactOnTopologyChange)); // and call directly reactOnTopologyChange(); addTask(this::pollDataFromKafka); @@ -261,7 +264,7 @@ public void onExit() { if (null != topologyChangeSubscription) { try { - topologyChangeSubscription.cancel(); + topologyChangeSubscription.close(); } catch (final RuntimeException ex) { getLog().warn("Failed to cancel topology subscription", ex); } finally { @@ -285,20 +288,15 @@ public void onExit() { } } - void topologyChanged() { - if (null != topologyChangeSubscription) { - topologyChangeSubscription.refresh(); + void reactOnTopologyChange() { + if (null == topologyChangeSubscription) { + return; } - reactOnTopologyChange(); - } - - private void reactOnTopologyChange() { - getZk().runLocked(() -> { - final Partition[] assignedPartitions = Stream.of(getZk().listPartitions()) - .filter(p -> getSessionId().equals(p.getSession())) - .toArray(Partition[]::new); - addTask(() -> refreshTopologyUnlocked(assignedPartitions)); - }); + final ZkSubscriptionClient.Topology topology = topologyChangeSubscription.getData(); + final Partition[] assignedPartitions = Stream.of(topology.getPartitions()) + .filter(p -> getSessionId().equals(p.getSession())) + .toArray(Partition[]::new); + addTask(() -> refreshTopologyUnlocked(assignedPartitions)); } void refreshTopologyUnlocked(final Partition[] assignedPartitions) { @@ -476,7 +474,7 @@ void offsetChanged(final EventTypePartition key) { reconfigureKafkaConsumer(true); } - if (commitResult.committedCount > 0) { + if (commitResult.committedCount > 0) { committedEvents += commitResult.committedCount; this.lastCommitMillis = System.currentTimeMillis(); streamToOutput(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 3a4fc95d09..6e1e2072d9 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -324,12 +324,6 @@ public final ZkSubscr> subscribeForSessionListChanges(final Runnabl return new ZkSubscrImpl.ZkSubscrChildrenImpl(getCurator(), listener, getSubscriptionPath("/sessions")); } - @Override - public final ZKSubscription subscribeForTopologyChanges(final Runnable onTopologyChanged) { - getLog().info("subscribeForTopologyChanges"); - return ChangeListener.forData(getCurator(), getSubscriptionPath(NODE_TOPOLOGY), onTopologyChanged); - } - @Override public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException { final ZkSubscriptionNode subscriptionNode = new ZkSubscriptionNode(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index 40ce13e47e..3eba4fa154 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -1,17 +1,15 @@ package org.zalando.nakadi.service.subscription.zk; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.KeeperException; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.exceptions.NakadiRuntimeException; -import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.stream.Stream; @@ -59,48 +57,6 @@ public class NewZkSubscriptionClient extends AbstractZkSubscriptionClient { private final ObjectMapper objectMapper; - public static class Topology { - private final Partition[] partitions; - private final int version; - - public Topology( - @JsonProperty("partitions") final Partition[] partitions, - @JsonProperty("version") final int version) { - this.partitions = partitions; - this.version = version; - } - - public Partition[] getPartitions() { - return partitions; - } - - public Topology withUpdatedPartitions(final Partition[] partitions) { - final Partition[] resultPartitions = Arrays.copyOf(this.partitions, this.partitions.length); - for (final Partition newValue : partitions) { - int selectedIdx = -1; - for (int idx = 0; idx < resultPartitions.length; ++idx) { - if (resultPartitions[idx].getKey().equals(newValue.getKey())) { - selectedIdx = idx; - } - } - if (selectedIdx < 0) { - throw new MyNakadiRuntimeException1( - "Failed to find partition " + newValue.getKey() + " in " + this); - } - resultPartitions[selectedIdx] = newValue; - } - return new Topology(resultPartitions, version + 1); - } - - @Override - public String toString() { - return "Topology{" + - "partitions=" + Arrays.toString(partitions) + - ", version=" + version + - '}'; - } - } - public NewZkSubscriptionClient( final String subscriptionId, final CuratorFramework curatorFramework, @@ -145,10 +101,7 @@ public void updatePartitionsConfiguration(final Partition[] partitions) throws N private Topology readTopology() throws NakadiRuntimeException, SubscriptionNotInitializedException { try { - final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY)); - final Topology result = objectMapper.readValue(data, Topology.class); - getLog().info("Topology is {}", result); - return result; + return parseTopology(getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY))); } catch (KeeperException.NoNodeException ex) { throw new SubscriptionNotInitializedException(getSubscriptionId()); } catch (final Exception ex) { @@ -156,6 +109,28 @@ private Topology readTopology() throws NakadiRuntimeException, } } + private Topology parseTopology(final byte[] data) { + try { + final Topology result = objectMapper.readValue(data, Topology.class); + getLog().info("Topology is {}", result); + return result; + } catch (IOException e) { + throw new NakadiRuntimeException(e); + } + } + + @Override + public final ZkSubscr subscribeForTopologyChanges(final Runnable onTopologyChanged) + throws NakadiRuntimeException { + getLog().info("subscribeForTopologyChanges"); + return new ZkSubscrImpl.ZkSubscrValueImpl<>( + getCurator(), + onTopologyChanged, + this::parseTopology, + getSubscriptionPath(NODE_TOPOLOGY)); + } + + @Override public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException { return readTopology().getPartitions(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java index bbcc570d4c..709d0d0fc0 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java @@ -1,10 +1,12 @@ package org.zalando.nakadi.service.subscription.zk; +import org.zalando.nakadi.exceptions.NakadiRuntimeException; + import java.io.Closeable; public interface ZkSubscr extends Closeable { - T getData() throws Exception; + T getData() throws NakadiRuntimeException; @Override void close(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java index 6af762700f..37ce2f291c 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java @@ -5,6 +5,7 @@ import org.apache.curator.framework.api.GetDataBuilder; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.zalando.nakadi.exceptions.NakadiRuntimeException; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -18,10 +19,10 @@ public abstract class ZkSubscrImpl implements ZkSubscr converter; private static class ExceptionOrData { - private final Exception ex; + private final NakadiRuntimeException ex; private final T data; - ExceptionOrData(final Exception ex) { + ExceptionOrData(final NakadiRuntimeException ex) { this.ex = ex; this.data = null; } @@ -31,7 +32,7 @@ private static class ExceptionOrData { this.ex = null; } - public T get() throws Exception { + public T get() throws NakadiRuntimeException { if (null != ex) { throw ex; } @@ -51,13 +52,13 @@ public ZkSubscrImpl( } @Override - public ReturnType getData() throws Exception { + public ReturnType getData() throws NakadiRuntimeException { if (data.get() == null) { // If there is new value pending try { // create listener only in case if subscription is still active. final ZkType zkData = query(null != listener.get()); data.set(new ExceptionOrData<>(converter.apply(zkData))); - } catch (Exception ex) { + } catch (NakadiRuntimeException ex) { data.set(new ExceptionOrData<>(ex)); } } @@ -69,7 +70,7 @@ public void close() { listener.set(null); } - protected abstract ZkType query(boolean createListener) throws Exception; + protected abstract ZkType query(boolean createListener) throws NakadiRuntimeException; @Override public void process(final WatchedEvent event) { @@ -90,19 +91,23 @@ public ZkSubscrValueImpl( final CuratorFramework curatorFramework, final Runnable listener, final Function converter, - final String key) throws Exception { + final String key) throws NakadiRuntimeException { super(curatorFramework, listener, converter, key); // The very first call is used to initialize listener - query(true); + getData(); } @Override - protected byte[] query(final boolean setListener) throws Exception { + protected byte[] query(final boolean setListener) throws NakadiRuntimeException { final GetDataBuilder builder = curatorFramework.getData(); if (setListener) { builder.usingWatcher(this); } - return builder.forPath(key); + try { + return builder.forPath(key); + } catch (final Exception ex) { + throw new NakadiRuntimeException(ex); + } } } @@ -111,18 +116,22 @@ public static class ZkSubscrChildrenImpl extends ZkSubscrImpl, List public ZkSubscrChildrenImpl( final CuratorFramework curatorFramework, final Runnable listener, - final String key) throws Exception { + final String key) throws NakadiRuntimeException { super(curatorFramework, listener, Function.identity(), key); - query(true); + getData(); } @Override - protected List query(final boolean setListener) throws Exception { + protected List query(final boolean setListener) throws NakadiRuntimeException { final GetChildrenBuilder builder = curatorFramework.getChildren(); if (setListener) { builder.usingWatcher(this); } - return builder.forPath(key); + try { + return builder.forPath(key); + } catch (final Exception ex) { + throw new NakadiRuntimeException(ex); + } } } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index 9f528913a5..836471ad0a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -1,14 +1,17 @@ package org.zalando.nakadi.service.subscription.zk; +import com.fasterxml.jackson.annotation.JsonProperty; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.exceptions.NakadiRuntimeException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -83,7 +86,7 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * @param listener called whenever /nakadi/subscriptions/{subscriptionId}/topology node is changed. * @return Subscription instance */ - ZKSubscription subscribeForTopologyChanges(Runnable listener); + ZkSubscr subscribeForTopologyChanges(Runnable listener) throws NakadiRuntimeException; ZKSubscription subscribeForOffsetChanges(EventTypePartition key, Runnable commitListener); @@ -152,4 +155,46 @@ ZKSubscription subscribeForCursorsReset(Runnable listener) */ void resetCursors(List cursors, long timeout) throws OperationTimeoutException, ZookeeperException; + + class Topology { + private final Partition[] partitions; + private final int version; + + public Topology( + @JsonProperty("partitions") final Partition[] partitions, + @JsonProperty("version") final int version) { + this.partitions = partitions; + this.version = version; + } + + public Partition[] getPartitions() { + return partitions; + } + + public Topology withUpdatedPartitions(final Partition[] partitions) { + final Partition[] resultPartitions = Arrays.copyOf(this.partitions, this.partitions.length); + for (final Partition newValue : partitions) { + int selectedIdx = -1; + for (int idx = 0; idx < resultPartitions.length; ++idx) { + if (resultPartitions[idx].getKey().equals(newValue.getKey())) { + selectedIdx = idx; + } + } + if (selectedIdx < 0) { + throw new MyNakadiRuntimeException1( + "Failed to find partition " + newValue.getKey() + " in " + this); + } + resultPartitions[selectedIdx] = newValue; + } + return new Topology(resultPartitions, version + 1); + } + + @Override + public String toString() { + return "Topology{" + + "partitions=" + Arrays.toString(partitions) + + ", version=" + version + + '}'; + } + } } diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index f9225e4b1b..a4074e49b5 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -20,6 +20,7 @@ import org.zalando.nakadi.service.subscription.StreamingContext; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.zk.ZKSubscription; +import org.zalando.nakadi.service.subscription.zk.ZkSubscr; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -85,22 +86,24 @@ public void prepareMocks() throws Exception { @Test public void ensureTopologyEventListenerRegisteredRefreshedClosed() { - final ZKSubscription topologySubscription = mock(ZKSubscription.class); + final ZkSubscr topologySubscription = mock(ZkSubscr.class); + Mockito.when(topologySubscription.getData()) + .thenReturn(new ZkSubscriptionClient.Topology(new Partition[]{}, 1)); Mockito.when(zkMock.subscribeForTopologyChanges(Mockito.anyObject())).thenReturn(topologySubscription); state.onEnter(); Mockito.verify(zkMock, Mockito.times(1)).subscribeForTopologyChanges(Mockito.any()); - Mockito.verify(topologySubscription, Mockito.times(0)).refresh(); + Mockito.verify(topologySubscription, Mockito.times(1)).getData(); - state.topologyChanged(); + state.reactOnTopologyChange(); - Mockito.verify(topologySubscription, Mockito.times(1)).refresh(); - Mockito.verify(topologySubscription, Mockito.times(0)).cancel(); + Mockito.verify(topologySubscription, Mockito.times(2)).getData(); + Mockito.verify(topologySubscription, Mockito.times(0)).close(); state.onExit(); - Mockito.verify(topologySubscription, Mockito.times(1)).cancel(); + Mockito.verify(topologySubscription, Mockito.times(1)).close(); // verify that no new locks created. Mockito.verify(zkMock, Mockito.times(1)).subscribeForTopologyChanges(Mockito.any()); } From 698344253f2359b26cfc725b1de3b19a46efdaa9 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 13:36:37 +0200 Subject: [PATCH 3/8] ARUHA-1255 Use different zk listener for offsets commits --- .../subscription/state/ClosingState.java | 22 ++++++++----------- .../subscription/state/PartitionData.java | 12 +++++----- .../subscription/state/StreamingState.java | 7 +++--- .../zk/AbstractZkSubscriptionClient.java | 10 +++++++-- .../subscription/zk/ZkSubscriptionClient.java | 2 +- .../state/StreamingStateTest.java | 15 ++++++------- 6 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java index 3936975489..2c5cd87f0b 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java @@ -5,9 +5,9 @@ import org.zalando.nakadi.exceptions.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZKSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscr; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; +import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import java.util.Collection; import java.util.Collections; @@ -24,7 +24,7 @@ class ClosingState extends State { private final Supplier> uncommittedOffsetsSupplier; private final LongSupplier lastCommitSupplier; private Map uncommittedOffsets; - private final Map listeners = new HashMap<>(); + private final Map> listeners = new HashMap<>(); private ZkSubscr topologyListener; ClosingState(final Supplier> uncommittedOffsetsSupplier, @@ -115,21 +115,17 @@ private void registerListener(final EventTypePartition key) { listeners.put( key, getZk().subscribeForOffsetChanges( - key, () -> addTask(() -> this.offsetChanged(key)))); - reactOnOffset(key); - } - - private void offsetChanged(final EventTypePartition key) { - if (listeners.containsKey(key)) { - listeners.get(key).refresh(); - } + key, () -> addTask(() -> this.reactOnOffset(key)))); reactOnOffset(key); } private void reactOnOffset(final EventTypePartition key) { + if (!listeners.containsKey(key)) { + return; + } final NakadiCursor newCursor; try { - newCursor = getContext().getCursorConverter().convert(key.getEventType(), getZk().getOffset(key)); + newCursor = getContext().getCursorConverter().convert(key.getEventType(), listeners.get(key).getData()); } catch (Exception ex) { throw new NakadiRuntimeException(ex); } @@ -149,10 +145,10 @@ private void freePartitions(final Collection keys) { RuntimeException exceptionCaught = null; for (final EventTypePartition partitionKey : keys) { uncommittedOffsets.remove(partitionKey); - final ZKSubscription listener = listeners.remove(partitionKey); + final ZkSubscr listener = listeners.remove(partitionKey); if (null != listener) { try { - listener.cancel(); + listener.close(); } catch (final RuntimeException ex) { exceptionCaught = ex; getLog().error("Failed to cancel offsets listener {}", listener, ex); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java index 42fe72bc45..876dc4fd0a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java @@ -5,7 +5,8 @@ import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.service.subscription.zk.ZKSubscription; +import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import javax.annotation.Nullable; import java.util.ArrayList; @@ -16,7 +17,7 @@ import java.util.TreeSet; class PartitionData { - private final ZKSubscription subscription; + private final ZkSubscr subscription; private final List nakadiEvents = new LinkedList<>(); private final NavigableSet allCursorsOrdered = new TreeSet<>(); private final Logger log; @@ -27,12 +28,13 @@ class PartitionData { private int keepAliveInARow; @VisibleForTesting - PartitionData(final ZKSubscription subscription, final NakadiCursor commitOffset, final long currentTime) { + PartitionData(final ZkSubscr subscription, final NakadiCursor commitOffset, + final long currentTime) { this(subscription, commitOffset, LoggerFactory.getLogger(PartitionData.class), currentTime); } PartitionData( - final ZKSubscription subscription, + final ZkSubscr subscription, final NakadiCursor commitOffset, final Logger log, final long currentTime) { @@ -161,7 +163,7 @@ int getUnconfirmed() { return allCursorsOrdered.headSet(sentOffset, true).size(); } - public ZKSubscription getSubscription() { + public ZkSubscr getSubscription() { return subscription; } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 92a3a3769e..8efb756eb4 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -437,7 +437,7 @@ private NakadiCursor createNakadiCursor(final SubscriptionCursorWithoutToken cur private void addToStreaming(final Partition partition) { final NakadiCursor cursor = createNakadiCursor(getZk().getOffset(partition.getKey())); getLog().info("Adding to streaming {} with start position {}", partition.getKey(), cursor); - final ZKSubscription subscription = getZk().subscribeForOffsetChanges( + final ZkSubscr subscription = getZk().subscribeForOffsetChanges( partition.getKey(), () -> addTask(() -> offsetChanged(partition.getKey()))); final PartitionData pd = new PartitionData( @@ -465,9 +465,8 @@ private void reassignCommitted() { void offsetChanged(final EventTypePartition key) { if (offsets.containsKey(key)) { final PartitionData data = offsets.get(key); - data.getSubscription().refresh(); - final NakadiCursor cursor = createNakadiCursor(getZk().getOffset(key)); + final NakadiCursor cursor = createNakadiCursor(data.getSubscription().getData()); final PartitionData.CommitResult commitResult = data.onCommitOffset(cursor); if (commitResult.seekOnKafka) { @@ -501,7 +500,7 @@ private void removeFromStreaming(final EventTypePartition key) { getLog().warn("Skipping commits: {}, commit={}, sent={}", key, data.getCommitOffset(), data.getSentOffset()); } - data.getSubscription().cancel(); + data.getSubscription().close(); } catch (final RuntimeException ex) { getLog().warn("Failed to cancel subscription, skipping exception", ex); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 6e1e2072d9..16d67bc616 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -254,10 +254,16 @@ public void cancel() { } @Override - public ZKSubscription subscribeForOffsetChanges(final EventTypePartition key, final Runnable commitListener) { + public ZkSubscr subscribeForOffsetChanges( + final EventTypePartition key, final Runnable commitListener) { final String path = getOffsetPath(key); getLog().info("subscribeForOffsetChanges: {}, path: {}", key, path); - return ChangeListener.forData(getCurator(), path, commitListener); + return new ZkSubscrImpl.ZkSubscrValueImpl<>( + getCurator(), + commitListener, + data -> new SubscriptionCursorWithoutToken( + key.getEventType(), key.getPartition(), new String(data, UTF_8)), + path); } @Override diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index 836471ad0a..ae117720c5 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -88,7 +88,7 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE */ ZkSubscr subscribeForTopologyChanges(Runnable listener) throws NakadiRuntimeException; - ZKSubscription subscribeForOffsetChanges(EventTypePartition key, Runnable commitListener); + ZkSubscr subscribeForOffsetChanges(EventTypePartition key, Runnable commitListener); /** * Returns current offset value for specified partition key. Offset includes timeline and version data. diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index a4074e49b5..c5d69ada5a 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -19,7 +19,6 @@ import org.zalando.nakadi.service.subscription.StreamParameters; import org.zalando.nakadi.service.subscription.StreamingContext; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZKSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscr; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; @@ -112,7 +111,7 @@ public void ensureTopologyEventListenerRegisteredRefreshedClosed() { public void ensureOffsetsSubscriptionsAreRefreshedAndClosed() throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException { - final ZKSubscription offsetSubscription = mock(ZKSubscription.class); + final ZkSubscr offsetSubscription = mock(ZkSubscr.class); final EventTypePartition pk = new EventTypePartition("t", "0"); Mockito.when(zkMock.subscribeForOffsetChanges(Mockito.eq(pk), Mockito.any())).thenReturn(offsetSubscription); @@ -140,19 +139,19 @@ public void ensureOffsetsSubscriptionsAreRefreshedAndClosed() pk.getEventType(), pk.getPartition(), SESSION_ID, null, Partition.State.ASSIGNED)}); Mockito.verify(zkMock, Mockito.times(1)).subscribeForOffsetChanges(Mockito.eq(pk), Mockito.any()); - Mockito.verify(offsetSubscription, Mockito.times(0)).cancel(); - Mockito.verify(offsetSubscription, Mockito.times(0)).refresh(); + Mockito.verify(offsetSubscription, Mockito.times(0)).close(); + Mockito.verify(offsetSubscription, Mockito.times(0)).getData(); state.offsetChanged(pk); Mockito.verify(zkMock, Mockito.times(1)).subscribeForOffsetChanges(Mockito.eq(pk), Mockito.any()); - Mockito.verify(offsetSubscription, Mockito.times(0)).cancel(); - Mockito.verify(offsetSubscription, Mockito.times(1)).refresh(); + Mockito.verify(offsetSubscription, Mockito.times(0)).close(); + Mockito.verify(offsetSubscription, Mockito.times(1)).getData(); // Verify that offset change listener is removed state.refreshTopologyUnlocked(new Partition[0]); Mockito.verify(zkMock, Mockito.times(1)).subscribeForOffsetChanges(Mockito.eq(pk), Mockito.any()); - Mockito.verify(offsetSubscription, Mockito.times(1)).cancel(); - Mockito.verify(offsetSubscription, Mockito.times(1)).refresh(); + Mockito.verify(offsetSubscription, Mockito.times(1)).close(); + Mockito.verify(offsetSubscription, Mockito.times(1)).getData(); } } From 6ec426341fb6b61ff72d946f32fd9162070f37ad Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 13:56:01 +0200 Subject: [PATCH 4/8] ARUHA-1255 Use different zk listener for reset cursors --- .../subscription/state/StreamingState.java | 9 +++++--- .../zk/AbstractZkSubscriptionClient.java | 23 +++++++------------ .../subscription/zk/ZkSubscriptionClient.java | 3 ++- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 8efb756eb4..fe6ade46e3 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -17,12 +17,12 @@ import org.zalando.nakadi.metrics.MetricUtils; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZKSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscr; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.view.SubscriptionCursor; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -54,7 +54,7 @@ class StreamingState extends State { private Meter bytesSentMeter; // Uncommitted offsets are calculated right on exiting from Streaming state. private Map uncommittedOffsets; - private ZKSubscription cursorResetSubscription; + private Closeable cursorResetSubscription; /** * Time that is used for commit timeout check. Commit timeout check is working only in case when there is something @@ -283,7 +283,10 @@ public void onExit() { } if (cursorResetSubscription != null) { - cursorResetSubscription.cancel(); + try { + cursorResetSubscription.close(); + } catch (IOException ignore) { + } cursorResetSubscription = null; } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 16d67bc616..67736afa9f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -23,6 +23,7 @@ import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -224,7 +225,7 @@ public final boolean isCursorResetInProgress() { } @Override - public final ZKSubscription subscribeForCursorsReset(final Runnable listener) + public final Closeable subscribeForCursorsReset(final Runnable listener) throws NakadiRuntimeException, UnsupportedOperationException { final NodeCache cursorResetCache = new NodeCache(getCurator(), resetCursorPath); cursorResetCache.getListenable().addListener(listener::run); @@ -235,20 +236,12 @@ public final ZKSubscription subscribeForCursorsReset(final Runnable listener) throw new NakadiRuntimeException(e); } - return new ZKSubscription() { - @Override - public void refresh() { - throw new UnsupportedOperationException(); - } - - @Override - public void cancel() { - try { - cursorResetCache.getListenable().clear(); - cursorResetCache.close(); - } catch (final IOException e) { - throw new NakadiRuntimeException(e); - } + return () -> { + try { + cursorResetCache.getListenable().clear(); + cursorResetCache.close(); + } catch (final IOException e) { + throw new NakadiRuntimeException(e); } }; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index ae117720c5..e05dd8404a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -11,6 +11,7 @@ import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; +import java.io.Closeable; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; @@ -135,7 +136,7 @@ void transfer(String sessionId, Collection partitions) * @param listener callback which is called when cursor reset happens * @return {@link org.zalando.nakadi.service.subscription.zk.ZKSubscription} */ - ZKSubscription subscribeForCursorsReset(Runnable listener) + Closeable subscribeForCursorsReset(Runnable listener) throws NakadiRuntimeException, UnsupportedOperationException; /** From 54499f66d6ebee97a5dd02bc646f2efbc572af81 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 14:19:00 +0200 Subject: [PATCH 5/8] ARUHA-1255 Remove obsolete subscription classes --- .../subscription/zk/ChangeListener.java | 85 ------------------- .../subscription/zk/ZKSubscription.java | 7 -- .../subscription/zk/ZkSubscriptionClient.java | 2 +- 3 files changed, 1 insertion(+), 93 deletions(-) delete mode 100644 src/main/java/org/zalando/nakadi/service/subscription/zk/ChangeListener.java delete mode 100644 src/main/java/org/zalando/nakadi/service/subscription/zk/ZKSubscription.java diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ChangeListener.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ChangeListener.java deleted file mode 100644 index 717c13e4ce..0000000000 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ChangeListener.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.zalando.nakadi.service.subscription.zk; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.zalando.nakadi.exceptions.NakadiRuntimeException; - -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class ChangeListener implements ZKSubscription, Watcher { - protected final CuratorFramework curatorFramework; - protected final String key; - private final Runnable listener; - private final AtomicBoolean cancelled = new AtomicBoolean(); - private final AtomicBoolean registered = new AtomicBoolean(); - - protected ChangeListener(final CuratorFramework curatorFramework, final String key, final Runnable listener) { - this.curatorFramework = curatorFramework; - this.key = key; - this.listener = listener; - refresh(); - } - - @Override - public void process(final WatchedEvent event) { - registered.set(false); - if (!cancelled.get()) { - listener.run(); - } - } - - protected abstract void setInternal() throws Exception; - - @Override - public void refresh() { - if (!cancelled.get() && !registered.get()) { - try { - registered.set(true); - setInternal(); - } catch (final Exception e) { - throw new NakadiRuntimeException(e); - } - } - } - - @Override - public void cancel() { - this.cancelled.set(true); - } - - public static ChangeListener forChildren( - final CuratorFramework curatorFramework, final String key, final Runnable listener) { - return new ChildrenListener(curatorFramework, key, listener); - } - - public static ChangeListener forData( - final CuratorFramework curatorFramework, final String key, final Runnable listener) { - return new ValueListener(curatorFramework, key, listener); - } - - private static class ChildrenListener extends ChangeListener { - - ChildrenListener(final CuratorFramework curatorFramework, final String key, final Runnable listener) { - super(curatorFramework, key, listener); - } - - @Override - protected void setInternal() throws Exception { - curatorFramework.getChildren().usingWatcher(this).forPath(key); - } - } - - private static class ValueListener extends ChangeListener { - - ValueListener(final CuratorFramework curatorFramework, final String key, final Runnable listener) { - super(curatorFramework, key, listener); - } - - @Override - protected void setInternal() throws Exception { - curatorFramework.getData().usingWatcher(this).forPath(key); - } - } - -} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZKSubscription.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZKSubscription.java deleted file mode 100644 index 02092af228..0000000000 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZKSubscription.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.zalando.nakadi.service.subscription.zk; - -public interface ZKSubscription { - void refresh(); - - void cancel(); -} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index e05dd8404a..fb22d10287 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -134,7 +134,7 @@ void transfer(String sessionId, Collection partitions) * Subscribes to cursor reset event. * * @param listener callback which is called when cursor reset happens - * @return {@link org.zalando.nakadi.service.subscription.zk.ZKSubscription} + * @return {@link Closeable} */ Closeable subscribeForCursorsReset(Runnable listener) throws NakadiRuntimeException, UnsupportedOperationException; From 62805e2186585d4fc9d8aa02ac46affe1e29f91f Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 14:54:33 +0200 Subject: [PATCH 6/8] ARUHA-1255 Use correct naming for newly-created classes --- .../service/subscription/StreamingContext.java | 4 ++-- .../service/subscription/state/ClosingState.java | 8 ++++---- .../service/subscription/state/PartitionData.java | 10 +++++----- .../service/subscription/state/StreamingState.java | 6 +++--- .../zk/AbstractZkSubscriptionClient.java | 11 ++++++----- .../subscription/zk/NewZkSubscriptionClient.java | 4 ++-- .../zk/{ZkSubscr.java => ZkSubscription.java} | 2 +- .../subscription/zk/ZkSubscriptionClient.java | 7 ++++--- .../{ZkSubscrImpl.java => ZkSubscriptionImpl.java} | 12 ++++++------ .../subscription/state/StreamingStateTest.java | 6 +++--- 10 files changed, 36 insertions(+), 34 deletions(-) rename src/main/java/org/zalando/nakadi/service/subscription/zk/{ZkSubscr.java => ZkSubscription.java} (80%) rename src/main/java/org/zalando/nakadi/service/subscription/zk/{ZkSubscrImpl.java => ZkSubscriptionImpl.java} (91%) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 0698b6bc54..4acb6297a2 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -21,7 +21,7 @@ import org.zalando.nakadi.service.subscription.state.DummyState; import org.zalando.nakadi.service.subscription.state.StartingState; import org.zalando.nakadi.service.subscription.state.State; -import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; @@ -61,7 +61,7 @@ public class StreamingContext implements SubscriptionStreamer { private final EventTypeChangeListener eventTypeChangeListener; private State currentState = new DummyState(); - private ZkSubscr> sessionListSubscription; + private ZkSubscription> sessionListSubscription; private Closeable authorizationCheckSubscription; private final Logger log; diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java index 2c5cd87f0b..c6af8ed8a2 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java @@ -5,7 +5,7 @@ import org.zalando.nakadi.exceptions.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -24,8 +24,8 @@ class ClosingState extends State { private final Supplier> uncommittedOffsetsSupplier; private final LongSupplier lastCommitSupplier; private Map uncommittedOffsets; - private final Map> listeners = new HashMap<>(); - private ZkSubscr topologyListener; + private final Map> listeners = new HashMap<>(); + private ZkSubscription topologyListener; ClosingState(final Supplier> uncommittedOffsetsSupplier, final LongSupplier lastCommitSupplier) { @@ -145,7 +145,7 @@ private void freePartitions(final Collection keys) { RuntimeException exceptionCaught = null; for (final EventTypePartition partitionKey : keys) { uncommittedOffsets.remove(partitionKey); - final ZkSubscr listener = listeners.remove(partitionKey); + final ZkSubscription listener = listeners.remove(partitionKey); if (null != listener) { try { listener.close(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java index 876dc4fd0a..3e34d9e249 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/PartitionData.java @@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.domain.NakadiCursor; -import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; import javax.annotation.Nullable; @@ -17,7 +17,7 @@ import java.util.TreeSet; class PartitionData { - private final ZkSubscr subscription; + private final ZkSubscription subscription; private final List nakadiEvents = new LinkedList<>(); private final NavigableSet allCursorsOrdered = new TreeSet<>(); private final Logger log; @@ -28,13 +28,13 @@ class PartitionData { private int keepAliveInARow; @VisibleForTesting - PartitionData(final ZkSubscr subscription, final NakadiCursor commitOffset, + PartitionData(final ZkSubscription subscription, final NakadiCursor commitOffset, final long currentTime) { this(subscription, commitOffset, LoggerFactory.getLogger(PartitionData.class), currentTime); } PartitionData( - final ZkSubscr subscription, + final ZkSubscription subscription, final NakadiCursor commitOffset, final Logger log, final long currentTime) { @@ -163,7 +163,7 @@ int getUnconfirmed() { return allCursorsOrdered.headSet(sentOffset, true).size(); } - public ZkSubscr getSubscription() { + public ZkSubscription getSubscription() { return subscription; } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index fe6ade46e3..7a0d16bfaf 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -17,7 +17,7 @@ import org.zalando.nakadi.metrics.MetricUtils; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.view.SubscriptionCursor; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -45,7 +45,7 @@ class StreamingState extends State { // The reasons for that if there are two partitions (p0, p1) and p0 is reassigned, if p1 is working // correctly, and p0 is not receiving any updates - reassignment won't complete. private final Map releasingPartitions = new HashMap<>(); - private ZkSubscr topologyChangeSubscription; + private ZkSubscription topologyChangeSubscription; private EventConsumer.ReassignableEventConsumer eventConsumer; private boolean pollPaused; private long committedEvents; @@ -440,7 +440,7 @@ private NakadiCursor createNakadiCursor(final SubscriptionCursorWithoutToken cur private void addToStreaming(final Partition partition) { final NakadiCursor cursor = createNakadiCursor(getZk().getOffset(partition.getKey())); getLog().info("Adding to streaming {} with start position {}", partition.getKey(), cursor); - final ZkSubscr subscription = getZk().subscribeForOffsetChanges( + final ZkSubscription subscription = getZk().subscribeForOffsetChanges( partition.getKey(), () -> addTask(() -> offsetChanged(partition.getKey()))); final PartitionData pd = new PartitionData( diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 67736afa9f..734450c3ea 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -247,11 +247,11 @@ public final Closeable subscribeForCursorsReset(final Runnable listener) } @Override - public ZkSubscr subscribeForOffsetChanges( + public ZkSubscription subscribeForOffsetChanges( final EventTypePartition key, final Runnable commitListener) { final String path = getOffsetPath(key); getLog().info("subscribeForOffsetChanges: {}, path: {}", key, path); - return new ZkSubscrImpl.ZkSubscrValueImpl<>( + return new ZkSubscriptionImpl.ZkSubscriptionValueImpl<>( getCurator(), commitListener, data -> new SubscriptionCursorWithoutToken( @@ -263,7 +263,7 @@ public ZkSubscr subscribeForOffsetChanges( public final void resetCursors(final List cursors, final long timeout) throws OperationTimeoutException, ZookeeperException, OperationInterruptedException, RequestInProgressException { - ZkSubscr> sessionsListener = null; + ZkSubscription> sessionsListener = null; boolean resetWasAlreadyInitiated = false; try { // close subscription connections @@ -318,9 +318,10 @@ public final void resetCursors(final List cursor } @Override - public final ZkSubscr> subscribeForSessionListChanges(final Runnable listener) throws Exception { + public final ZkSubscription> subscribeForSessionListChanges(final Runnable listener) throws Exception { getLog().info("subscribeForSessionListChanges: " + listener.hashCode()); - return new ZkSubscrImpl.ZkSubscrChildrenImpl(getCurator(), listener, getSubscriptionPath("/sessions")); + return new ZkSubscriptionImpl.ZkSubscriptionChildrenImpl( + getCurator(), listener, getSubscriptionPath("/sessions")); } @Override diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index 3eba4fa154..da529a1f81 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -120,10 +120,10 @@ private Topology parseTopology(final byte[] data) { } @Override - public final ZkSubscr subscribeForTopologyChanges(final Runnable onTopologyChanged) + public final ZkSubscription subscribeForTopologyChanges(final Runnable onTopologyChanged) throws NakadiRuntimeException { getLog().info("subscribeForTopologyChanges"); - return new ZkSubscrImpl.ZkSubscrValueImpl<>( + return new ZkSubscriptionImpl.ZkSubscriptionValueImpl<>( getCurator(), onTopologyChanged, this::parseTopology, diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscription.java similarity index 80% rename from src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java rename to src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscription.java index 709d0d0fc0..6fdd761592 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscr.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscription.java @@ -4,7 +4,7 @@ import java.io.Closeable; -public interface ZkSubscr extends Closeable { +public interface ZkSubscription extends Closeable { T getData() throws NakadiRuntimeException; diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java index fb22d10287..863a5de983 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java @@ -79,7 +79,7 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * * @param listener method to call on any change of client list. */ - ZkSubscr> subscribeForSessionListChanges(Runnable listener) throws Exception; + ZkSubscription> subscribeForSessionListChanges(Runnable listener) throws Exception; /** * Subscribe for topology changes. @@ -87,9 +87,10 @@ void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeE * @param listener called whenever /nakadi/subscriptions/{subscriptionId}/topology node is changed. * @return Subscription instance */ - ZkSubscr subscribeForTopologyChanges(Runnable listener) throws NakadiRuntimeException; + ZkSubscription subscribeForTopologyChanges(Runnable listener) throws NakadiRuntimeException; - ZkSubscr subscribeForOffsetChanges(EventTypePartition key, Runnable commitListener); + ZkSubscription subscribeForOffsetChanges( + EventTypePartition key, Runnable commitListener); /** * Returns current offset value for specified partition key. Offset includes timeline and version data. diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java similarity index 91% rename from src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java rename to src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java index 37ce2f291c..58727ced4a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscrImpl.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java @@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -public abstract class ZkSubscrImpl implements ZkSubscr, Watcher { +public abstract class ZkSubscriptionImpl implements ZkSubscription, Watcher { protected final CuratorFramework curatorFramework; protected final String key; private final AtomicReference listener; @@ -40,7 +40,7 @@ public T get() throws NakadiRuntimeException { } } - public ZkSubscrImpl( + public ZkSubscriptionImpl( final CuratorFramework curatorFramework, final Runnable listener, final Function converter, @@ -85,9 +85,9 @@ public void process(final WatchedEvent event) { } } - public static class ZkSubscrValueImpl extends ZkSubscrImpl { + public static class ZkSubscriptionValueImpl extends ZkSubscriptionImpl { - public ZkSubscrValueImpl( + public ZkSubscriptionValueImpl( final CuratorFramework curatorFramework, final Runnable listener, final Function converter, @@ -111,9 +111,9 @@ protected byte[] query(final boolean setListener) throws NakadiRuntimeException } } - public static class ZkSubscrChildrenImpl extends ZkSubscrImpl, List> { + public static class ZkSubscriptionChildrenImpl extends ZkSubscriptionImpl, List> { - public ZkSubscrChildrenImpl( + public ZkSubscriptionChildrenImpl( final CuratorFramework curatorFramework, final Runnable listener, final String key) throws NakadiRuntimeException { diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java index c5d69ada5a..572fb57727 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/StreamingStateTest.java @@ -19,7 +19,7 @@ import org.zalando.nakadi.service.subscription.StreamParameters; import org.zalando.nakadi.service.subscription.StreamingContext; import org.zalando.nakadi.service.subscription.model.Partition; -import org.zalando.nakadi.service.subscription.zk.ZkSubscr; +import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -85,7 +85,7 @@ public void prepareMocks() throws Exception { @Test public void ensureTopologyEventListenerRegisteredRefreshedClosed() { - final ZkSubscr topologySubscription = mock(ZkSubscr.class); + final ZkSubscription topologySubscription = mock(ZkSubscription.class); Mockito.when(topologySubscription.getData()) .thenReturn(new ZkSubscriptionClient.Topology(new Partition[]{}, 1)); Mockito.when(zkMock.subscribeForTopologyChanges(Mockito.anyObject())).thenReturn(topologySubscription); @@ -111,7 +111,7 @@ public void ensureTopologyEventListenerRegisteredRefreshedClosed() { public void ensureOffsetsSubscriptionsAreRefreshedAndClosed() throws InternalNakadiException, NoSuchEventTypeException, ServiceUnavailableException, InvalidCursorException { - final ZkSubscr offsetSubscription = mock(ZkSubscr.class); + final ZkSubscription offsetSubscription = mock(ZkSubscription.class); final EventTypePartition pk = new EventTypePartition("t", "0"); Mockito.when(zkMock.subscribeForOffsetChanges(Mockito.eq(pk), Mockito.any())).thenReturn(offsetSubscription); From 588acb38be8dd22ad6f1e261492df9ecfd984e62 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 24 Oct 2017 14:55:46 +0200 Subject: [PATCH 7/8] ARUHA-1255 Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35279c8b1f..65923914cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] ### Fixed +- Optimized reactions on offset change while streaming subscription - Committing with empty X-Nakadi-StreamId causes 503 ## [2.2.2] - 2017-09-28 From 7911af4dac40319f9acfc648cfe24ff6a1bd8f1b Mon Sep 17 00:00:00 2001 From: dsorokin Date: Thu, 26 Oct 2017 09:57:41 +0200 Subject: [PATCH 8/8] ARUHA-1255 Fixes after review --- .../subscription/zk/ZkSubscriptionImpl.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java index 58727ced4a..529919eadb 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionImpl.java @@ -8,14 +8,13 @@ import org.zalando.nakadi.exceptions.NakadiRuntimeException; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; public abstract class ZkSubscriptionImpl implements ZkSubscription, Watcher { protected final CuratorFramework curatorFramework; protected final String key; - private final AtomicReference listener; - private final AtomicReference> data = new AtomicReference<>(); + private volatile Runnable listener; + private volatile ExceptionOrData data; private final Function converter; private static class ExceptionOrData { @@ -45,29 +44,30 @@ public ZkSubscriptionImpl( final Runnable listener, final Function converter, final String key) { - this.listener = new AtomicReference<>(listener); + this.listener = listener; this.curatorFramework = curatorFramework; this.key = key; this.converter = converter; + this.data = null; } @Override public ReturnType getData() throws NakadiRuntimeException { - if (data.get() == null) { // If there is new value pending + if (data == null) { // If there is new value pending try { // create listener only in case if subscription is still active. - final ZkType zkData = query(null != listener.get()); - data.set(new ExceptionOrData<>(converter.apply(zkData))); + final ZkType zkData = query(null != listener); + data = new ExceptionOrData<>(converter.apply(zkData)); } catch (NakadiRuntimeException ex) { - data.set(new ExceptionOrData<>(ex)); + data = new ExceptionOrData<>(ex); } } - return data.get().get(); + return data.get(); } @Override public void close() { - listener.set(null); + listener = null; } protected abstract ZkType query(boolean createListener) throws NakadiRuntimeException; @@ -77,8 +77,8 @@ public void process(final WatchedEvent event) { // on this call one actually notifies that data has changed and waits for refresh call. // The reason for that is that sometimes it is not possible to query data from zk while being called from // notification callback. - data.set(null); - final Runnable toNotify = listener.get(); + data = null; + final Runnable toNotify = listener; // In case if subscription is still active - notify if (null != toNotify) { toNotify.run();