Skip to content

Commit

Permalink
docs(sample): Update parallel append sample to use StreamWriterV2
Browse files Browse the repository at this point in the history
  • Loading branch information
yayi-google committed Feb 24, 2021
1 parent e41cfba commit 8a7da6d
Showing 1 changed file with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,29 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.JsonToProtoMessage;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;

public class ParallelWriteCommittedStream {
Expand Down Expand Up @@ -151,20 +159,20 @@ private void writeToStream(
lastMetricsSuccessCount = 0;
lastMetricsFailureCount = 0;
}
// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.build()) {
Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
writeStream.getTableSchema());
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) {
while (System.currentTimeMillis() < deadlineMillis) {
synchronized (this) {
if (error != null) {
// Stop writing once we get an error.
throw error;
}
}
ApiFuture<AppendRowsResponse> future = writer.append(createPayload(), -1);
ApiFuture<AppendRowsResponse> future =
writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1));
synchronized (this) {
inflightCount++;
}
Expand All @@ -189,17 +197,26 @@ private void waitForInflightToReachZero(Duration timeout) {
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
}

private JSONArray createPayload() {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
private AppendRowsRequest createAppendRequest(
String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
for (int i = 0; i < BATCH_SIZE; i++) {
byte[] payload = new byte[ROW_SIZE];
ThreadLocalRandom.current().nextBytes(payload);
JSONObject record = new JSONObject();
record.put("col1", new String(payload));
jsonArr.put(record);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
return jsonArr;
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
data.setWriterSchema(protoSchema);
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
request.setWriteStream(streamName);
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
return request.build();
}

private void sleepIgnoringInterruption(Duration duration) {
Expand Down

0 comments on commit 8a7da6d

Please sign in to comment.