diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 630e06c62e..40af7716f1 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -18,17 +18,23 @@ // [START bigquerystorage_jsonstreamwriter_default] import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Phaser; +import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; @@ -45,36 +51,119 @@ public static void runWriteToDefaultStream() public static void writeToDefaultStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { - BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); - Table table = bigquery.getTable(datasetName, tableName); TableName parentTable = TableName.of(projectId, datasetName, tableName); - Schema schema = table.getDefinition().getSchema(); - TableSchema 
tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); - - // Use the JSON stream writer to send records in JSON format. - // For more information about JsonStreamWriter, see: - // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) { - // Write two batches to the stream, each with 10 JSON records. A writer should be used for as - // much writes as possible. Creating a writer for just one write is an antipattern. - for (int i = 0; i < 2; i++) { - // Create a JSON object that is compatible with the table schema. - JSONArray jsonArr = new JSONArray(); - for (int j = 0; j < 10; j++) { - JSONObject record = new JSONObject(); - record.put("test_string", String.format("record %03d-%03d", i, j)); - jsonArr.put(record); + + DataWriter writer = new DataWriter(); + // One time initialization. + writer.initialize(parentTable); + + // Write two batches of fake data to the stream, each with 10 JSON records. Data may be + // batched up to the maximum request size: + // https://cloud.google.com/bigquery/quotas#write-api-limits + for (int i = 0; i < 2; i++) { + // Create a JSON object that is compatible with the table schema. + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + JSONObject record = new JSONObject(); + record.put("test_string", String.format("record %03d-%03d", i, j)); + jsonArr.put(record); + } + + writer.append(jsonArr); + } + + // Final cleanup for the stream. + writer.cleanup(); + System.out.println("Appended records successfully."); + } + + private static class DataWriter { + private JsonStreamWriter streamWriter; + // Track the number of in-flight requests to wait for all responses before shutting down. 
+ private final Phaser inflightRequestCount = new Phaser(1); + + private final Object lock = new Object(); + + @GuardedBy("lock") + private RuntimeException error = null; + + public void initialize(TableName parentTable) + throws DescriptorValidationException, IOException, InterruptedException { + // Retrieve table schema information. + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + Table table = bigquery.getTable(parentTable.getDataset(), parentTable.getTable()); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + + // Use the JSON stream writer to send records in JSON format. Specify the table name to write + // to the default stream. + // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html + streamWriter = JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build(); + } + + public void append(JSONArray data) throws DescriptorValidationException, IOException { + synchronized (this.lock) { + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; + } + } + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this), MoreExecutors.directExecutor()); + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. 
+ synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + + public AppendCompleteCallback(DataWriter parent) { + this.parent = parent; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.format("Append success\n"); + done(); + } + + public void onFailure(Throwable throwable) { + // If the wrapped exception is a StatusRuntimeException, check the state of the operation. + // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, + // see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html + synchronized (this.parent.lock) { + if (this.parent.error == null) { + StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } } - ApiFuture future = writer.append(jsonArr); - AppendRowsResponse response = future.get(); + System.out.format("Error: %s\n", throwable.toString()); + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); } - System.out.println("Appended records successfully."); - } catch (ExecutionException e) { - // If the wrapped exception is a StatusRuntimeException, check the state of the operation. - // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: - // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html - System.out.println("Failed to append records. \n" + e.toString()); } } }