Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up MessageDispatcher v2 #4619

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,41 +371,31 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
processOutstandingBatches();
}

public void processOutstandingBatches() {
while (true) {
boolean batchDone = false;
Runnable batchCallback = null;
OutstandingMessage outstandingMessage;
synchronized (outstandingMessageBatches) {
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
if (nextBatch == null) {
return;
}
outstandingMessage = nextBatch.messages.peek();
if (outstandingMessage == null) {
return;
}
try {
// This is a non-blocking flow controller.
flowController.reserve(
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
} catch (FlowController.MaxOutstandingElementCountReachedException
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
return;
} catch (FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
nextBatch.messages.poll(); // We got a hold to the message already.
batchDone = nextBatch.messages.isEmpty();
if (batchDone) {
outstandingMessageBatches.poll();
batchCallback = nextBatch.doneCallback;
private void processOutstandingBatches() {
synchronized (outstandingMessageBatches) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you still need this global lock when you are taking element out and putting it back?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't. Thats the very next change but @sduskis has asked me to split up functional from cleanup changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dpcollins-google, thanks for going step-by-step on this.

for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
nextBatch != null;
nextBatch = outstandingMessageBatches.poll()) {
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
nextMessage != null;
nextMessage = nextBatch.messages.poll()) {
try {
// This is a non-blocking flow controller.
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
} catch (FlowController.MaxOutstandingElementCountReachedException
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
// Unwind previous changes in the batches outstanding.
nextBatch.messages.addFirst(nextMessage);
outstandingMessageBatches.addFirst(nextBatch);
return;
} catch (FlowControlException unexpectedException) {
throw new IllegalStateException(
"Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
}
}
processOutstandingMessage(
outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
if (batchDone) {
batchCallback.run();
nextBatch.doneCallback.run();
}
}
}
Expand Down