diff --git a/README.md b/README.md index a86e794a70..df74ba0bd5 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.12.2' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.13.0' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.12.2" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.13.0" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 8f09471762..c4463715f4 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -78,6 +78,10 @@ private JsonStreamWriter(Builder builder) this.protoSchema = ProtoSchemaConverter.convert(this.descriptor); this.totalMessageSize = protoSchema.getSerializedSize(); streamWriterBuilder.setWriterSchema(protoSchema); + if (builder.flowControlSettings != null) { + streamWriterBuilder.setLimitExceededBehavior( + builder.flowControlSettings.getLimitExceededBehavior()); + } setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, @@ -214,6 +218,10 @@ private void setStreamWriterSettings( streamWriterBuilder.setMaxInflightRequests( flowControlSettings.getMaxOutstandingElementCount()); } + if (flowControlSettings.getLimitExceededBehavior() != null) { + streamWriterBuilder.setLimitExceededBehavior( + flowControlSettings.getLimitExceededBehavior()); + } } } @@ -335,7 +343,6 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { * @return Builder */ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { - Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null."); this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null."); return this; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 25aae02062..ef4bc8cafa 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -17,6 +17,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; @@ -74,6 +75,11 @@ public class StreamWriter implements AutoCloseable { */ private final long maxInflightBytes; + /* + * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block. + */ + private final FlowController.LimitExceededBehavior limitExceededBehavior; + /* * TraceId for debugging purpose. */ @@ -190,6 +196,7 @@ private StreamWriter(Builder builder) throws IOException { this.writerSchema = builder.writerSchema; this.maxInflightRequests = builder.maxInflightRequest; this.maxInflightBytes = builder.maxInflightBytes; + this.limitExceededBehavior = builder.limitExceededBehavior; this.traceId = builder.traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); @@ -332,18 +339,29 @@ private void maybeWaitForInflightQuota() { long start_time = System.currentTimeMillis(); while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) { - try { - inflightReduced.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for inflight quota. Stream: " - + streamName - + " Error: " - + e.toString()); + if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new StatusRuntimeException( + Status.fromCode(Code.RESOURCE_EXHAUSTED) + .withDescription( + "Exceeds client side inflight buffer, consider add more buffer or open more connections.")); + } else if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) { throw new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withCause(e) - .withDescription("Interrupted while waiting for quota.")); + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter.")); + } else { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); + } } } inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); @@ -714,6 +732,9 @@ public static final class Builder { private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private FlowController.LimitExceededBehavior limitExceededBehavior = + FlowController.LimitExceededBehavior.Block; + private String traceId = null; private TableSchema updatedTableSchema = null; @@ -784,6 +805,18 @@ public Builder setTraceId(String traceId) { return this; } + /** + * Sets the limit exceeded behavior. + * + * @param limitExceededBehavior + * @return + */ + public Builder setLimitExceededBehavior( + FlowController.LimitExceededBehavior limitExceededBehavior) { + this.limitExceededBehavior = limitExceededBehavior; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriter build() throws IOException { return new StreamWriter(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 0e11aca77a..2a79b01d90 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -16,9 +16,12 @@ package com.google.cloud.bigquery.storage.v1; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; @@ -31,6 +34,8 @@ import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.Arrays; import java.util.UUID; @@ -42,6 +47,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Instant; @@ -540,4 +546,39 @@ public void testWithIgnoreUnknownFields() throws Exception { appendFuture.get(); } } + + @Test + public void testFlowControlSetting() throws Exception { + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) + .setChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingRequestBytes(1L) + .build()) + .build()) { + JSONObject foo = new JSONObject(); + foo.put("test_int", 10); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + writer.append(jsonArr); + } + }); + assertEquals(ex.getStatus().getCode(), Status.RESOURCE_EXHAUSTED.getCode()); + assertTrue( + ex.getStatus() + .getDescription() + .contains( + "Exceeds client side inflight buffer, consider add more buffer or open more connections")); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index d95c056e71..4962a41a6a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; @@ -543,6 +544,39 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception { writer.close(); } + @Test + public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setMaxInflightBytes(1) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .build(); + // Server will sleep 100ms before every response. + testBigQueryWrite.setResponseSleep(Duration.ofMillis(100)); + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + writer.append(createProtoRows(new String[] {String.valueOf(10)}), -1); + } + }); + assertEquals(ex.getStatus().getCode(), Status.RESOURCE_EXHAUSTED.getCode()); + assertTrue( + ex.getStatus() + .getDescription() + .contains( + "Exceeds client side inflight buffer, consider add more buffer or open more connections")); + + writer.close(); + } + @Test public void testMessageTooLarge() throws Exception { StreamWriter writer = getTestStreamWriter();