Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: change Subscriber defaults #3147

Merged
merged 2 commits into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- `TableResult.getTotalRows()` can be called to obtain the total number of rows across pages.
- Various `Job` statistics are no longer available at `QueryResponse`.
- Use `BigQuery.getJob` then `Job.getStatistics` instead.

# v0.36.0
## Pub/Sub
- `TopicName` is renamed to `ProjectTopicName`, and now inherits from a new base class `TopicName`
Expand All @@ -34,3 +35,19 @@
- `subscription.getTopicAsTopicNameOneof()`: use `TopicNames.parse(subscription.getTopic())`
- `subscription.getNameAsSubscriptionName()`: use `ProjectSubscriptionName.parse(subscription.getName())`
- `snapshot.getNameAsSnapshotName()`: use `ProjectSnapshotName.parse(snapshot.getName())`

# v0.44.0

This comment was marked as spam.

This comment was marked as spam.

## Pub/Sub
The default flow control settings for `Subscriber` is changed.

- Previously it keeps combined size of outstanding messages below 20% of available memory.
Now it keeps the number of outstanding messages less than or equal to 1000.
- Previously it opens one stream per available CPU.
Now it opens one regardless of number of CPUs.

Slow message consumers will likely see better load-balancing across machines.
Because each machine pulls messages less eagerly, messages not yet pulled can be pulled by another machine.

Fast message consumers might see reduced performance.
If desired, these settings can be adjusted back by `Subscriber.Builder#setFlowControlSettings` and
`Subscriber.Builder#setParallelPullCount`.
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ private Subscriber createSingleThreadedSubscriber() throws Exception {

private Subscriber createSubscriberWithCustomFlowSettings() throws Exception {
// [START pubsub_subscriber_flow_settings]
long maxMessageCount = 10L;
// Configure max number of messages to be pulled
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxMessageCount).build();
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(10_000L)
.setMaxOutstandingRequestBytes(1_000_000_000L)
.build();
Subscriber subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setFlowControlSettings(flowControlSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,9 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.pubsub.v1.GetSubscriptionRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import com.google.pubsub.v1.Subscription;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.auth.MoreCallCredentials;
Expand All @@ -56,7 +53,6 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -96,7 +92,6 @@
*/
public class Subscriber extends AbstractApiService {
private static final int THREADS_PER_CHANNEL = 5;
@InternalApi static final int CHANNELS_PER_CORE = 1;
private static final int MAX_INBOUND_MESSAGE_SIZE =
20 * 1024 * 1024; // 20MB API maximum message size.
@InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600;
Expand Down Expand Up @@ -414,13 +409,11 @@ public static final class Builder {
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.ofMillis(100);
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
private static final long DEFAULT_MEMORY_PERCENTAGE = 20;

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(
THREADS_PER_CHANNEL
* CHANNELS_PER_CORE
* Runtime.getRuntime().availableProcessors())
.build();

Expand All @@ -431,10 +424,7 @@ public static final class Builder {
Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;

FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingRequestBytes(
Runtime.getRuntime().maxMemory() * DEFAULT_MEMORY_PERCENTAGE / 100L)
.build();
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build();

ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR);
Expand All @@ -449,7 +439,7 @@ public static final class Builder {
CredentialsProvider credentialsProvider =
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
Optional<ApiClock> clock = Optional.absent();
int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE;
int parallelPullCount = 1;

Builder(String subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
Expand Down Expand Up @@ -500,7 +490,31 @@ Builder setInternalHeaderProvider(HeaderProvider internalHeaderProvider) {
return this;
}

/** Sets the flow control settings. */
/**
* Sets the flow control settings.
*
* <p>In the example below, the {@Subscriber} will make sure that
*
* <ul>
* <li>there are at most ten thousand outstanding messages, and
* <li>the combined size of outstanding messages does not exceed 1GB.
* </ul>
*
* "Outstanding messages" here means the messages that have already been given to {@link
* MessageReceiver} but not yet {@code acked()} or {@code nacked()}.
*
* <pre>{@code
* FlowControlSettings flowControlSettings =

This comment was marked as spam.

This comment was marked as spam.

* FlowControlSettings.newBuilder()
* .setMaxOutstandingElementCount(10_000L)
* .setMaxOutstandingRequestBytes(1_000_000_000L)
* .build();
* Subscriber subscriber =
* Subscriber.newBuilder(subscriptionName, receiver)
* .setFlowControlSettings(flowControlSettings)
* .build();
* }</pre>
*/
public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
Expand Down