Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #766 from zalando/ARUHA-1255
Browse files Browse the repository at this point in the history
ARUHA-1255 Optimize subscription reaction on changed offsets
  • Loading branch information
antban authored Nov 2, 2017
2 parents c2f553f + 7911af4 commit 224554f
Show file tree
Hide file tree
Showing 14 changed files with 337 additions and 250 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [2.2.4] - 2017-10-13

### Fixed
- Optimized reactions on offset change while streaming subscription
- Committing with empty X-Nakadi-StreamId causes 503

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import org.zalando.nakadi.service.subscription.state.DummyState;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.state.State;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class StreamingContext implements SubscriptionStreamer {
private final EventTypeChangeListener eventTypeChangeListener;

private State currentState = new DummyState();
private ZKSubscription clientListChanges;
private ZkSubscription<List<String>> sessionListSubscription;
private Closeable authorizationCheckSubscription;

private final Logger log;
Expand Down Expand Up @@ -184,20 +185,20 @@ public void switchState(final State newState) {
});
}

public void registerSession() {
public void registerSession() throws Exception {
log.info("Registering session {}", session);
// Install rebalance hook on client list change.
clientListChanges = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance));
sessionListSubscription = zkClient.subscribeForSessionListChanges(() -> addTask(this::rebalance));
zkClient.registerSession(session);
}

public void unregisterSession() {
log.info("Unregistering session {}", session);
if (null != clientListChanges) {
if (null != sessionListSubscription) {
try {
clientListChanges.cancel();
sessionListSubscription.close();
} finally {
this.clientListChanges = null;
this.sessionListSubscription = null;
zkClient.unregisterSession(session);
}
}
Expand Down Expand Up @@ -234,8 +235,9 @@ public ObjectMapper getObjectMapper() {
}

private void rebalance() {
if (null != clientListChanges) {
clientListChanges.refresh();
if (null != sessionListSubscription) {
// This call is needed to renew subscription for session list changes.
sessionListSubscription.getData();
zkClient.runLocked(() -> {
final Partition[] changeset = rebalancer.apply(zkClient.listSessions(), zkClient.listPartitions());
if (changeset.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -22,8 +24,8 @@ class ClosingState extends State {
private final Supplier<Map<EventTypePartition, NakadiCursor>> uncommittedOffsetsSupplier;
private final LongSupplier lastCommitSupplier;
private Map<EventTypePartition, NakadiCursor> uncommittedOffsets;
private final Map<EventTypePartition, ZKSubscription> listeners = new HashMap<>();
private ZKSubscription topologyListener;
private final Map<EventTypePartition, ZkSubscription<SubscriptionCursorWithoutToken>> listeners = new HashMap<>();
private ZkSubscription<ZkSubscriptionClient.Topology> topologyListener;

ClosingState(final Supplier<Map<EventTypePartition, NakadiCursor>> uncommittedOffsetsSupplier,
final LongSupplier lastCommitSupplier) {
Expand All @@ -41,7 +43,7 @@ public void onExit() {
} finally {
if (null != topologyListener) {
try {
topologyListener.cancel();
topologyListener.close();
} finally {
topologyListener = null;
}
Expand All @@ -56,7 +58,12 @@ public void onEnter() {
uncommittedOffsets = uncommittedOffsetsSupplier.get();
if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) {
scheduleTask(() -> switchState(new CleanupState()), timeToWaitMillis, TimeUnit.MILLISECONDS);
topologyListener = getZk().subscribeForTopologyChanges(() -> addTask(this::onTopologyChanged));
try {
topologyListener = getZk().subscribeForTopologyChanges(() -> addTask(this::onTopologyChanged));
} catch (final Exception e) {
switchState(new CleanupState(e));
return;
}
reactOnTopologyChange();
} else {
switchState(new CleanupState());
Expand All @@ -68,19 +75,19 @@ private void onTopologyChanged() {
throw new IllegalStateException(
"topologyListener should not be null when calling onTopologyChanged method");
}
topologyListener.refresh();
reactOnTopologyChange();
}

private void reactOnTopologyChange() {
private void reactOnTopologyChange() throws NakadiRuntimeException {
final ZkSubscriptionClient.Topology topology = topologyListener.getData();

// Collect current partitions state from Zk
final Map<EventTypePartition, Partition> partitions = new HashMap<>();
getZk().runLocked(() -> Stream.of(getZk().listPartitions())
Stream.of(topology.getPartitions())
.filter(p -> getSessionId().equals(p.getSession()))
.forEach(p -> partitions.put(p.getKey(), p)));
.forEach(p -> partitions.put(p.getKey(), p));

// Select which partitions need to be freed from this session
// Ithere
final Set<EventTypePartition> freeRightNow = new HashSet<>();
final Set<EventTypePartition> addListeners = new HashSet<>();
for (final Partition p : partitions.values()) {
Expand Down Expand Up @@ -108,21 +115,17 @@ private void registerListener(final EventTypePartition key) {
listeners.put(
key,
getZk().subscribeForOffsetChanges(
key, () -> addTask(() -> this.offsetChanged(key))));
reactOnOffset(key);
}

private void offsetChanged(final EventTypePartition key) {
if (listeners.containsKey(key)) {
listeners.get(key).refresh();
}
key, () -> addTask(() -> this.reactOnOffset(key))));
reactOnOffset(key);
}

private void reactOnOffset(final EventTypePartition key) {
if (!listeners.containsKey(key)) {
return;
}
final NakadiCursor newCursor;
try {
newCursor = getContext().getCursorConverter().convert(key.getEventType(), getZk().getOffset(key));
newCursor = getContext().getCursorConverter().convert(key.getEventType(), listeners.get(key).getData());
} catch (Exception ex) {
throw new NakadiRuntimeException(ex);
}
Expand All @@ -142,10 +145,10 @@ private void freePartitions(final Collection<EventTypePartition> keys) {
RuntimeException exceptionCaught = null;
for (final EventTypePartition partitionKey : keys) {
uncommittedOffsets.remove(partitionKey);
final ZKSubscription listener = listeners.remove(partitionKey);
final ZkSubscription<SubscriptionCursorWithoutToken> listener = listeners.remove(partitionKey);
if (null != listener) {
try {
listener.cancel();
listener.close();
} catch (final RuntimeException ex) {
exceptionCaught = ex;
getLog().error("Failed to cancel offsets listener {}", listener, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.ConsumedEvent;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand All @@ -16,7 +17,7 @@
import java.util.TreeSet;

class PartitionData {
private final ZKSubscription subscription;
private final ZkSubscription<SubscriptionCursorWithoutToken> subscription;
private final List<ConsumedEvent> nakadiEvents = new LinkedList<>();
private final NavigableSet<NakadiCursor> allCursorsOrdered = new TreeSet<>();
private final Logger log;
Expand All @@ -27,12 +28,13 @@ class PartitionData {
private int keepAliveInARow;

@VisibleForTesting
PartitionData(final ZKSubscription subscription, final NakadiCursor commitOffset, final long currentTime) {
PartitionData(final ZkSubscription<SubscriptionCursorWithoutToken> subscription, final NakadiCursor commitOffset,
final long currentTime) {
this(subscription, commitOffset, LoggerFactory.getLogger(PartitionData.class), currentTime);
}

PartitionData(
final ZKSubscription subscription,
final ZkSubscription<SubscriptionCursorWithoutToken> subscription,
final NakadiCursor commitOffset,
final Logger log,
final long currentTime) {
Expand Down Expand Up @@ -161,7 +163,7 @@ int getUnconfirmed() {
return allCursorsOrdered.headSet(sentOffset, true).size();
}

public ZKSubscription getSubscription() {
public ZkSubscription<SubscriptionCursorWithoutToken> getSubscription() {
return subscription;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ protected Response.StatusType getStatus() {
return;
}

getContext().registerSession();
try {
getContext().registerSession();
} catch (Exception ex) {
switchState(new CleanupState(ex));
return;
}

try {
getOut().onInitialized(getSessionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import org.zalando.nakadi.metrics.MetricUtils;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.zk.ZKSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscription;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.view.SubscriptionCursor;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -43,7 +45,7 @@ class StreamingState extends State {
// The reasons for that if there are two partitions (p0, p1) and p0 is reassigned, if p1 is working
// correctly, and p0 is not receiving any updates - reassignment won't complete.
private final Map<EventTypePartition, Long> releasingPartitions = new HashMap<>();
private ZKSubscription topologyChangeSubscription;
private ZkSubscription<ZkSubscriptionClient.Topology> topologyChangeSubscription;
private EventConsumer.ReassignableEventConsumer eventConsumer;
private boolean pollPaused;
private long committedEvents;
Expand All @@ -52,7 +54,7 @@ class StreamingState extends State {
private Meter bytesSentMeter;
// Uncommitted offsets are calculated right on exiting from Streaming state.
private Map<EventTypePartition, NakadiCursor> uncommittedOffsets;
private ZKSubscription cursorResetSubscription;
private Closeable cursorResetSubscription;

/**
* Time that is used for commit timeout check. Commit timeout check is working only in case when there is something
Expand All @@ -74,7 +76,8 @@ public void onEnter() {
this.eventConsumer = getContext().getTimelineService().createEventConsumer(null);

// Subscribe for topology changes.
this.topologyChangeSubscription = getZk().subscribeForTopologyChanges(() -> addTask(this::topologyChanged));
this.topologyChangeSubscription =
getZk().subscribeForTopologyChanges(() -> addTask(this::reactOnTopologyChange));
// and call directly
reactOnTopologyChange();
addTask(this::pollDataFromKafka);
Expand Down Expand Up @@ -261,7 +264,7 @@ public void onExit() {

if (null != topologyChangeSubscription) {
try {
topologyChangeSubscription.cancel();
topologyChangeSubscription.close();
} catch (final RuntimeException ex) {
getLog().warn("Failed to cancel topology subscription", ex);
} finally {
Expand All @@ -280,25 +283,23 @@ public void onExit() {
}

if (cursorResetSubscription != null) {
cursorResetSubscription.cancel();
try {
cursorResetSubscription.close();
} catch (IOException ignore) {
}
cursorResetSubscription = null;
}
}

void topologyChanged() {
if (null != topologyChangeSubscription) {
topologyChangeSubscription.refresh();
void reactOnTopologyChange() {
if (null == topologyChangeSubscription) {
return;
}
reactOnTopologyChange();
}

private void reactOnTopologyChange() {
getZk().runLocked(() -> {
final Partition[] assignedPartitions = Stream.of(getZk().listPartitions())
.filter(p -> getSessionId().equals(p.getSession()))
.toArray(Partition[]::new);
addTask(() -> refreshTopologyUnlocked(assignedPartitions));
});
final ZkSubscriptionClient.Topology topology = topologyChangeSubscription.getData();
final Partition[] assignedPartitions = Stream.of(topology.getPartitions())
.filter(p -> getSessionId().equals(p.getSession()))
.toArray(Partition[]::new);
addTask(() -> refreshTopologyUnlocked(assignedPartitions));
}

void refreshTopologyUnlocked(final Partition[] assignedPartitions) {
Expand Down Expand Up @@ -439,7 +440,7 @@ private NakadiCursor createNakadiCursor(final SubscriptionCursorWithoutToken cur
private void addToStreaming(final Partition partition) {
final NakadiCursor cursor = createNakadiCursor(getZk().getOffset(partition.getKey()));
getLog().info("Adding to streaming {} with start position {}", partition.getKey(), cursor);
final ZKSubscription subscription = getZk().subscribeForOffsetChanges(
final ZkSubscription<SubscriptionCursorWithoutToken> subscription = getZk().subscribeForOffsetChanges(
partition.getKey(),
() -> addTask(() -> offsetChanged(partition.getKey())));
final PartitionData pd = new PartitionData(
Expand Down Expand Up @@ -467,16 +468,15 @@ private void reassignCommitted() {
void offsetChanged(final EventTypePartition key) {
if (offsets.containsKey(key)) {
final PartitionData data = offsets.get(key);
data.getSubscription().refresh();

final NakadiCursor cursor = createNakadiCursor(getZk().getOffset(key));
final NakadiCursor cursor = createNakadiCursor(data.getSubscription().getData());

final PartitionData.CommitResult commitResult = data.onCommitOffset(cursor);
if (commitResult.seekOnKafka) {
reconfigureKafkaConsumer(true);
}

if (commitResult.committedCount > 0) {
if (commitResult.committedCount > 0) {
committedEvents += commitResult.committedCount;
this.lastCommitMillis = System.currentTimeMillis();
streamToOutput();
Expand All @@ -503,7 +503,7 @@ private void removeFromStreaming(final EventTypePartition key) {
getLog().warn("Skipping commits: {}, commit={}, sent={}",
key, data.getCommitOffset(), data.getSentOffset());
}
data.getSubscription().cancel();
data.getSubscription().close();
} catch (final RuntimeException ex) {
getLog().warn("Failed to cancel subscription, skipping exception", ex);
}
Expand Down
Loading

0 comments on commit 224554f

Please sign in to comment.