From e8579905cada35df37d25aab048af84abdcf85f1 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 29 Jun 2019 17:07:55 +0000 Subject: [PATCH 1/2] Fix publisher throughput when there are no ordering keys. --- .../com/google/cloud/pubsub/v1/Publisher.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 3ed22ab5c263..c204d3706acf 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -231,11 +231,12 @@ public ApiFuture publish(PubsubMessage message) { messagesBatches.remove(orderingKey); } setupAlarm(); - if (!batchesToSend.isEmpty()) { - // TODO: if this is not an ordering keys scenario, will this do anything? + // For messages with an ordering key, then we need to publish with messagesBatchLock held in + // order to ensure another publish doesn't slip in and send a batch before these batches we + // already want to send. + if (!batchesToSend.isEmpty() && !orderingKey.isEmpty()) { publishAllWithoutInflight(); - // TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock? for (final OutstandingBatch batch : batchesToSend) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); publishOutstandingBatch(batch); @@ -246,6 +247,16 @@ public ApiFuture publish(PubsubMessage message) { } messagesWaiter.incrementPendingMessages(1); + + // For messages without ordering keys, it is okay to send batches without holding + // messagesBatchLock. + if (!batchesToSend.isEmpty() && orderingKey.isEmpty()) { + for (final OutstandingBatch batch : batchesToSend) { + logger.log(Level.FINER, "Scheduling a batch for immediate sending."); + publishOutstandingBatch(batch); + } + } + return outstandingPublish.publishResult; } @@ -318,6 +329,7 @@ public void publishAllOutstanding() { * for messages to send, call {@code get} on the futures returned from {@code publish}. */ private void publishAllWithoutInflight() { + OutstandingBatch unorderedOutstandingBatch = null; messagesBatchLock.lock(); try { Iterator> it = messagesBatches.entrySet().iterator(); @@ -327,8 +339,10 @@ private void publishAllWithoutInflight() { String key = entry.getKey(); if (batch.isEmpty()) { it.remove(); - } else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) { - // TODO: Will this cause a performance problem for non-ordering keys scenarios? + } else if (key.isEmpty()) { + // We will publish the batch with no ordering key outside messagesBatchLock. + unorderedOutstandingBatch = batch.popOutstandingBatch(); + } else if (!sequentialExecutor.hasTasksInflight(key)) { publishOutstandingBatch(batch.popOutstandingBatch()); it.remove(); } @@ -336,6 +350,9 @@ private void publishAllWithoutInflight() { } finally { messagesBatchLock.unlock(); } + if (unorderedOutstandingBatch != null) { + publishOutstandingBatch(unorderedOutstandingBatch); + } } private ApiFuture publishCall(OutstandingBatch outstandingBatch) { From 6bcd0e3837108d111b2e4a7d4eb73791c0d8cb2b Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Mon, 1 Jul 2019 09:17:24 -0400 Subject: [PATCH 2/2] Remove batch from iterator if we are going to process it. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index c204d3706acf..fe9d28f421fb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -342,6 +342,7 @@ private void publishAllWithoutInflight() { } else if (key.isEmpty()) { // We will publish the batch with no ordering key outside messagesBatchLock. unorderedOutstandingBatch = batch.popOutstandingBatch(); + it.remove(); } else if (!sequentialExecutor.hasTasksInflight(key)) { publishOutstandingBatch(batch.popOutstandingBatch()); it.remove();