diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java index adf9ee22181f..08a31bafc375 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/spi/v2/LoggingServiceV2Settings.java @@ -20,11 +20,15 @@ import static com.google.cloud.logging.spi.v2.PagedResponseWrappers.ListMonitoredResourceDescriptorsPagedResponse; import com.google.api.MonitoredResourceDescriptor; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.bundling.RequestBuilder; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.grpc.BundledRequestIssuer; import com.google.api.gax.grpc.BundlingCallSettings; import com.google.api.gax.grpc.BundlingDescriptor; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.CallContext; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ClientSettings; @@ -34,7 +38,6 @@ import com.google.api.gax.grpc.PagedCallSettings; import com.google.api.gax.grpc.PagedListDescriptor; import com.google.api.gax.grpc.PagedListResponseFactory; -import com.google.api.gax.grpc.RequestIssuer; import com.google.api.gax.grpc.SimpleCallSettings; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.api.gax.grpc.UnaryCallable; @@ -58,9 +61,7 @@ import com.google.protobuf.ExperimentalApi; import io.grpc.Status; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import javax.annotation.Generated; import org.joda.time.Duration; @@ -398,33 +399,32 @@ public String getBundlePartitionKey(WriteLogEntriesRequest request) { } @Override - public WriteLogEntriesRequest mergeRequests( - Collection requests) { - WriteLogEntriesRequest firstRequest = requests.iterator().next(); - - List elements = new ArrayList<>(); - for (WriteLogEntriesRequest request : requests) { - elements.addAll(request.getEntriesList()); - } - - WriteLogEntriesRequest bundleRequest = - WriteLogEntriesRequest.newBuilder() - .setLogName(firstRequest.getLogName()) - .setResource(firstRequest.getResource()) - .putAllLabels(firstRequest.getLabels()) - .addAllEntries(elements) - .build(); - return bundleRequest; + public RequestBuilder getRequestBuilder() { + return new RequestBuilder() { + private WriteLogEntriesRequest.Builder builder; + + @Override + public void appendRequest(WriteLogEntriesRequest request) { + if (builder == null) { + builder = request.toBuilder(); + } else { + builder.addAllEntries(request.getEntriesList()); + } + } + + @Override + public WriteLogEntriesRequest build() { + return builder.build(); + } + }; } @Override public void splitResponse( WriteLogEntriesResponse bundleResponse, - Collection> - bundle) { + Collection> bundle) { int bundleMessageIndex = 0; - for (RequestIssuer responder : - bundle) { + for (BundledRequestIssuer responder : bundle) { WriteLogEntriesResponse response = WriteLogEntriesResponse.newBuilder().build(); responder.setResponse(response); } @@ -433,10 +433,8 @@ public void splitResponse( @Override public void splitException( Throwable throwable, - Collection> - bundle) { - for (RequestIssuer responder : - bundle) { + Collection> bundle) { + for (BundledRequestIssuer responder : bundle) { responder.setException(throwable); } } @@ -556,9 +554,15 @@ private static Builder createDefault() { builder .writeLogEntriesSettings() .getBundlingSettingsBuilder() - .setElementCountThreshold(100) - .setRequestByteThreshold(1024) - .setDelayThreshold(Duration.millis(10)); + .setElementCountThreshold(1000) + .setRequestByteThreshold(1048576) + .setDelayThreshold(Duration.millis(50)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(100000) + .setMaxOutstandingRequestBytes(10485760) + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); builder .writeLogEntriesSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index c43f03bc1c6b..ce0bd5c8cc49 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,7 +16,7 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.grpc.FlowController; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.cloud.Clock; import com.google.common.annotations.VisibleForTesting; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 2c3b6d7fa84c..e2a8073f431f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -18,7 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable; -import com.google.api.gax.grpc.FlowController; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.cloud.Clock; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index ebb0329d9b12..9ba3bfe928ca 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -16,16 +16,16 @@ package com.google.cloud.pubsub.spi.v1; +import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.ApiFuture; import com.google.api.gax.core.ApiFutureCallback; import com.google.api.gax.core.ApiFutures; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController; import com.google.api.gax.core.Function; import com.google.api.gax.core.RetrySettings; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; -import com.google.api.gax.grpc.FlowControlSettings; -import com.google.api.gax.grpc.FlowController; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.annotations.VisibleForTesting; @@ -88,7 +88,6 @@ public class Publisher { private final LongRandom longRandom; private final FlowControlSettings flowControlSettings; - private final boolean failOnFlowControlLimits; private final Lock messagesBundleLock; private List messagesBundle; @@ -125,8 +124,7 @@ private Publisher(Builder builder) throws IOException { this.longRandom = builder.longRandom; flowControlSettings = builder.flowControlSettings; - failOnFlowControlLimits = builder.failOnFlowControlLimits; - this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); + this.flowController = new FlowController(flowControlSettings); messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); @@ -173,12 +171,14 @@ public TopicName getTopicName() { * Schedules the publishing of a message. The publishing of the message may occur immediately or * be delayed based on the publisher bundling options. * - *

Depending on chosen flow control {@link #failOnFlowControlLimits option}, the returned - * future might immediately fail with a {@link com.google.api.gax.grpc.FlowController.FlowControlException} - * or block the current thread until there are more resources available to publish. + *

Depending on chosen flow control {@link FlowControlSettings#getLimitExceededBehavior + * option}, the returned future might immediately fail with a {@link + * FlowController.FlowControlException} or block the current thread until there are more resources + * available to publish. * *

Example of publishing a message. - *

 {@code
+   *
+   * 
{@code
    * String message = "my_message";
    * ByteString data = ByteString.copyFromUtf8(message);
    * PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
@@ -473,26 +473,19 @@ private long getMaxBundleBytes() {
   }
 
   /**
-   * The bundling settings configured on this {@code Publisher}. See {@link
-   * #failOnFlowControlLimits()}.
+   * The bundling settings configured on this {@code Publisher}, including whether to block publish
+   * calls when reaching flow control limits.
+   *
+   * 

If {@link FlowControlSettings#getLimitExceededBehavior()} is set to {@link + * FlowController.LimitExceededBehavior#ThrowException}, a publish call will fail with either + * {@link FlowController.MaxOutstandingRequestBytesReachedException} or {@link + * FlowController.MaxOutstandingElementCountReachedException}, as appropriate, when flow control + * limits are reached. */ public FlowControlSettings getFlowControlSettings() { return flowControlSettings; } - /** - * Whether to block publish calls when reaching flow control limits (see {@link - * #getFlowControlSettings()}). - * - *

If set to false, a publish call will fail with either {@link - * com.google.api.gax.grpc.FlowController.MaxOutstandingRequestBytesReachedException} or {@link - * com.google.api.gax.grpc.FlowController.MaxOutstandingElementCountReachedException}, as - * appropriate, when flow contro limits are reached. - */ - public boolean failOnFlowControlLimits() { - return failOnFlowControlLimits; - } - /** * Schedules immediate publishing of any outstanding messages and waits until all are processed. * @@ -619,7 +612,6 @@ public long nextLong(long least, long bound) { // Client-side flow control options FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); - boolean failOnFlowControlLimits; RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; LongRandom longRandom = DEFAULT_LONG_RANDOM; @@ -665,20 +657,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { return this; } - /** - * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * com.google.api.gax.grpc.FlowController.MaxOutstandingRequestBytesReachedException} or {@link - * com.google.api.gax.grpc.FlowController.MaxOutstandingElementCountReachedException} as - * appropriate. - * - *

If set to false, then publish operations will block the current thread until the - * outstanding requests go under the limits. - */ - public Builder setFailOnFlowControlLimits(boolean fail) { - failOnFlowControlLimits = fail; - return this; - } - /** Configures the Publisher's retry parameters. */ public Builder setRetrySettings(RetrySettings retrySettings) { Preconditions.checkArgument( diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java index 535487c4a7f3..82813efbfef5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherSettings.java @@ -18,11 +18,15 @@ import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicSubscriptionsPagedResponse; import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListTopicsPagedResponse; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.bundling.RequestBuilder; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.GoogleCredentialsProvider; import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.grpc.BundledRequestIssuer; import com.google.api.gax.grpc.BundlingCallSettings; import com.google.api.gax.grpc.BundlingDescriptor; -import com.google.api.gax.grpc.BundlingSettings; import com.google.api.gax.grpc.CallContext; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ClientSettings; @@ -32,7 +36,6 @@ import com.google.api.gax.grpc.PagedCallSettings; import com.google.api.gax.grpc.PagedListDescriptor; import com.google.api.gax.grpc.PagedListResponseFactory; -import com.google.api.gax.grpc.RequestIssuer; import com.google.api.gax.grpc.SimpleCallSettings; import com.google.api.gax.grpc.UnaryCallSettings; import com.google.api.gax.grpc.UnaryCallable; @@ -58,7 +61,6 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.Topic; import io.grpc.Status; import java.io.IOException; @@ -363,30 +365,34 @@ public String getBundlePartitionKey(PublishRequest request) { } @Override - public PublishRequest mergeRequests(Collection requests) { - PublishRequest firstRequest = requests.iterator().next(); + public RequestBuilder getRequestBuilder() { + return new RequestBuilder() { + private PublishRequest.Builder builder; - List elements = new ArrayList<>(); - for (PublishRequest request : requests) { - elements.addAll(request.getMessagesList()); - } + @Override + public void appendRequest(PublishRequest request) { + if (builder == null) { + builder = request.toBuilder(); + } else { + builder.addAllMessages(request.getMessagesList()); + } + } - PublishRequest bundleRequest = - PublishRequest.newBuilder() - .setTopic(firstRequest.getTopic()) - .addAllMessages(elements) - .build(); - return bundleRequest; + @Override + public PublishRequest build() { + return builder.build(); + } + }; } @Override public void splitResponse( PublishResponse bundleResponse, - Collection> bundle) { + Collection> bundle) { int bundleMessageIndex = 0; - for (RequestIssuer responder : bundle) { + for (BundledRequestIssuer responder : bundle) { List subresponseElements = new ArrayList<>(); - int subresponseCount = responder.getRequest().getMessagesCount(); + long subresponseCount = responder.getMessageCount(); for (int i = 0; i < subresponseCount; i++) { subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex)); bundleMessageIndex += 1; @@ -400,8 +406,8 @@ public void splitResponse( @Override public void splitException( Throwable throwable, - Collection> bundle) { - for (RequestIssuer responder : bundle) { + Collection> bundle) { + for (BundledRequestIssuer responder : bundle) { responder.setException(throwable); } } @@ -539,7 +545,11 @@ private static Builder createDefault() { .getBundlingSettingsBuilder() .setElementCountThreshold(10) .setRequestByteThreshold(1024) - .setDelayThreshold(Duration.millis(10)); + .setDelayThreshold(Duration.millis(10)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .build()); builder .publishSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery")) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 8f2d121b3e6e..7c3a6a8b7c4b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -18,7 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.StatusUtil.isRetryable; -import com.google.api.gax.grpc.FlowController; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.cloud.Clock; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 902226db408f..8133e4829a68 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -16,9 +16,9 @@ package com.google.cloud.pubsub.spi.v1; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController; import com.google.api.gax.grpc.ExecutorProvider; -import com.google.api.gax.grpc.FlowControlSettings; -import com.google.api.gax.grpc.FlowController; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; import com.google.auth.Credentials; @@ -283,7 +283,7 @@ private SubscriberImpl(Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock(); - flowController = new FlowController(builder.flowControlSettings, false); + flowController = new FlowController(builder.flowControlSettings); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index c4c919fc04dd..8a96e8823928 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -18,17 +18,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; -import com.google.api.gax.grpc.BundlingSettings; +import com.google.api.gax.bundling.BundlingSettings; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.core.ApiFuture; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedExecutorProvider; -import com.google.api.gax.grpc.FlowControlSettings; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.cloud.pubsub.spi.v1.Publisher.Builder; import com.google.protobuf.ByteString; @@ -372,7 +372,6 @@ public void testPublisherGetters() throws Exception { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); builder.setChannelProvider(TEST_CHANNEL_PROVIDER); builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); - builder.setFailOnFlowControlLimits(true); builder.setBundlingSettings( BundlingSettings.newBuilder() .setRequestByteThreshold(10) @@ -383,6 +382,7 @@ public void testPublisherGetters() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingRequestBytes(13) .setMaxOutstandingElementCount(14) + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) .build()); Publisher publisher = builder.build(); @@ -392,7 +392,9 @@ public void testPublisherGetters() throws Exception { assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold()); assertEquals(13, (long) publisher.getFlowControlSettings().getMaxOutstandingRequestBytes()); assertEquals(14, (long) publisher.getFlowControlSettings().getMaxOutstandingElementCount()); - assertTrue(publisher.failOnFlowControlLimits()); + assertEquals( + LimitExceededBehavior.ThrowException, + publisher.getFlowControlSettings().getLimitExceededBehavior()); publisher.shutdown(); } @@ -401,7 +403,8 @@ public void testBuilderParametersAndDefaults() { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); assertEquals(TEST_TOPIC, builder.topicName); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); - assertFalse(builder.failOnFlowControlLimits); + assertEquals( + LimitExceededBehavior.Block, builder.flowControlSettings.getLimitExceededBehavior()); assertEquals( Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD, builder.bundlingSettings.getRequestByteThreshold().longValue()); diff --git a/pom.xml b/pom.xml index ee844891e030..207164f29494 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.2.0 + 0.3.0 0.1.5 0.9.5-alpha-SNAPSHOT 0.9.5-beta-SNAPSHOT