Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub ordering keys subscriber #4515

Merged
merged 6 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class MessageDispatcher {
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);

private final Executor executor;
private final SequentialExecutorService sequentialExecutor;
private final ScheduledExecutorService systemExecutor;
private final ApiClock clock;

Expand Down Expand Up @@ -217,6 +218,7 @@ void sendAckOperations(
jobLock = new ReentrantLock();
messagesWaiter = new MessageWaiter();
this.clock = clock;
this.sequentialExecutor = new SequentialExecutorService(executor);
}

public void start() {
Expand Down Expand Up @@ -401,46 +403,51 @@ public void processOutstandingBatches() {
outstandingMessageBatches.poll();
batchCallback = nextBatch.doneCallback;
}
}

final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
final AckHandler ackHandler = outstandingMessage.ackHandler();
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
final AckReplyConsumer consumer =
new AckReplyConsumer() {
@Override
public void ack() {
response.set(AckReply.ACK);
}

@Override
public void nack() {
response.set(AckReply.NACK);
}
};
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
executor.execute(
new Runnable() {
@Override
public void run() {
try {
if (ackHandler
.totalExpiration
.plusSeconds(messageDeadlineSeconds.get())
.isBefore(now())) {
// Message expired while waiting. We don't extend these messages anymore,
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
return;
}
final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
final AckHandler ackHandler = outstandingMessage.ackHandler();
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
final AckReplyConsumer consumer =
new AckReplyConsumer() {
@Override
public void ack() {
response.set(AckReply.ACK);
}

receiver.receiveMessage(message, consumer);
} catch (Exception e) {
response.setException(e);
@Override
public void nack() {
response.set(AckReply.NACK);
}
}
});
};
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
Runnable deliverMessageTask =
new Runnable() {
@Override
public void run() {
try {
if (ackHandler
.totalExpiration
.plusSeconds(messageDeadlineSeconds.get())
.isBefore(now())) {
// Message expired while waiting. We don't extend these messages anymore,
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
return;
}

receiver.receiveMessage(message, consumer);
} catch (Exception e) {
response.setException(e);
}
}
};
if (message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
}
}
if (batchDone) {
batchCallback.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -53,6 +55,7 @@ public void run() {

private MessageDispatcher dispatcher;
private LinkedBlockingQueue<AckReplyConsumer> consumers;
private Map<String, List<ByteString>> messagesByOrderingKey;
private List<String> sentAcks;
private List<ModAckItem> sentModAcks;
private FakeClock clock;
Expand All @@ -72,13 +75,20 @@ static ModAckItem of(String ackId, int seconds) {
@Before
public void setUp() {
consumers = new LinkedBlockingQueue<>();
messagesByOrderingKey = new HashMap<>();
sentAcks = new ArrayList<>();
sentModAcks = new ArrayList<>();

MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
List<ByteString> messages = messagesByOrderingKey.get(message.getOrderingKey());
if (messages == null) {
messages = new ArrayList<>();
messagesByOrderingKey.put(message.getOrderingKey(), messages);
}
messages.add(message.getData());
consumers.add(consumer);
}
};
Expand Down Expand Up @@ -205,4 +215,47 @@ public void testDeadlineAdjustment() throws Exception {

assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42);
}

private ReceivedMessage newReceivedMessage(String ackId, String orderingKey, String data) {
return ReceivedMessage.newBuilder()
.setAckId(ackId)
.setMessage(
PubsubMessage.newBuilder()
.setOrderingKey(orderingKey)
.setData(ByteString.copyFromUtf8(data))
.build())
.build();
}

@Test
public void testOrderingKey() throws Exception {
// Create messages with "orderA".
ReceivedMessage message1 = newReceivedMessage("ackId1", "orderA", "m1");
ReceivedMessage message2 = newReceivedMessage("ackId2", "orderA", "m2");
// Create messages with "orderB".
ReceivedMessage message3 = newReceivedMessage("ackId3", "orderB", "m3");
ReceivedMessage message4 = newReceivedMessage("ackId4", "orderB", "m4");
ReceivedMessage message5 = newReceivedMessage("ackId5", "orderB", "m5");

dispatcher.processReceivedMessages(Collections.singletonList(message1), NOOP_RUNNABLE);
consumers.take().ack();
dispatcher.processReceivedMessages(Collections.singletonList(message2), NOOP_RUNNABLE);
consumers.take().ack();
dispatcher.processReceivedMessages(Collections.singletonList(message3), NOOP_RUNNABLE);
consumers.take().ack();
dispatcher.processReceivedMessages(Collections.singletonList(message4), NOOP_RUNNABLE);
consumers.take().ack();
dispatcher.processReceivedMessages(Collections.singletonList(message5), NOOP_RUNNABLE);
consumers.take().ack();

assertThat(messagesByOrderingKey.get("orderA"))
.containsExactly(ByteString.copyFromUtf8("m1"), ByteString.copyFromUtf8("m2"))
.inOrder();
assertThat(messagesByOrderingKey.get("orderB"))
.containsExactly(
ByteString.copyFromUtf8("m3"),
ByteString.copyFromUtf8("m4"),
ByteString.copyFromUtf8("m5"))
.inOrder();
}
}