From f9326a07d7535eabefc00ded98bd0976ec1b87dd Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 4 Mar 2019 09:53:53 -0500 Subject: [PATCH] Clean up MessageDispatcher by changing processOutstandingBatches to explicitly loop instead of while(true) with breaks. There is now only 1 explicit return and 1 runtime error. --- .../cloud/pubsub/v1/MessageDispatcher.java | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index ce130d167ff8..509d71ae5aeb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -371,41 +371,31 @@ public void processReceivedMessages(List messages, Runnable don processOutstandingBatches(); } - public void processOutstandingBatches() { - while (true) { - boolean batchDone = false; - Runnable batchCallback = null; - OutstandingMessage outstandingMessage; - synchronized (outstandingMessageBatches) { - OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek(); - if (nextBatch == null) { - return; - } - outstandingMessage = nextBatch.messages.peek(); - if (outstandingMessage == null) { - return; - } - try { - // This is a non-blocking flow controller. - flowController.reserve( - 1, outstandingMessage.receivedMessage().getMessage().getSerializedSize()); - } catch (FlowController.MaxOutstandingElementCountReachedException - | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { - return; - } catch (FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); - } - nextBatch.messages.poll(); // We got a hold to the message already. - batchDone = nextBatch.messages.isEmpty(); - if (batchDone) { - outstandingMessageBatches.poll(); - batchCallback = nextBatch.doneCallback; + private void processOutstandingBatches() { + synchronized (outstandingMessageBatches) { + for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); + nextBatch != null; + nextBatch = outstandingMessageBatches.poll()) { + for (OutstandingMessage nextMessage = nextBatch.messages.poll(); + nextMessage != null; + nextMessage = nextBatch.messages.poll()) { + try { + // This is a non-blocking flow controller. + flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + // Unwind previous changes in the batches outstanding. + nextBatch.messages.addFirst(nextMessage); + outstandingMessageBatches.addFirst(nextBatch); + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException( + "Flow control unexpected exception", unexpectedException); + } + processOutstandingMessage( + nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); } - } - processOutstandingMessage( - outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler); - if (batchDone) { - batchCallback.run(); + nextBatch.doneCallback.run(); } } }