Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(sample): Update parallel append sample to use StreamWriterV2 #883

Merged
merged 6 commits into from
Feb 25, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,29 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.JsonToProtoMessage;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema;
import com.google.cloud.bigquery.storage.v1beta2.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;

public class ParallelWriteCommittedStream {
Expand Down Expand Up @@ -151,20 +159,20 @@ private void writeToStream(
lastMetricsSuccessCount = 0;
lastMetricsFailureCount = 0;
}
// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.build()) {
Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
writeStream.getTableSchema());
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) {
while (System.currentTimeMillis() < deadlineMillis) {
synchronized (this) {
if (error != null) {
// Stop writing once we get an error.
throw error;
}
}
ApiFuture<AppendRowsResponse> future = writer.append(createPayload(), -1);
ApiFuture<AppendRowsResponse> future =
writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1));
synchronized (this) {
inflightCount++;
}
Expand All @@ -189,17 +197,26 @@ private void waitForInflightToReachZero(Duration timeout) {
throw new RuntimeException("Timeout waiting for inflight count to reach 0");
}

private JSONArray createPayload() {
// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
private AppendRowsRequest createAppendRequest(
String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
for (int i = 0; i < BATCH_SIZE; i++) {
byte[] payload = new byte[ROW_SIZE];
ThreadLocalRandom.current().nextBytes(payload);
JSONObject record = new JSONObject();
record.put("col1", new String(payload));
jsonArr.put(record);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
return jsonArr;
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
data.setWriterSchema(protoSchema);
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
request.setWriteStream(streamName);
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
return request.build();
}

private void sleepIgnoringInterruption(Duration duration) {
Expand Down