Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use the term "bundling" instead of "batching" #1479

Merged
merged 1 commit into from
Dec 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
* associated with a specific topic at creation.
*
* <p>A {@link Publisher} provides built-in capabilities to automatically handle batching of
* <p>A {@link Publisher} provides built-in capabilities to automatically handle bundling of
* messages, controlling memory utilization, and retrying API calls on transient errors.
*
* <p>With customizable options that control:
*
* <ul>
* <li>Message batching: such as number of messages or max batch byte size.
* <li>Message bundling: such as number of messages or max bundle byte size.
* <li>Flow control: such as max outstanding messages and maximum outstanding bytes.
* <li>Retries: such as the maximum duration of retries for a failing batch of messages.
* <li>Retries: such as the maximum duration of retries for a failing bundle of messages.
* </ul>
*
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
Expand All @@ -51,7 +51,7 @@
* <pre>
* Publisher publisher =
* Publisher.Builder.newBuilder(MY_TOPIC)
* .setMaxBatchDuration(new Duration(10 * 1000))
* .setMaxBundleDuration(new Duration(10 * 1000))
* .build();
* List<ListenableFuture<String>> results = new ArrayList<>();
*
Expand Down Expand Up @@ -81,23 +81,23 @@ public interface Publisher {
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";

// API limits.
int MAX_BATCH_MESSAGES = 1000;
int MAX_BATCH_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
int MAX_BUNDLE_MESSAGES = 1000;
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)

// Meaningful defaults.
int DEFAULT_MAX_BATCH_MESSAGES = 100;
int DEFAULT_MAX_BATCH_BYTES = 1000; // 1 kB
Duration DEFAULT_MAX_BATCH_DURATION = new Duration(1); // 1ms
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BATCH_DURATION = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds

/** Topic to which the publisher publishes to. */
String getTopic();

/**
* Schedules the publishing of a message. The publishing of the message may occur immediately or
* be delayed based on the publisher batching options.
* be delayed based on the publisher bundling options.
*
* <p>Depending on chosen flow control {@link #failOnFlowControlLimits option}, the returned
* future might immediately fail with a {@link CloudPubsubFlowControlException} or block the
Expand All @@ -109,13 +109,13 @@ public interface Publisher {
ListenableFuture<String> publish(PubsubMessage message);

/** Maximum amount of time to wait until scheduling the publishing of messages. */
Duration getMaxBatchDuration();
Duration getMaxBundleDuration();

/** Maximum number of bytes to batch before publishing. */
long getMaxBatchBytes();
/** Maximum number of bytes to bundle before publishing. */
long getMaxBundleBytes();

/** Maximum number of messages to batch before publishing. */
long getMaxBatchMessages();
/** Maximum number of messages to bundle before publishing. */
long getMaxBundleMessages();

/**
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
Expand Down Expand Up @@ -155,18 +155,18 @@ public interface Publisher {
final class Builder {
String topic;

// Batching options
int maxBatchMessages;
int maxBatchBytes;
Duration maxBatchDuration;
// Bundling options
int maxBundleMessages;
int maxBundleBytes;
Duration maxBundleDuration;

// Client-side flow control options
Optional<Integer> maxOutstandingMessages;
Optional<Integer> maxOutstandingBytes;
boolean failOnFlowControlLimits;

// Send batch deadline
Duration sendBatchDeadline;
// Send bundle deadline
Duration sendBundleDeadline;

// RPC options
Duration requestTimeout;
Expand All @@ -192,11 +192,11 @@ private void setDefaults() {
channelBuilder = Optional.absent();
maxOutstandingMessages = Optional.absent();
maxOutstandingBytes = Optional.absent();
maxBatchMessages = DEFAULT_MAX_BATCH_MESSAGES;
maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
maxBatchDuration = DEFAULT_MAX_BATCH_DURATION;
maxBundleMessages = DEFAULT_MAX_BUNDLE_MESSAGES;
maxBundleBytes = DEFAULT_MAX_BUNDLE_BYTES;
maxBundleDuration = DEFAULT_MAX_BUNDLE_DURATION;
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
sendBatchDeadline = MIN_SEND_BATCH_DURATION;
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
failOnFlowControlLimits = false;
executor = Optional.absent();
}
Expand Down Expand Up @@ -224,16 +224,16 @@ public Builder setChannelBuilder(
return this;
}

// Batching options
// Bundling options

/**
* Maximum number of messages to send per publish call.
*
* <p>It also sets a target to when to trigger a publish.
*/
public Builder setMaxBatchMessages(int messages) {
public Builder setMaxBundleMessages(int messages) {
Preconditions.checkArgument(messages > 0);
maxBatchMessages = messages;
maxBundleMessages = messages;
return this;
}

Expand All @@ -244,19 +244,19 @@ public Builder setMaxBatchMessages(int messages) {
*
* <p>This will not be honored if a single message is published that exceeds this maximum.
*/
public Builder setMaxBatchBytes(int bytes) {
public Builder setMaxBundleBytes(int bytes) {
Preconditions.checkArgument(bytes > 0);
maxBatchBytes = bytes;
maxBundleBytes = bytes;
return this;
}

/**
* Time to wait, since the first message is kept in memory for batching, before triggering a
* Time to wait, since the first message is kept in memory for bundling, before triggering a
* publish call.
*/
public Builder setMaxBatchDuration(Duration duration) {
public Builder setMaxBundleDuration(Duration duration) {
Preconditions.checkArgument(duration.getMillis() >= 0);
maxBatchDuration = duration;
maxBundleDuration = duration;
return this;
}

Expand Down Expand Up @@ -289,10 +289,10 @@ public Builder setFailOnFlowControlLimits(boolean fail) {
return this;
}

/** Maximum time to attempt sending (and retrying) a batch of messages before giving up. */
public Builder setSendBatchDeadline(Duration deadline) {
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BATCH_DURATION) >= 0);
sendBatchDeadline = deadline;
/** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */
public Builder setSendBundleDeadline(Duration deadline) {
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0);
sendBundleDeadline = deadline;
return this;
}

Expand Down Expand Up @@ -329,14 +329,14 @@ public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
this.currentMaxMessages = currentMaxMessages;
}

public int getCurrentMaxBatchMessages() {
public int getCurrentMaxBundleMessages() {
return currentMaxMessages;
}

@Override
public String toString() {
return String.format(
"The maximum number of batch messages: %d have been reached.", currentMaxMessages);
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
}
}

Expand All @@ -351,14 +351,14 @@ public MaxOutstandingBytesReachedException(int currentMaxBytes) {
this.currentMaxBytes = currentMaxBytes;
}

public int getCurrentMaxBatchBytes() {
public int getCurrentMaxBundleBytes() {
return currentMaxBytes;
}

@Override
public String toString() {
return String.format(
"The maximum number of batch bytes: %d have been reached.", currentMaxBytes);
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
}
}
}
Loading