diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 0e8d669764..b507934b41 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -362,7 +362,7 @@ public Builder setCompressorName(String compressorName) { * Enable client lib automatic retries on request level errors. * *
- * Immeidate Retry code: + * Immediate Retry code: * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED * Backoff Retry code: * RESOURCE_EXHAUSTED diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java b/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java index fef48095a2..0ba37ba13d 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java @@ -19,6 +19,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; @@ -37,6 +38,7 @@ import java.io.IOException; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class JsonWriterStreamCdc { @@ -108,6 +110,18 @@ private static void query(String query) { public static void writeToDefaultStream( String projectId, String datasetName, String tableName, JSONArray data) throws DescriptorValidationException, InterruptedException, IOException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); // To use the UPSERT functionality, the table schema needs to be padded with an additional // column "_change_type". TableSchema tableSchema = @@ -159,7 +173,9 @@ public static void writeToDefaultStream( // Use the JSON stream writer to send records in JSON format. TableName parentTable = TableName.of(projectId, datasetName, tableName); try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) { + JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema) + .setRetrySettings(retrySettings) + .build()) { ApiFuturefuture = writer.append(data); // The append method is asynchronous. Rather than waiting for the method to complete, diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java index d7949e38cd..3680e43307 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; @@ -36,12 +37,12 @@ import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Message; import java.io.IOException; -import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.json.JSONObject; +import org.threeten.bp.Duration; public class ParallelWriteCommittedStream { @@ -157,6 +158,18 @@ private void writeToStream( lastMetricsSuccessCount = 0; lastMetricsFailureCount = 0; } + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); Descriptor descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( writeStream.getTableSchema()); @@ -164,6 +177,7 @@ private void writeToStream( try (StreamWriter writer = StreamWriter.newBuilder(writeStream.getName()) .setWriterSchema(protoSchema) + .setRetrySettings(retrySettings) .setTraceId("SAMPLE:parallel_append") .build()) { while (System.currentTimeMillis() < deadlineMillis) { diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java index 26bb0d5510..718b5373d6 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteBufferedStream.java @@ -18,6 +18,7 @@ // [START bigquerystorage_jsonstreamwriter_buffered] import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; @@ -33,6 +34,7 @@ import java.util.concurrent.ExecutionException; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class WriteBufferedStream { @@ -61,11 +63,25 @@ public static void writeBufferedStream(String projectId, String datasetName, Str .build(); WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + // Use the JSON stream writer to send records in JSON format. // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) .build()) { // Write two batches to the stream, each with 10 JSON records. for (int i = 0; i < 2; i++) { diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index c8c9334374..b2dfe21b00 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; @@ -37,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class WriteCommittedStream { @@ -113,11 +115,25 @@ void initialize(TableName parentTable, BigQueryWriteClient client) .build(); WriteStream writeStream = client.createWriteStream(createWriteStreamRequest); + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + // Use the JSON stream writer to send records in JSON format. // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) + .setRetrySettings(retrySettings) .build(); } diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java index 76f59a0e17..e8de163ed5 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WritePendingStream.java @@ -17,10 +17,10 @@ package com.example.bigquerystorage; // [START bigquerystorage_jsonstreamwriter_pending] - import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; @@ -41,6 +41,7 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class WritePendingStream { @@ -129,6 +130,19 @@ void initialize(TableName parentTable, BigQueryWriteClient client) // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder() .setParent(parentTable.toString()) @@ -140,7 +154,9 @@ void initialize(TableName parentTable, BigQueryWriteClient client) // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html streamWriter = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build(); + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build(); } public void append(JSONArray data, long offset) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index feccef61f0..6c35f06018 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -17,11 +17,11 @@ package com.example.bigquerystorage; // [START bigquerystorage_jsonstreamwriter_default] - import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -35,12 +35,9 @@ import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.DescriptorValidationException; -import io.grpc.Status; -import io.grpc.Status.Code; import java.io.IOException; import java.util.Map; import java.util.concurrent.Executors; @@ -49,6 +46,7 @@ import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; +import org.threeten.bp.Duration; public class WriteToDefaultStream { @@ -97,7 +95,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St jsonArr.put(record); } - writer.append(new AppendContext(jsonArr, 0)); + writer.append(new AppendContext(jsonArr)); } // Final cleanup for the stream during worker teardown. @@ -130,26 +128,15 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo private static class AppendContext { JSONArray data; - int retryCount = 0; - AppendContext(JSONArray data, int retryCount) { + AppendContext(JSONArray data) { this.data = data; - this.retryCount = retryCount; } } private static class DataWriter { - private static final int MAX_RETRY_COUNT = 3; private static final int MAX_RECREATE_COUNT = 3; - private static final ImmutableList RETRIABLE_ERROR_CODES = - ImmutableList.of( - Code.INTERNAL, - Code.ABORTED, - Code.CANCELLED, - Code.FAILED_PRECONDITION, - Code.DEADLINE_EXCEEDED, - Code.UNAVAILABLE); // Track the number of in-flight requests to wait for all responses before shutting down. private final Phaser inflightRequestCount = new Phaser(1); @@ -163,6 +150,19 @@ private static class DataWriter { public void initialize(TableName parentTable) throws DescriptorValidationException, IOException, InterruptedException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + // Use the JSON stream writer to send records in JSON format. Specify the table name to write // to the default stream. // For more information about JsonStreamWriter, see: @@ -183,6 +183,7 @@ public void initialize(TableName parentTable) // column, apply the default value to the missing value field. .setDefaultMissingValueInterpretation( AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setRetrySettings(retrySettings) .build(); } @@ -244,26 +245,6 @@ public void onSuccess(AppendRowsResponse response) { } public void onFailure(Throwable throwable) { - // If the wrapped exception is a StatusRuntimeException, check the state of the operation. - // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, - // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html - Status status = Status.fromThrowable(throwable); - if (appendContext.retryCount < MAX_RETRY_COUNT - && RETRIABLE_ERROR_CODES.contains(status.getCode())) { - appendContext.retryCount++; - try { - // Since default stream appends are not ordered, we can simply retry the appends. - // Retrying with exclusive streams requires more careful consideration. - this.parent.append(appendContext); - // Mark the existing attempt as done since it's being retried. - done(); - return; - } catch (Exception e) { - // Fall through to return error. - System.out.format("Failed to retry append: %s\n", e); - } - } - if (throwable instanceof AppendSerializationError) { AppendSerializationError ase = (AppendSerializationError) throwable; Map
rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); @@ -282,7 +263,7 @@ public void onFailure(Throwable throwable) { // avoid potentially blocking while we are in a callback. if (dataNew.length() > 0) { try { - this.parent.append(new AppendContext(dataNew, 0)); + this.parent.append(new AppendContext(dataNew)); } catch (DescriptorValidationException e) { throw new RuntimeException(e); } catch (IOException e) {