Skip to content

Commit

Permalink
Updates for flowcontrol (#1687)
Browse files Browse the repository at this point in the history
* Update to accomodate changes to gax
* Regenerate with changes to bundling
* Remove smoke test, update settings, bump GAX version to 0.3.0
  • Loading branch information
michaelbausor authored Mar 7, 2017
1 parent 5cbbbae commit e92f570
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
import static com.google.cloud.logging.spi.v2.PagedResponseWrappers.ListMonitoredResourceDescriptorsPagedResponse;

import com.google.api.MonitoredResourceDescriptor;
import com.google.api.gax.bundling.BundlingSettings;
import com.google.api.gax.bundling.RequestBuilder;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundledRequestIssuer;
import com.google.api.gax.grpc.BundlingCallSettings;
import com.google.api.gax.grpc.BundlingDescriptor;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.api.gax.grpc.CallContext;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ClientSettings;
Expand All @@ -34,7 +38,6 @@
import com.google.api.gax.grpc.PagedCallSettings;
import com.google.api.gax.grpc.PagedListDescriptor;
import com.google.api.gax.grpc.PagedListResponseFactory;
import com.google.api.gax.grpc.RequestIssuer;
import com.google.api.gax.grpc.SimpleCallSettings;
import com.google.api.gax.grpc.UnaryCallSettings;
import com.google.api.gax.grpc.UnaryCallable;
Expand All @@ -58,9 +61,7 @@
import com.google.protobuf.ExperimentalApi;
import io.grpc.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Generated;
import org.joda.time.Duration;

Expand Down Expand Up @@ -398,33 +399,32 @@ public String getBundlePartitionKey(WriteLogEntriesRequest request) {
}

@Override
public WriteLogEntriesRequest mergeRequests(
Collection<WriteLogEntriesRequest> requests) {
WriteLogEntriesRequest firstRequest = requests.iterator().next();

List<LogEntry> elements = new ArrayList<>();
for (WriteLogEntriesRequest request : requests) {
elements.addAll(request.getEntriesList());
}

WriteLogEntriesRequest bundleRequest =
WriteLogEntriesRequest.newBuilder()
.setLogName(firstRequest.getLogName())
.setResource(firstRequest.getResource())
.putAllLabels(firstRequest.getLabels())
.addAllEntries(elements)
.build();
return bundleRequest;
public RequestBuilder<WriteLogEntriesRequest> getRequestBuilder() {
return new RequestBuilder<WriteLogEntriesRequest>() {
private WriteLogEntriesRequest.Builder builder;

@Override
public void appendRequest(WriteLogEntriesRequest request) {
if (builder == null) {
builder = request.toBuilder();
} else {
builder.addAllEntries(request.getEntriesList());
}
}

@Override
public WriteLogEntriesRequest build() {
return builder.build();
}
};
}

@Override
public void splitResponse(
WriteLogEntriesResponse bundleResponse,
Collection<? extends RequestIssuer<WriteLogEntriesRequest, WriteLogEntriesResponse>>
bundle) {
Collection<? extends BundledRequestIssuer<WriteLogEntriesResponse>> bundle) {
int bundleMessageIndex = 0;
for (RequestIssuer<WriteLogEntriesRequest, WriteLogEntriesResponse> responder :
bundle) {
for (BundledRequestIssuer<WriteLogEntriesResponse> responder : bundle) {
WriteLogEntriesResponse response = WriteLogEntriesResponse.newBuilder().build();
responder.setResponse(response);
}
Expand All @@ -433,10 +433,8 @@ public void splitResponse(
@Override
public void splitException(
Throwable throwable,
Collection<? extends RequestIssuer<WriteLogEntriesRequest, WriteLogEntriesResponse>>
bundle) {
for (RequestIssuer<WriteLogEntriesRequest, WriteLogEntriesResponse> responder :
bundle) {
Collection<? extends BundledRequestIssuer<WriteLogEntriesResponse>> bundle) {
for (BundledRequestIssuer<WriteLogEntriesResponse> responder : bundle) {
responder.setException(throwable);
}
}
Expand Down Expand Up @@ -556,9 +554,15 @@ private static Builder createDefault() {
builder
.writeLogEntriesSettings()
.getBundlingSettingsBuilder()
.setElementCountThreshold(100)
.setRequestByteThreshold(1024)
.setDelayThreshold(Duration.millis(10));
.setElementCountThreshold(1000)
.setRequestByteThreshold(1048576)
.setDelayThreshold(Duration.millis(50))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(100000)
.setMaxOutstandingRequestBytes(10485760)
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
.build());
builder
.writeLogEntriesSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.cloud.Clock;
import com.google.common.annotations.VisibleForTesting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable;

import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@

package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.bundling.BundlingSettings;
import com.google.api.gax.core.ApiFuture;
import com.google.api.gax.core.ApiFutureCallback;
import com.google.api.gax.core.ApiFutures;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.Function;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.FlowControlSettings;
import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -88,7 +88,6 @@ public class Publisher {
private final LongRandom longRandom;

private final FlowControlSettings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final Lock messagesBundleLock;
private List<OutstandingPublish> messagesBundle;
Expand Down Expand Up @@ -125,8 +124,7 @@ private Publisher(Builder builder) throws IOException {
this.longRandom = builder.longRandom;

flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);
this.flowController = new FlowController(flowControlSettings);

messagesBundle = new LinkedList<>();
messagesBundleLock = new ReentrantLock();
Expand Down Expand Up @@ -173,12 +171,14 @@ public TopicName getTopicName() {
* Schedules the publishing of a message. The publishing of the message may occur immediately or
* be delayed based on the publisher bundling options.
*
* <p>Depending on chosen flow control {@link #failOnFlowControlLimits option}, the returned
* future might immediately fail with a {@link com.google.api.gax.grpc.FlowController.FlowControlException}
* or block the current thread until there are more resources available to publish.
* <p>Depending on chosen flow control {@link FlowControlSettings#getLimitExceededBehavior
* option}, the returned future might immediately fail with a {@link
* FlowController.FlowControlException} or block the current thread until there are more resources
* available to publish.
*
* <p>Example of publishing a message.
* <pre> {@code
*
* <pre>{@code
* String message = "my_message";
* ByteString data = ByteString.copyFromUtf8(message);
* PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
Expand Down Expand Up @@ -473,26 +473,19 @@ private long getMaxBundleBytes() {
}

/**
* The bundling settings configured on this {@code Publisher}. See {@link
* #failOnFlowControlLimits()}.
* The bundling settings configured on this {@code Publisher}, including whether to block publish
* calls when reaching flow control limits.
*
* <p>If {@link FlowControlSettings#getLimitExceededBehavior()} is set to {@link
* FlowController.LimitExceededBehavior#ThrowException}, a publish call will fail with either
* {@link FlowController.MaxOutstandingRequestBytesReachedException} or {@link
* FlowController.MaxOutstandingElementCountReachedException}, as appropriate, when flow control
* limits are reached.
*/
public FlowControlSettings getFlowControlSettings() {
return flowControlSettings;
}

/**
* Whether to block publish calls when reaching flow control limits (see {@link
* #getFlowControlSettings()}).
*
* <p>If set to false, a publish call will fail with either {@link
* com.google.api.gax.grpc.FlowController.MaxOutstandingRequestBytesReachedException} or {@link
* com.google.api.gax.grpc.FlowController.MaxOutstandingElementCountReachedException}, as
* appropriate, when flow contro limits are reached.
*/
public boolean failOnFlowControlLimits() {
return failOnFlowControlLimits;
}

/**
* Schedules immediate publishing of any outstanding messages and waits until all are processed.
*
Expand Down Expand Up @@ -619,7 +612,6 @@ public long nextLong(long least, long bound) {

// Client-side flow control options
FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();
boolean failOnFlowControlLimits;

RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
LongRandom longRandom = DEFAULT_LONG_RANDOM;
Expand Down Expand Up @@ -665,20 +657,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
return this;
}

/**
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
* com.google.api.gax.grpc.FlowController.MaxOutstandingRequestBytesReachedException} or {@link
* com.google.api.gax.grpc.FlowController.MaxOutstandingElementCountReachedException} as
* appropriate.
*
* <p>If set to false, then publish operations will block the current thread until the
* outstanding requests go under the limits.
*/
public Builder setFailOnFlowControlLimits(boolean fail) {
failOnFlowControlLimits = fail;
return this;
}

/** Configures the Publisher's retry parameters. */
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicSubscriptionsPagedResponse;
import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicsPagedResponse;

import com.google.api.gax.bundling.BundlingSettings;
import com.google.api.gax.bundling.RequestBuilder;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundledRequestIssuer;
import com.google.api.gax.grpc.BundlingCallSettings;
import com.google.api.gax.grpc.BundlingDescriptor;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.api.gax.grpc.CallContext;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ClientSettings;
Expand All @@ -32,7 +36,6 @@
import com.google.api.gax.grpc.PagedCallSettings;
import com.google.api.gax.grpc.PagedListDescriptor;
import com.google.api.gax.grpc.PagedListResponseFactory;
import com.google.api.gax.grpc.RequestIssuer;
import com.google.api.gax.grpc.SimpleCallSettings;
import com.google.api.gax.grpc.UnaryCallSettings;
import com.google.api.gax.grpc.UnaryCallable;
Expand All @@ -58,7 +61,6 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import io.grpc.Status;
import java.io.IOException;
Expand Down Expand Up @@ -363,30 +365,34 @@ public String getBundlePartitionKey(PublishRequest request) {
}

@Override
public PublishRequest mergeRequests(Collection<PublishRequest> requests) {
PublishRequest firstRequest = requests.iterator().next();
public RequestBuilder<PublishRequest> getRequestBuilder() {
return new RequestBuilder<PublishRequest>() {
private PublishRequest.Builder builder;

List<PubsubMessage> elements = new ArrayList<>();
for (PublishRequest request : requests) {
elements.addAll(request.getMessagesList());
}
@Override
public void appendRequest(PublishRequest request) {
if (builder == null) {
builder = request.toBuilder();
} else {
builder.addAllMessages(request.getMessagesList());
}
}

PublishRequest bundleRequest =
PublishRequest.newBuilder()
.setTopic(firstRequest.getTopic())
.addAllMessages(elements)
.build();
return bundleRequest;
@Override
public PublishRequest build() {
return builder.build();
}
};
}

@Override
public void splitResponse(
PublishResponse bundleResponse,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
Collection<? extends BundledRequestIssuer<PublishResponse>> bundle) {
int bundleMessageIndex = 0;
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
for (BundledRequestIssuer<PublishResponse> responder : bundle) {
List<String> subresponseElements = new ArrayList<>();
int subresponseCount = responder.getRequest().getMessagesCount();
long subresponseCount = responder.getMessageCount();
for (int i = 0; i < subresponseCount; i++) {
subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex));
bundleMessageIndex += 1;
Expand All @@ -400,8 +406,8 @@ public void splitResponse(
@Override
public void splitException(
Throwable throwable,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
Collection<? extends BundledRequestIssuer<PublishResponse>> bundle) {
for (BundledRequestIssuer<PublishResponse> responder : bundle) {
responder.setException(throwable);
}
}
Expand Down Expand Up @@ -539,7 +545,11 @@ private static Builder createDefault() {
.getBundlingSettingsBuilder()
.setElementCountThreshold(10)
.setRequestByteThreshold(1024)
.setDelayThreshold(Duration.millis(10));
.setDelayThreshold(Duration.millis(10))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.Ignore)
.build());
builder
.publishSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable;

import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.FlowControlSettings;
import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
Expand Down Expand Up @@ -283,7 +283,7 @@ private SubscriberImpl(Builder builder) throws IOException {
Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock();

flowController = new FlowController(builder.flowControlSettings, false);
flowController = new FlowController(builder.flowControlSettings);

executor = builder.executorProvider.getExecutor();
if (builder.executorProvider.shouldAutoClose()) {
Expand Down
Loading

0 comments on commit e92f570

Please sign in to comment.