From 82283ba38a36d65e86d1f67d97b39743a1406f04 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 15 Dec 2016 15:42:45 +1100 Subject: [PATCH] use the term "bundling" instead of "batching" --- .../com/google/cloud/pubsub/Publisher.java | 84 ++++----- .../google/cloud/pubsub/PublisherImpl.java | 166 +++++++++--------- .../com/google/cloud/pubsub/Subscriber.java | 2 +- .../cloud/pubsub/SubscriberConnection.java | 4 +- .../google/cloud/pubsub/BaseSystemTest.java | 4 +- .../cloud/pubsub/PublisherImplTest.java | 78 ++++---- .../cloud/pubsub/SubscriberImplTest.java | 34 ++-- 7 files changed, 186 insertions(+), 186 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index 17a41796022e..1712b86929ad 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -31,15 +31,15 @@ * A Cloud Pub/Sub publisher, that is * associated with a specific topic at creation. * - *

A {@link Publisher} provides built-in capabilities to automatically handle batching of + *

A {@link Publisher} provides built-in capabilities to automatically handle bundling of * messages, controlling memory utilization, and retrying API calls on transient errors. * *

With customizable options that control: * *

* *

If no credentials are provided, the {@link Publisher} will use application default credentials @@ -51,7 +51,7 @@ *

  *  Publisher publisher =
  *       Publisher.Builder.newBuilder(MY_TOPIC)
- *           .setMaxBatchDuration(new Duration(10 * 1000))
+ *           .setMaxBundleDuration(new Duration(10 * 1000))
  *           .build();
  *  List> results = new ArrayList<>();
  *
@@ -81,15 +81,15 @@ public interface Publisher {
   String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
 
   // API limits.
-  int MAX_BATCH_MESSAGES = 1000;
-  int MAX_BATCH_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
+  int MAX_BUNDLE_MESSAGES = 1000;
+  int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
 
   // Meaningful defaults.
-  int DEFAULT_MAX_BATCH_MESSAGES = 100;
-  int DEFAULT_MAX_BATCH_BYTES = 1000; // 1 kB
-  Duration DEFAULT_MAX_BATCH_DURATION = new Duration(1); // 1ms
+  int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
+  int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
+  Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
   Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
-  Duration MIN_SEND_BATCH_DURATION = new Duration(10 * 1000); // 10 seconds
+  Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
   Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
 
   /** Topic to which the publisher publishes to. */
@@ -97,7 +97,7 @@ public interface Publisher {
 
   /**
    * Schedules the publishing of a message. The publishing of the message may occur immediately or
-   * be delayed based on the publisher batching options.
+   * be delayed based on the publisher bundling options.
    *
    * 

Depending on chosen flow control {@link #failOnFlowControlLimits option}, the returned * future might immediately fail with a {@link CloudPubsubFlowControlException} or block the @@ -109,13 +109,13 @@ public interface Publisher { ListenableFuture publish(PubsubMessage message); /** Maximum amount of time to wait until scheduling the publishing of messages. */ - Duration getMaxBatchDuration(); + Duration getMaxBundleDuration(); - /** Maximum number of bytes to batch before publishing. */ - long getMaxBatchBytes(); + /** Maximum number of bytes to bundle before publishing. */ + long getMaxBundleBytes(); - /** Maximum number of messages to batch before publishing. */ - long getMaxBatchMessages(); + /** Maximum number of messages to bundle before publishing. */ + long getMaxBundleMessages(); /** * Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced. @@ -155,18 +155,18 @@ public interface Publisher { final class Builder { String topic; - // Batching options - int maxBatchMessages; - int maxBatchBytes; - Duration maxBatchDuration; + // Bundling options + int maxBundleMessages; + int maxBundleBytes; + Duration maxBundleDuration; // Client-side flow control options Optional maxOutstandingMessages; Optional maxOutstandingBytes; boolean failOnFlowControlLimits; - // Send batch deadline - Duration sendBatchDeadline; + // Send bundle deadline + Duration sendBundleDeadline; // RPC options Duration requestTimeout; @@ -192,11 +192,11 @@ private void setDefaults() { channelBuilder = Optional.absent(); maxOutstandingMessages = Optional.absent(); maxOutstandingBytes = Optional.absent(); - maxBatchMessages = DEFAULT_MAX_BATCH_MESSAGES; - maxBatchBytes = DEFAULT_MAX_BATCH_BYTES; - maxBatchDuration = DEFAULT_MAX_BATCH_DURATION; + maxBundleMessages = DEFAULT_MAX_BUNDLE_MESSAGES; + maxBundleBytes = DEFAULT_MAX_BUNDLE_BYTES; + maxBundleDuration = DEFAULT_MAX_BUNDLE_DURATION; requestTimeout = DEFAULT_REQUEST_TIMEOUT; - sendBatchDeadline = MIN_SEND_BATCH_DURATION; + sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; failOnFlowControlLimits = false; executor = Optional.absent(); } @@ -224,16 +224,16 @@ public Builder setChannelBuilder( return this; } - // Batching options + // Bundling options /** * Maximum number of messages to send per publish call. * *

It also sets a target to when to trigger a publish. */ - public Builder setMaxBatchMessages(int messages) { + public Builder setMaxBundleMessages(int messages) { Preconditions.checkArgument(messages > 0); - maxBatchMessages = messages; + maxBundleMessages = messages; return this; } @@ -244,19 +244,19 @@ public Builder setMaxBatchMessages(int messages) { * *

This will not be honored if a single message is published that exceeds this maximum. */ - public Builder setMaxBatchBytes(int bytes) { + public Builder setMaxBundleBytes(int bytes) { Preconditions.checkArgument(bytes > 0); - maxBatchBytes = bytes; + maxBundleBytes = bytes; return this; } /** - * Time to wait, since the first message is kept in memory for batching, before triggering a + * Time to wait, since the first message is kept in memory for bundling, before triggering a * publish call. */ - public Builder setMaxBatchDuration(Duration duration) { + public Builder setMaxBundleDuration(Duration duration) { Preconditions.checkArgument(duration.getMillis() >= 0); - maxBatchDuration = duration; + maxBundleDuration = duration; return this; } @@ -289,10 +289,10 @@ public Builder setFailOnFlowControlLimits(boolean fail) { return this; } - /** Maximum time to attempt sending (and retrying) a batch of messages before giving up. */ - public Builder setSendBatchDeadline(Duration deadline) { - Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BATCH_DURATION) >= 0); - sendBatchDeadline = deadline; + /** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */ + public Builder setSendBundleDeadline(Duration deadline) { + Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0); + sendBundleDeadline = deadline; return this; } @@ -329,14 +329,14 @@ public MaxOutstandingMessagesReachedException(int currentMaxMessages) { this.currentMaxMessages = currentMaxMessages; } - public int getCurrentMaxBatchMessages() { + public int getCurrentMaxBundleMessages() { return currentMaxMessages; } @Override public String toString() { return String.format( - "The maximum number of batch messages: %d have been reached.", currentMaxMessages); + "The maximum number of bundle messages: %d have been reached.", currentMaxMessages); } } @@ -351,14 +351,14 @@ public MaxOutstandingBytesReachedException(int currentMaxBytes) { this.currentMaxBytes = currentMaxBytes; } - public int getCurrentMaxBatchBytes() { + public int getCurrentMaxBundleBytes() { return currentMaxBytes; } @Override public String toString() { return String.format( - "The maximum number of batch bytes: %d have been reached.", currentMaxBytes); + "The maximum number of bundle bytes: %d have been reached.", currentMaxBytes); } } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index bb9bc1ae7e71..12732bed4e8f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -63,18 +63,18 @@ final class PublisherImpl implements Publisher { private final String topic; - private final int maxBatchMessages; - private final int maxBatchBytes; - private final Duration maxBatchDuration; - private final boolean hasBatchingBytes; + private final int maxBundleMessages; + private final int maxBundleBytes; + private final Duration maxBundleDuration; + private final boolean hasBundlingBytes; private final Optional maxOutstandingMessages; private final Optional maxOutstandingBytes; private final boolean failOnFlowControlLimits; - private final Lock messagesBatchLock; - private List messagesBatch; - private int batchedBytes; + private final Lock messagesBundleLock; + private List messagesBundle; + private int bundledBytes; private final AtomicBoolean activeAlarm; @@ -87,16 +87,16 @@ final class PublisherImpl implements Publisher { private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; private final MessagesWaiter messagesWaiter; - private final Duration sendBatchDeadline; + private final Duration sendBundleDeadline; private ScheduledFuture currentAlarmFuture; PublisherImpl(Builder builder) throws IOException { topic = builder.topic; - maxBatchMessages = builder.maxBatchMessages; - maxBatchBytes = builder.maxBatchBytes; - maxBatchDuration = builder.maxBatchDuration; - hasBatchingBytes = maxBatchBytes > 0; + maxBundleMessages = builder.maxBundleMessages; + maxBundleBytes = builder.maxBundleBytes; + maxBundleDuration = builder.maxBundleDuration; + hasBundlingBytes = maxBundleBytes > 0; maxOutstandingMessages = builder.maxOutstandingMessages; maxOutstandingBytes = builder.maxOutstandingBytes; @@ -104,12 +104,12 @@ final class PublisherImpl implements Publisher { this.flowController = new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits); - sendBatchDeadline = builder.sendBatchDeadline; + sendBundleDeadline = builder.sendBundleDeadline; requestTimeout = builder.requestTimeout; - messagesBatch = new LinkedList<>(); - messagesBatchLock = new ReentrantLock(); + messagesBundle = new LinkedList<>(); + messagesBundleLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); int numCores = Math.max(1, Runtime.getRuntime().availableProcessors()); executor = @@ -150,18 +150,18 @@ public PublisherStats getStats() { } @Override - public Duration getMaxBatchDuration() { - return maxBatchDuration; + public Duration getMaxBundleDuration() { + return maxBundleDuration; } @Override - public long getMaxBatchBytes() { - return maxBatchBytes; + public long getMaxBundleBytes() { + return maxBundleBytes; } @Override - public long getMaxBatchMessages() { - return maxBatchMessages; + public long getMaxBundleMessages() { + return maxBundleMessages; } @Override @@ -206,36 +206,36 @@ public ListenableFuture publish(PubsubMessage message) { } catch (CloudPubsubFlowControlException e) { return Futures.immediateFailedFuture(e); } - OutstandingBatch batchToSend = null; + OutstandingBundle bundleToSend = null; SettableFuture publishResult = SettableFuture.create(); final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message); - messagesBatchLock.lock(); + messagesBundleLock.lock(); try { - // Check if the next message makes the batch exceed the current batch byte size. - if (!messagesBatch.isEmpty() - && hasBatchingBytes - && batchedBytes + messageSize >= getMaxBatchBytes()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + // Check if the next message makes the bundle exceed the current bundle byte size. + if (!messagesBundle.isEmpty() + && hasBundlingBytes + && bundledBytes + messageSize >= getMaxBundleBytes()) { + bundleToSend = new OutstandingBundle(messagesBundle, bundledBytes); + messagesBundle = new LinkedList<>(); + bundledBytes = 0; } - // Border case if the message to send is greater equals to the max batch size then can't be - // included in the current batch and instead sent immediately. - if (!hasBatchingBytes || messageSize < getMaxBatchBytes()) { - batchedBytes += messageSize; - messagesBatch.add(outstandingPublish); + // Border case if the message to send is greater equals to the max bundle size then can't be + // included in the current bundle and instead sent immediately. + if (!hasBundlingBytes || messageSize < getMaxBundleBytes()) { + bundledBytes += messageSize; + messagesBundle.add(outstandingPublish); - // If after adding the message we have reached the batch max messages then we have a batch + // If after adding the message we have reached the bundle max messages then we have a bundle // to send. - if (messagesBatch.size() == getMaxBatchMessages()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + if (messagesBundle.size() == getMaxBundleMessages()) { + bundleToSend = new OutstandingBundle(messagesBundle, bundledBytes); + messagesBundle = new LinkedList<>(); + bundledBytes = 0; } } - // Setup the next duration based delivery alarm if there are messages batched. - if (!messagesBatch.isEmpty()) { + // Setup the next duration based delivery alarm if there are messages bundled. + if (!messagesBundle.isEmpty()) { setupDurationBasedPublishAlarm(); } else if (currentAlarmFuture != null) { logger.debug("Cancelling alarm"); @@ -244,33 +244,33 @@ public ListenableFuture publish(PubsubMessage message) { } } } finally { - messagesBatchLock.unlock(); + messagesBundleLock.unlock(); } messagesWaiter.incrementPendingMessages(1); - if (batchToSend != null) { - logger.debug("Scheduling a batch for immediate sending."); - final OutstandingBatch finalBatchToSend = batchToSend; + if (bundleToSend != null) { + logger.debug("Scheduling a bundle for immediate sending."); + final OutstandingBundle finalBundleToSend = bundleToSend; executor.execute( new Runnable() { @Override public void run() { - publishOutstandingBatch(finalBatchToSend); + publishOutstandingBundle(finalBundleToSend); } }); } // If the message is over the size limit, it was not added to the pending messages and it will - // be sent in its own batch immediately. - if (hasBatchingBytes && messageSize >= getMaxBatchBytes()) { - logger.debug("Message exceeds the max batch bytes, scheduling it for immediate send."); + // be sent in its own bundle immediately. + if (hasBundlingBytes && messageSize >= getMaxBundleBytes()) { + logger.debug("Message exceeds the max bundle bytes, scheduling it for immediate send."); executor.execute( new Runnable() { @Override public void run() { - publishOutstandingBatch( - new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize)); + publishOutstandingBundle( + new OutstandingBundle(ImmutableList.of(outstandingPublish), messageSize)); } }); } @@ -280,7 +280,7 @@ public void run() { private void setupDurationBasedPublishAlarm() { if (!activeAlarm.getAndSet(true)) { - logger.debug("Setting up alarm for the next %d ms.", getMaxBatchDuration().getMillis()); + logger.debug("Setting up alarm for the next %d ms.", getMaxBundleDuration().getMillis()); currentAlarmFuture = executor.schedule( new Runnable() { @@ -291,31 +291,31 @@ public void run() { publishAllOustanding(); } }, - getMaxBatchDuration().getMillis(), + getMaxBundleDuration().getMillis(), TimeUnit.MILLISECONDS); } } private void publishAllOustanding() { - messagesBatchLock.lock(); - OutstandingBatch batchToSend; + messagesBundleLock.lock(); + OutstandingBundle bundleToSend; try { - if (messagesBatch.isEmpty()) { + if (messagesBundle.isEmpty()) { return; } - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + bundleToSend = new OutstandingBundle(messagesBundle, bundledBytes); + messagesBundle = new LinkedList<>(); + bundledBytes = 0; } finally { - messagesBatchLock.unlock(); + messagesBundleLock.unlock(); } - publishOutstandingBatch(batchToSend); + publishOutstandingBundle(bundleToSend); } - private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { + private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); publishRequest.setTopic(topic); - for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { + for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } int currentChannel = (int) (channelIndex.getAndIncrement() % channels.length); @@ -328,46 +328,46 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { @Override public void onSuccess(PublishResponse result) { try { - if (result.getMessageIdsCount() != outstandingBatch.size()) { + if (result.getMessageIdsCount() != outstandingBundle.size()) { Throwable t = new IllegalStateException( String.format( "The publish result count %s does not match " + "the expected %s results. Please contact Cloud Pub/Sub support " + "if this frequently occurs", - result.getMessageIdsCount(), outstandingBatch.size())); - for (OutstandingPublish oustandingMessage : outstandingBatch.outstandingPublishes) { + result.getMessageIdsCount(), outstandingBundle.size())); + for (OutstandingPublish oustandingMessage : outstandingBundle.outstandingPublishes) { oustandingMessage.publishResult.setException(t); } return; } Iterator messagesResultsIt = - outstandingBatch.outstandingPublishes.iterator(); + outstandingBundle.outstandingPublishes.iterator(); for (String messageId : result.getMessageIdsList()) { messagesResultsIt.next().publishResult.set(messageId); } } finally { - flowController.release(outstandingBatch.size(), outstandingBatch.batchSizeBytes); - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + flowController.release(outstandingBundle.size(), outstandingBundle.bundleSizeBytes); + messagesWaiter.incrementPendingMessages(-outstandingBundle.size()); } } @Override public void onFailure(Throwable t) { - long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBatch); + long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle); if (!isRetryable(t) || System.currentTimeMillis() + nextBackoffDelay - > outstandingBatch.creationTime - + PublisherImpl.this.sendBatchDeadline.getMillis()) { + > outstandingBundle.creationTime + + PublisherImpl.this.sendBundleDeadline.getMillis()) { try { for (OutstandingPublish outstandingPublish : - outstandingBatch.outstandingPublishes) { + outstandingBundle.outstandingPublishes) { outstandingPublish.publishResult.setException(t); } } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + messagesWaiter.incrementPendingMessages(-outstandingBundle.size()); } return; } @@ -376,7 +376,7 @@ public void onFailure(Throwable t) { new Runnable() { @Override public void run() { - publishOutstandingBatch(outstandingBatch); + publishOutstandingBundle(outstandingBundle); } }, nextBackoffDelay, @@ -385,17 +385,17 @@ public void run() { }); } - private static final class OutstandingBatch { + private static final class OutstandingBundle { final List outstandingPublishes; final long creationTime; int attempt; - int batchSizeBytes; + int bundleSizeBytes; - OutstandingBatch(List outstandingPublishes, int batchSizeBytes) { + OutstandingBundle(List outstandingPublishes, int bundleSizeBytes) { this.outstandingPublishes = outstandingPublishes; attempt = 1; creationTime = System.currentTimeMillis(); - this.batchSizeBytes = batchSizeBytes; + this.bundleSizeBytes = bundleSizeBytes; } public int size() { @@ -425,12 +425,12 @@ public void shutdown() { messagesWaiter.waitNoMessages(); } - private static long computeNextBackoffDelayMs(OutstandingBatch outstandingBatch) { - long delayMillis = Math.round(Math.scalb(INITIAL_BACKOFF_MS, outstandingBatch.attempt)); + private static long computeNextBackoffDelayMs(OutstandingBundle outstandingBundle) { + long delayMillis = Math.round(Math.scalb(INITIAL_BACKOFF_MS, outstandingBundle.attempt)); int randomWaitMillis = Ints.saturatedCast( (long) ((Math.random() - 0.5) * 2 * delayMillis * BACKOFF_RANDOMNESS_FACTOR)); - ++outstandingBatch.attempt; + ++outstandingBundle.attempt; return delayMillis + randomWaitMillis; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java index 307afcccb6a0..cf24e2eca2d5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java @@ -67,7 +67,7 @@ * * Subscriber subscriber = * Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver) - * .setMaxBatchAcks(100) + * .setMaxBundleAcks(100) * .build(); * * subscriber.startAsync(); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberConnection.java index ea6ce2564bd1..19e5857dc112 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberConnection.java @@ -401,7 +401,7 @@ public void run() { }); } flowController.reserve(receivedMessagesCount, totalByteCount); - // Only if not shutdown we will request one more batch of messages to be delivered. + // Only if not shutdown we will request one more bundle of messages to be delivered. if (isAlive()) { requestObserver.request(1); } @@ -580,7 +580,7 @@ private void sendOutstandingAckOperations(List ackDead } } - // Send the modify ack deadlines in batches as not to exceed the max request + // Send the modify ack deadlines in bundles as not to exceed the max request // size. List> ackChunks = Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES); List> modifyAckDeadlineChunks = diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index d9524af5925e..2b5ab2247a16 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -465,7 +465,7 @@ public void testPullMessagesAndAutoRenewDeadline() throws InterruptedException { SubscriptionInfo.newBuilder(topic, subscription).setAckDeadLineSeconds(10).build()); Message message1 = Message.of("payload1"); Message message2 = Message.of("payload2"); - // todo(mziccard): use batch publish if #1017 gets fixed, or remove this comment + // todo(mziccard): use bundle publish if #1017 gets fixed, or remove this comment pubsub().publish(topic, message1); pubsub().publish(topic, message2); Iterator iterator = pubsub().pull(subscription, 2); @@ -505,7 +505,7 @@ public void testPullMessagesAndModifyAckDeadline() throws InterruptedException { SubscriptionInfo.newBuilder(topic, subscription).setAckDeadLineSeconds(10).build()); Message message1 = Message.of("payload1"); Message message2 = Message.of("payload2"); - // todo(mziccard): use batch publish if #1017 gets fixed, or remove this comment + // todo(mziccard): use bundle publish if #1017 gets fixed, or remove this comment pubsub().publish(topic, message1); pubsub().publish(topic, message2); // Consume all messages and stop ack renewal diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index fece0268447f..8d404b2b79dd 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -99,9 +99,9 @@ public static void tearDownClass() throws Exception { public void testPublishByDuration() throws Exception { Publisher publisher = getTestPublisherBuilder() - .setMaxBatchDuration(Duration.standardSeconds(5)) + .setMaxBundleDuration(Duration.standardSeconds(5)) // To demonstrate that reaching duration will trigger publish - .setMaxBatchMessages(10) + .setMaxBundleMessages(10) .build(); testPublisherServiceImpl.addPublishResponse( @@ -124,11 +124,11 @@ public void testPublishByDuration() throws Exception { } @Test - public void testPublishByNumBatchedMessages() throws Exception { + public void testPublishByNumBundledMessages() throws Exception { Publisher publisher = getTestPublisherBuilder() - .setMaxBatchDuration(Duration.standardSeconds(100)) - .setMaxBatchMessages(2) + .setMaxBundleDuration(Duration.standardSeconds(100)) + .setMaxBundleMessages(2) .build(); testPublisherServiceImpl @@ -162,8 +162,8 @@ public void testPublishByNumBatchedMessages() throws Exception { public void testSinglePublishByNumBytes() throws Exception { Publisher publisher = getTestPublisherBuilder() - .setMaxBatchDuration(Duration.standardSeconds(100)) - .setMaxBatchMessages(2) + .setMaxBundleDuration(Duration.standardSeconds(100)) + .setMaxBundleMessages(2) .build(); testPublisherServiceImpl @@ -192,9 +192,9 @@ public void testSinglePublishByNumBytes() throws Exception { public void testPublishMixedSizeAndDuration() throws Exception { Publisher publisher = getTestPublisherBuilder() - .setMaxBatchDuration(Duration.standardSeconds(5)) + .setMaxBundleDuration(Duration.standardSeconds(5)) // To demonstrate that reaching duration will trigger publish - .setMaxBatchMessages(2) + .setMaxBundleMessages(2) .build(); testPublisherServiceImpl.addPublishResponse( @@ -208,7 +208,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { ListenableFuture publishFuture2 = sendTestMessage(publisher, "B"); - // Publishing triggered by batch size + // Publishing triggered by bundle size assertEquals("1", publishFuture1.get()); assertEquals("2", publishFuture2.get()); @@ -237,8 +237,8 @@ public void testPublishFailureRetries() throws Exception { Publisher publisher = getTestPublisherBuilder() .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setMaxBatchDuration(Duration.standardSeconds(5)) - .setMaxBatchMessages(1) + .setMaxBundleDuration(Duration.standardSeconds(5)) + .setMaxBundleMessages(1) .build(); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -257,9 +257,9 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception Publisher publisher = getTestPublisherBuilder() .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBatchDeadline(Duration.standardSeconds(10)) - .setMaxBatchDuration(Duration.standardSeconds(5)) - .setMaxBatchMessages(1) + .setSendBundleDeadline(Duration.standardSeconds(10)) + .setMaxBundleDuration(Duration.standardSeconds(5)) + .setMaxBundleMessages(1) .build(); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -282,9 +282,9 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce Publisher publisher = getTestPublisherBuilder() .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBatchDeadline(Duration.standardSeconds(10)) - .setMaxBatchDuration(Duration.standardSeconds(5)) - .setMaxBatchMessages(1) + .setSendBundleDeadline(Duration.standardSeconds(10)) + .setMaxBundleDuration(Duration.standardSeconds(5)) + .setMaxBundleMessages(1) .build(); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -309,19 +309,19 @@ public void testPublisherGetters() throws Exception { builder.setCredentials(credentials); builder.setExecutor(executor); builder.setFailOnFlowControlLimits(true); - builder.setMaxBatchBytes(10); - builder.setMaxBatchDuration(new Duration(11)); - builder.setMaxBatchMessages(12); + builder.setMaxBundleBytes(10); + builder.setMaxBundleDuration(new Duration(11)); + builder.setMaxBundleMessages(12); builder.setMaxOutstandingBytes(13); builder.setMaxOutstandingMessages(14); builder.setRequestTimeout(new Duration(15)); - builder.setSendBatchDeadline(new Duration(16000)); + builder.setSendBundleDeadline(new Duration(16000)); Publisher publisher = builder.build(); assertEquals(TEST_TOPIC, publisher.getTopic()); - assertEquals(10, publisher.getMaxBatchBytes()); - assertEquals(new Duration(11), publisher.getMaxBatchDuration()); - assertEquals(12, publisher.getMaxBatchMessages()); + assertEquals(10, publisher.getMaxBundleBytes()); + assertEquals(new Duration(11), publisher.getMaxBundleDuration()); + assertEquals(12, publisher.getMaxBundleMessages()); assertEquals(Optional.of(13), publisher.getMaxOutstandingBytes()); assertEquals(Optional.of(14), publisher.getMaxOutstandingMessages()); assertTrue(publisher.failOnFlowControlLimits()); @@ -334,13 +334,13 @@ public void testBuilderParametersAndDefaults() { assertEquals(Optional.absent(), builder.channelBuilder); assertEquals(Optional.absent(), builder.executor); assertFalse(builder.failOnFlowControlLimits); - assertEquals(Publisher.DEFAULT_MAX_BATCH_BYTES, builder.maxBatchBytes); - assertEquals(Publisher.DEFAULT_MAX_BATCH_DURATION, builder.maxBatchDuration); - assertEquals(Publisher.DEFAULT_MAX_BATCH_MESSAGES, builder.maxBatchMessages); + assertEquals(Publisher.DEFAULT_MAX_BUNDLE_BYTES, builder.maxBundleBytes); + assertEquals(Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.maxBundleDuration); + assertEquals(Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, builder.maxBundleMessages); assertEquals(Optional.absent(), builder.maxOutstandingBytes); assertEquals(Optional.absent(), builder.maxOutstandingMessages); assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); - assertEquals(Publisher.MIN_SEND_BATCH_DURATION, builder.sendBatchDeadline); + assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); assertEquals(Optional.absent(), builder.userCredentials); } @@ -369,41 +369,41 @@ public void testBuilderInvalidArguments() { // Expected } try { - builder.setMaxBatchBytes(0); + builder.setMaxBundleBytes(0); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } try { - builder.setMaxBatchBytes(-1); + builder.setMaxBundleBytes(-1); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setMaxBatchDuration(new Duration(1)); + builder.setMaxBundleDuration(new Duration(1)); try { - builder.setMaxBatchDuration(null); + builder.setMaxBundleDuration(null); fail("Should have thrown an IllegalArgumentException"); } catch (NullPointerException expected) { // Expected } try { - builder.setMaxBatchDuration(new Duration(-1)); + builder.setMaxBundleDuration(new Duration(-1)); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setMaxBatchMessages(1); + builder.setMaxBundleMessages(1); try { - builder.setMaxBatchMessages(0); + builder.setMaxBundleMessages(0); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } try { - builder.setMaxBatchMessages(-1); + builder.setMaxBundleMessages(-1); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected @@ -444,9 +444,9 @@ public void testBuilderInvalidArguments() { } catch (IllegalArgumentException expected) { // Expected } - builder.setSendBatchDeadline(Publisher.MIN_SEND_BATCH_DURATION); + builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION); try { - builder.setSendBatchDeadline(Publisher.MIN_SEND_BATCH_DURATION.minus(1)); + builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION.minus(1)); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java index b61b6b5668cf..f56c8a6ac068 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java @@ -217,48 +217,48 @@ public void testReceiverError_NacksMessage() throws Exception { } @Test - public void testBatchAcks() throws Exception { + public void testBundleAcks() throws Exception { Subscriber subscriber = getTestSubscriberBuilder(testReceiver).build(); subscriber.startAsync().awaitRunning(); - List testAckIdsBatch1 = ImmutableList.of("A", "B", "C"); - sendMessages(testAckIdsBatch1); + List testAckIdsBundle1 = ImmutableList.of("A", "B", "C"); + sendMessages(testAckIdsBundle1); // Trigger ack sending fakeExecutor.advanceTime(SubscriberConnection.PENDING_ACKS_SEND_DELAY); - assertEquivalent(testAckIdsBatch1, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(3)); + assertEquivalent(testAckIdsBundle1, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(3)); // Ensures the next ack sending alarm gets properly setup - List testAckIdsBatch2 = ImmutableList.of("D", "E"); - sendMessages(testAckIdsBatch2); + List testAckIdsBundle2 = ImmutableList.of("D", "E"); + sendMessages(testAckIdsBundle2); fakeExecutor.advanceTime(SubscriberConnection.PENDING_ACKS_SEND_DELAY); - assertEquivalent(testAckIdsBatch2, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(2)); + assertEquivalent(testAckIdsBundle2, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(2)); subscriber.stopAsync().awaitTerminated(); } @Test - public void testBatchAcksAndNacks() throws Exception { + public void testBundleAcksAndNacks() throws Exception { Subscriber subscriber = getTestSubscriberBuilder(testReceiver).build(); subscriber.startAsync().awaitRunning(); // Send messages to be acked - List testAckIdsBatch1 = ImmutableList.of("A", "B", "C"); - sendMessages(testAckIdsBatch1); + List testAckIdsBundle1 = ImmutableList.of("A", "B", "C"); + sendMessages(testAckIdsBundle1); // Send messages to be nacked - List testAckIdsBatch2 = ImmutableList.of("D", "E"); + List testAckIdsBundle2 = ImmutableList.of("D", "E"); // Nack messages testReceiver.setReply(AckReply.NACK); - sendMessages(testAckIdsBatch2); + sendMessages(testAckIdsBundle2); // Trigger ack sending fakeExecutor.advanceTime(SubscriberConnection.PENDING_ACKS_SEND_DELAY); - assertEquivalent(testAckIdsBatch1, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(3)); + assertEquivalent(testAckIdsBundle1, fakeSubscriberServiceImpl.waitAndConsumeReceivedAcks(3)); assertEquivalent( ImmutableList.of(new ModifyAckDeadline("D", 0), new ModifyAckDeadline("E", 0)), fakeSubscriberServiceImpl.waitAndConsumeModifyAckDeadlines(2)); @@ -273,15 +273,15 @@ public void testModifyAckDeadline() throws Exception { subscriber.startAsync().awaitRunning(); // Send messages to be acked - List testAckIdsBatch = ImmutableList.of("A", "B", "C"); + List testAckIdsBundle = ImmutableList.of("A", "B", "C"); testReceiver.setExplicitAck(true); - sendMessages(testAckIdsBatch); + sendMessages(testAckIdsBundle); // Trigger modify ack deadline sending - 10s initial stream ack deadline - 1 padding fakeExecutor.advanceTime(Duration.standardSeconds(9)); assertEquivalentWithTransformation( - testAckIdsBatch, + testAckIdsBundle, fakeSubscriberServiceImpl.waitAndConsumeModifyAckDeadlines(3), new Function() { @Override @@ -294,7 +294,7 @@ public ModifyAckDeadline apply(String ack) { fakeExecutor.advanceTime(Duration.standardSeconds(2)); assertEquivalentWithTransformation( - testAckIdsBatch, + testAckIdsBundle, fakeSubscriberServiceImpl.waitAndConsumeModifyAckDeadlines(3), new Function() { @Override