From 8a7da6da4e1f8b89d3de4347566af249cab40e16 Mon Sep 17 00:00:00 2001 From: yayi Date: Wed, 24 Feb 2021 14:53:20 -0800 Subject: [PATCH] docs(sample): Update parallel append sample to use StreamWriterV2 --- .../ParallelWriteCommittedStream.java | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java index 71155a38c7..0de3302063 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java @@ -20,21 +20,29 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor; import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1beta2.JsonToProtoMessage; +import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; +import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema; +import com.google.cloud.bigquery.storage.v1beta2.ProtoSchemaConverter; +import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2; import com.google.cloud.bigquery.storage.v1beta2.TableName; import com.google.cloud.bigquery.storage.v1beta2.WriteStream; import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; import java.io.IOException; import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; -import org.json.JSONArray; import org.json.JSONObject; public class ParallelWriteCommittedStream { @@ -151,12 +159,11 @@ private void writeToStream( lastMetricsSuccessCount = 0; lastMetricsFailureCount = 0; } - // Use the JSON stream writer to send records in JSON format. - // For more information about JsonStreamWriter, see: - // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) - .build()) { + Descriptor descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( + writeStream.getTableSchema()); + ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor); + try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) { while (System.currentTimeMillis() < deadlineMillis) { synchronized (this) { if (error != null) { @@ -164,7 +171,8 @@ private void writeToStream( throw error; } } - ApiFuture future = writer.append(createPayload(), -1); + ApiFuture future = + writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1)); synchronized (this) { inflightCount++; } @@ -189,17 +197,26 @@ private void waitForInflightToReachZero(Duration timeout) { throw new RuntimeException("Timeout waiting for inflight count to reach 0"); } - private JSONArray createPayload() { - // Create a JSON object that is compatible with the table schema. - JSONArray jsonArr = new JSONArray(); + private AppendRowsRequest createAppendRequest( + String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); for (int i = 0; i < BATCH_SIZE; i++) { byte[] payload = new byte[ROW_SIZE]; ThreadLocalRandom.current().nextBytes(payload); JSONObject record = new JSONObject(); record.put("col1", new String(payload)); - jsonArr.put(record); + Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); } - return jsonArr; + AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder(); + data.setWriterSchema(protoSchema); + data.setRows(rowsBuilder.build()); + AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build()); + request.setWriteStream(streamName); + if (offset >= 0) { + request.setOffset(Int64Value.of(offset)); + } + return request.build(); } private void sleepIgnoringInterruption(Duration duration) {