diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java
index 81fa5fd419..7969ad5f23 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java
@@ -31,7 +31,8 @@
public class ProtoSchemaConverter {
private static class StructName {
public String getName() {
- return "__S" + (count++);
+ count++;
+ return count == 1 ? "__ROOT__" : "__S" + count;
}
private int count = 0;
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
new file mode 100644
index 0000000000..921fb2be83
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
@@ -0,0 +1,881 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.BackgroundResource;
+import com.google.api.gax.core.BackgroundResourceAggregation;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.ExecutorAsBackgroundResource;
+import com.google.api.gax.core.ExecutorProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.*;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse;
+import com.google.common.base.Preconditions;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.threeten.bp.Duration;
+
+/**
+ * A BigQuery Stream Writer that can be used to write data into a BigQuery table.
+ *
+ * <p>A {@link StreamWriter} provides built-in capabilities to batch messages, control memory
+ * utilization (through flow control), and automatically re-establish connections and clean up
+ * requests (only the first request in the stream keeps the write schema).
+ *
+ * <p>With customizable options that control:
+ *
+ * <ul>
+ *   <li>Message batching: such as number of messages or max batch byte size, and batching deadline
+ *   <li>Inflight message control: such as number of messages or max batch byte size
+ * </ul>
+ *
+ * {@link StreamWriter} will use the credentials set on the channel, which uses application
+ * default credentials through {@link GoogleCredentials#getApplicationDefault} by default.
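+ *
+ * <p>For illustration, a minimal construction sketch (the stream name and thresholds below are
+ * examples, not defaults):
+ *
+ * <pre>{@code
+ * BatchingSettings batchingSettings = BatchingSettings.newBuilder()
+ *     .setElementCountThreshold(100L)
+ *     .setRequestByteThreshold(100 * 1024L)
+ *     .setDelayThreshold(Duration.ofMillis(10))
+ *     .build();
+ * try (StreamWriter writer = StreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/s")
+ *     .setBatchingSettings(batchingSettings)
+ *     .build()) {
+ *   // Append messages; each returned future resolves when its batch is written.
+ * }
+ * }</pre>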
+ */
+public class StreamWriter implements AutoCloseable {
+ private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName());
+
+ private final String streamName;
+
+ private final BatchingSettings batchingSettings;
+ private final RetrySettings retrySettings;
+ private final BigQueryWriteSettings stubSettings;
+
+ private final Lock messagesBatchLock;
+ private final MessagesBatch messagesBatch;
+
+ private BackgroundResource backgroundResources;
+ private List<BackgroundResource> backgroundResourceList;
+
+ private BigQueryWriteClient stub;
+ BidiStreamingCallable<AppendRowsRequest, AppendRowsResponse> bidiStreamingCallable;
+ ClientStream<AppendRowsRequest> clientStream;
+ private final AppendResponseObserver responseObserver;
+
+ private final ScheduledExecutorService executor;
+
+ private final AtomicBoolean shutdown;
+ private final Waiter messagesWaiter;
+ private final AtomicBoolean activeAlarm;
+ private ScheduledFuture<?> currentAlarmFuture;
+
+ private Integer currentRetries = 0;
+
+ /** The maximum size of one request. Defined by the API. */
+ public static long getApiMaxRequestBytes() {
+ return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
+ }
+
+ /** The maximum number of in-flight requests. Defined by the API. */
+ public static long getApiMaxInflightRequests() {
+ return 5000L;
+ }
+
+ private StreamWriter(Builder builder) throws IOException {
+ streamName = builder.streamName;
+
+ this.batchingSettings = builder.batchingSettings;
+ this.retrySettings = builder.retrySettings;
+ this.messagesBatch = new MessagesBatch(batchingSettings);
+ messagesBatchLock = new ReentrantLock();
+ activeAlarm = new AtomicBoolean(false);
+ executor = builder.executorProvider.getExecutor();
+ backgroundResourceList = new ArrayList<>();
+ if (builder.executorProvider.shouldAutoClose()) {
+ backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
+ }
+ messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings());
+ responseObserver = new AppendResponseObserver(this);
+
+ stubSettings =
+ BigQueryWriteSettings.newBuilder()
+ .setCredentialsProvider(builder.credentialsProvider)
+ .setExecutorProvider(builder.executorProvider)
+ .setTransportChannelProvider(builder.channelProvider)
+ .setEndpoint(builder.endpoint)
+ .build();
+ shutdown = new AtomicBoolean(false);
+ refreshAppend();
+ }
+
+ /** Stream name we are writing to. */
+ public String getStreamNameString() {
+ return streamName;
+ }
+
+ /**
+ * Schedules the writing of a message. The write of the message may occur immediately or be
+ * delayed based on the writer batching options.
+ *
+ * <p>Example of writing a message.
+ *
+ * <pre>{@code
+ * AppendRowsRequest message;
+ * ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
+ * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
+ * public void onSuccess(AppendRowsResponse response) {
+ * if (response.hasOffset()) {
+ * System.out.println("written with offset: " + response.getOffset());
+ * } else {
+ * System.out.println("received an in stream error: " + response.error().toString());
+ * }
+ * }
+ *
+ * public void onFailure(Throwable t) {
+ * System.out.println("failed to write: " + t);
+ * }
+ * }, MoreExecutors.directExecutor());
+ * }</pre>
+ *
+ * @param message the message in serialized format to write to BigQuery.
+ * @return the append response wrapped in a future.
+ */
+ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
+ Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
+
+ final AppendRequestAndFutureResponse outstandingAppend =
+ new AppendRequestAndFutureResponse(message);
+ List<InflightBatch> batchesToSend;
+ messagesBatchLock.lock();
+ try {
+ batchesToSend = messagesBatch.add(outstandingAppend);
+ // Setup the next duration based delivery alarm if there are messages batched.
+ setupAlarm();
+ if (!batchesToSend.isEmpty()) {
+ for (final InflightBatch batch : batchesToSend) {
+ LOG.fine("Scheduling a batch for immediate sending.");
+ writeBatch(batch);
+ }
+ }
+ } finally {
+ messagesBatchLock.unlock();
+ }
+
+ return outstandingAppend.appendResult;
+ }
+
+ /**
+ * Re-establishes a stream connection.
+ *
+ * @throws IOException if a new {@code BigQueryWriteClient} cannot be created
+ */
+ private void refreshAppend() throws IOException {
+ synchronized (this) {
+ Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
+ if (stub != null) {
+ stub.shutdown();
+ }
+ backgroundResourceList.remove(stub);
+ stub = BigQueryWriteClient.create(stubSettings);
+ backgroundResourceList.add(stub);
+ backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
+ messagesBatch.resetAttachSchema();
+ bidiStreamingCallable = stub.appendRowsCallable();
+ clientStream = bidiStreamingCallable.splitCall(responseObserver);
+ }
+ try {
+ while (!clientStream.isSendReady()) {
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException expected) {
+ }
+ }
+
+ private void setupAlarm() {
+ if (!messagesBatch.isEmpty()) {
+ if (!activeAlarm.getAndSet(true)) {
+ long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
+ LOG.log(Level.INFO, "Setting up alarm for the next {0} ms.", delayThresholdMs);
+ currentAlarmFuture =
+ executor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ LOG.fine("Sending messages based on schedule");
+ activeAlarm.getAndSet(false);
+ messagesBatchLock.lock();
+ try {
+ writeBatch(messagesBatch.popBatch());
+ } finally {
+ messagesBatchLock.unlock();
+ }
+ }
+ },
+ delayThresholdMs,
+ TimeUnit.MILLISECONDS);
+ }
+ } else if (currentAlarmFuture != null) {
+ LOG.log(Level.FINER, "Cancelling alarm, no more messages");
+ if (activeAlarm.getAndSet(false)) {
+ currentAlarmFuture.cancel(false);
+ }
+ }
+ }
+
+ /**
+ * Write any outstanding batches if non-empty. This method sends buffered messages, but does not
+ * wait for the send operations to complete. To wait for messages to send, call {@code get} on the
+ * futures returned from {@code append}.
+ */
+ public void writeAllOutstanding() {
+ messagesBatchLock.lock();
+ try {
+ if (!messagesBatch.isEmpty()) {
+ writeBatch(messagesBatch.popBatch());
+ }
+ messagesBatch.reset();
+ } finally {
+ messagesBatchLock.unlock();
+ }
+ }
+
+ private void writeBatch(final InflightBatch inflightBatch) {
+ if (inflightBatch != null) {
+ AppendRowsRequest request = inflightBatch.getMergedRequest();
+ messagesWaiter.waitOnElementCount();
+ messagesWaiter.waitOnSizeLimit(inflightBatch.getByteSize());
+ responseObserver.addInflightBatch(inflightBatch);
+ clientStream.send(request);
+
+ synchronized (messagesWaiter) {
+ messagesWaiter.incrementPendingCount(1);
+ messagesWaiter.incrementPendingSize(inflightBatch.getByteSize());
+ }
+ }
+ }
+
+ /** Close the stream writer. Shut down all resources. */
+ @Override
+ public void close() {
+ shutdown();
+ // There is a problem waiting for the resources to shut down, so this statement is commented
+ // out because it would cause a one-minute hang.
+ // awaitTermination(1, TimeUnit.MINUTES);
+ }
+
+ // The batch of messages that is being sent/processed.
+ private static final class InflightBatch {
+ // List of requests that are going to be batched.
+ final List<AppendRequestAndFutureResponse> inflightRequests;
+ // A list that tracks the expected offset for each AppendRequest. Used to reconstruct the
+ // response future.
+ final ArrayList<Long> offsetList;
+ final long creationTime;
+ int attempt;
+ int batchSizeBytes;
+ long expectedOffset;
+ Boolean attachSchema;
+
+ InflightBatch(
+ List<AppendRequestAndFutureResponse> inflightRequests,
+ int batchSizeBytes,
+ Boolean attachSchema) {
+ this.inflightRequests = inflightRequests;
+ this.offsetList = new ArrayList<Long>(inflightRequests.size());
+ for (AppendRequestAndFutureResponse request : inflightRequests) {
+ if (request.message.getOffset().getValue() > 0) {
+ offsetList.add(Long.valueOf(request.message.getOffset().getValue()));
+ } else {
+ offsetList.add(Long.valueOf(-1));
+ }
+ }
+ this.expectedOffset = offsetList.get(0).longValue();
+ attempt = 1;
+ creationTime = System.currentTimeMillis();
+ this.batchSizeBytes = batchSizeBytes;
+ this.attachSchema = attachSchema;
+ }
+
+ int count() {
+ return inflightRequests.size();
+ }
+
+ int getByteSize() {
+ return this.batchSizeBytes;
+ }
+
+ long getExpectedOffset() {
+ return expectedOffset;
+ }
+
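+ // Merges the proto rows of every batched request into the first request, so that one
+ // AppendRowsRequest is sent per batch; the writer schema is kept only when this batch is the
+ // first one sent on the connection.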
+ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
+ if (inflightRequests.size() == 0) {
+ throw new IllegalStateException("Unexpected empty message batch");
+ }
+ ProtoBufProto.ProtoRows.Builder rowsBuilder =
+ inflightRequests.get(0).message.getProtoRows().getRows().toBuilder();
+ for (int i = 1; i < inflightRequests.size(); i++) {
+ rowsBuilder.addAllSerializedRows(
+ inflightRequests.get(i).message.getProtoRows().getRows().getSerializedRowsList());
+ }
+ AppendRowsRequest.ProtoData.Builder data =
+ inflightRequests.get(0).message.getProtoRows().toBuilder().setRows(rowsBuilder.build());
+ if (!attachSchema) {
+ data.clearWriterSchema();
+ } else {
+ if (!data.hasWriterSchema()) {
+ throw new IllegalStateException(
+ "The first message on the connection must have writer schema set");
+ }
+ }
+ return inflightRequests.get(0).message.toBuilder().setProtoRows(data.build()).build();
+ }
+
+ private void onFailure(Throwable t) {
+ for (AppendRequestAndFutureResponse request : inflightRequests) {
+ request.appendResult.setException(t);
+ }
+ }
+
+ // Disassembles the batched response and sets the future on each individual request.
+ private void onSuccess(AppendRowsResponse response) {
+ for (int i = 0; i < inflightRequests.size(); i++) {
+ AppendRowsResponse.Builder singleResponse = response.toBuilder();
+ if (offsetList.get(i) > 0) {
+ singleResponse.setOffset(offsetList.get(i));
+ } else {
+ long actualOffset = response.getOffset();
+ for (int j = 0; j < i; j++) {
+ actualOffset +=
+ inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
+ }
+ singleResponse.setOffset(actualOffset);
+ }
+ inflightRequests.get(i).appendResult.set(singleResponse.build());
+ }
+ }
+ }
+
+ // Class that wraps AppendRowsRequest and its corresponding response future.
+ private static final class AppendRequestAndFutureResponse {
+ final SettableApiFuture<AppendRowsResponse> appendResult;
+ final AppendRowsRequest message;
+ final int messageSize;
+
+ AppendRequestAndFutureResponse(AppendRowsRequest message) {
+ this.appendResult = SettableApiFuture.create();
+ this.message = message;
+ this.messageSize = message.getProtoRows().getSerializedSize();
+ if (this.messageSize > getApiMaxRequestBytes()) {
+ throw new StatusRuntimeException(
+ Status.fromCode(Status.Code.FAILED_PRECONDITION)
+ .withDescription("Message exceeded max size limit: " + getApiMaxRequestBytes()));
+ }
+ }
+ }
+
+ /** The batching settings configured on this {@code StreamWriter}. */
+ public BatchingSettings getBatchingSettings() {
+ return batchingSettings;
+ }
+
+ /** The retry settings configured on this {@code StreamWriter}. */
+ public RetrySettings getRetrySettings() {
+ return retrySettings;
+ }
+
+ /**
+ * Schedules immediate flush of any outstanding messages and waits until all are processed.
+ *
+ * <p>Sends remaining outstanding messages and prevents future calls to append. This method
+ * should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no
+ * pending messages are lost.
+ */
+ public void shutdown() {
+ Preconditions.checkState(
+ !shutdown.getAndSet(true), "Cannot shut down a writer already shut-down.");
+ if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
+ currentAlarmFuture.cancel(false);
+ }
+ writeAllOutstanding();
+ messagesWaiter.waitComplete();
+ backgroundResources.shutdown();
+ }
+
+ /**
+ * Waits until all work has completed execution after a {@link #shutdown()} request, the
+ * timeout occurs, or the current thread is interrupted, whichever happens first.
+ *
+ * <p>Call this method to make sure all resources are freed properly.
+ */
+ public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
+ return backgroundResources.awaitTermination(duration, unit);
+ }
+
+ /**
+ * Constructs a new {@link Builder} using the given stream.
+ *
+ *
+ * <p>Example of creating a {@code StreamWriter}.
+ *
+ * <pre>{@code
+ * String table = "projects/my_project/datasets/my_dataset/tables/my_table";
+ * String stream;
+ * try (BigQueryWriteClient bigqueryWriteClient = BigQueryWriteClient.create()) {
+ * CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder().setParent(table).build();
+ * WriteStream response = bigqueryWriteClient.createWriteStream(request);
+ * stream = response.getName();
+ * }
+ * StreamWriter writer = StreamWriter.newBuilder(stream).build();
+ * try {
+ * // ...
+ * } finally {
+ * // When finished with the writer, make sure to shutdown to free up resources.
+ * writer.shutdown();
+ * writer.awaitTermination(1, TimeUnit.MINUTES);
+ * }
+ * }</pre>
+ */
+ public static Builder newBuilder(String streamName) {
+ return new Builder(streamName);
+ }
+
+ /** A builder of {@link StreamWriter}s. */
+ public static final class Builder {
+ static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10);
+ static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10);
+
+ // Meaningful defaults.
+ static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
+ static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 KiB
+ static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(1);
+ static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
+ FlowControlSettings.newBuilder()
+ .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+ .setMaxOutstandingElementCount(1000L)
+ .setMaxOutstandingRequestBytes(100 * 1024 * 1024L) // 100 MiB
+ .build();
+ public static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
+ BatchingSettings.newBuilder()
+ .setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
+ .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
+ .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
+ .setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS)
+ .build();
+ public static final RetrySettings DEFAULT_RETRY_SETTINGS =
+ RetrySettings.newBuilder()
+ .setMaxRetryDelay(Duration.ofSeconds(60))
+ .setInitialRetryDelay(Duration.ofMillis(100))
+ .setMaxAttempts(3)
+ .build();
+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
+ private static final int THREADS_PER_CPU = 5;
+ static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
+ InstantiatingExecutorProvider.newBuilder()
+ .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
+ .build();
+
+ private String streamName;
+ private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
+
+ // Batching options
+ BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
+
+ RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
+
+ private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;
+
+ private TransportChannelProvider channelProvider =
+ BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
+
+ private HeaderProvider headerProvider = new NoHeaderProvider();
+ private HeaderProvider internalHeaderProvider =
+ BigQueryWriteSettings.defaultApiClientHeaderProviderBuilder().build();
+ ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
+ private CredentialsProvider credentialsProvider =
+ BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
+
+ private Builder(String stream) {
+ this.streamName = Preconditions.checkNotNull(stream);
+ }
+
+ /**
+ * {@code ChannelProvider} to use to create Channels, which must point at the Cloud BigQuery
+ * Storage API endpoint.
+ *
+ * <p>For performance, this client benefits from having multiple underlying connections. See
+ * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
+ */
+ public Builder setChannelProvider(TransportChannelProvider channelProvider) {
+ this.channelProvider = Preconditions.checkNotNull(channelProvider);
+ return this;
+ }
+
+ /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
+ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+ this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
+ return this;
+ }
+
+ /**
+ * Sets the {@code BatchingSettings} on the writer.
+ *
+ * @param batchingSettings the batching settings to apply; thresholds above the API limits are
+ * clamped to those limits
+ * @return this {@code Builder}
+ */
+ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
+ Preconditions.checkNotNull(batchingSettings);
+
+ BatchingSettings.Builder builder = batchingSettings.toBuilder();
+ Preconditions.checkNotNull(batchingSettings.getElementCountThreshold());
+ Preconditions.checkArgument(batchingSettings.getElementCountThreshold() > 0);
+ Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold());
+ Preconditions.checkArgument(batchingSettings.getRequestByteThreshold() > 0);
+ if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) {
+ builder.setRequestByteThreshold(getApiMaxRequestBytes());
+ }
+ Preconditions.checkNotNull(batchingSettings.getDelayThreshold());
+ Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0);
+ if (batchingSettings.getFlowControlSettings() == null) {
+ builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS);
+ } else {
+
+ if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() == null) {
+ builder.setFlowControlSettings(
+ batchingSettings
+ .getFlowControlSettings()
+ .toBuilder()
+ .setMaxOutstandingElementCount(
+ DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount())
+ .build());
+ } else {
+ Preconditions.checkArgument(
+ batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() > 0);
+ if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount()
+ > getApiMaxInflightRequests()) {
+ builder.setFlowControlSettings(
+ batchingSettings
+ .getFlowControlSettings()
+ .toBuilder()
+ .setMaxOutstandingElementCount(getApiMaxInflightRequests())
+ .build());
+ }
+ }
+ if (batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() == null) {
+ builder.setFlowControlSettings(
+ batchingSettings
+ .getFlowControlSettings()
+ .toBuilder()
+ .setMaxOutstandingRequestBytes(
+ DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes())
+ .build());
+ } else {
+ Preconditions.checkArgument(
+ batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() > 0);
+ }
+ if (batchingSettings.getFlowControlSettings().getLimitExceededBehavior() == null) {
+ builder.setFlowControlSettings(
+ batchingSettings
+ .getFlowControlSettings()
+ .toBuilder()
+ .setLimitExceededBehavior(
+ DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior())
+ .build());
+ } else {
+ Preconditions.checkArgument(
+ batchingSettings.getFlowControlSettings().getLimitExceededBehavior()
+ != FlowController.LimitExceededBehavior.Ignore);
+ }
+ }
+ this.batchingSettings = builder.build();
+ return this;
+ }
+
+ /**
+ * Sets the {@code RetrySettings} on the writer.
+ *
+ * @param retrySettings the retry settings to use when the connection needs to be re-established
+ * @return this {@code Builder}
+ */
+ public Builder setRetrySettings(RetrySettings retrySettings) {
+ Preconditions.checkNotNull(retrySettings);
+ Preconditions.checkArgument(
+ retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
+ Preconditions.checkArgument(
+ retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
+ this.retrySettings = retrySettings;
+ return this;
+ }
+
+ /** Gives the ability to set a custom executor to be used by the library. */
+ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
+ this.executorProvider = Preconditions.checkNotNull(executorProvider);
+ return this;
+ }
+
+ /** Gives the ability to override the gRPC endpoint. */
+ public Builder setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ /** Builds the {@code StreamWriter}. */
+ public StreamWriter build() throws IOException {
+ return new StreamWriter(this);
+ }
+ }
+
+ private static final class AppendResponseObserver
+ implements ResponseObserver<AppendRowsResponse> {
+ private Queue<InflightBatch> inflightBatches = new LinkedList<InflightBatch>();
+ private StreamWriter streamWriter;
+
+ public void addInflightBatch(InflightBatch batch) {
+ synchronized (this.inflightBatches) {
+ this.inflightBatches.add(batch);
+ }
+ }
+
+ public AppendResponseObserver(StreamWriter streamWriter) {
+ this.streamWriter = streamWriter;
+ }
+
+ private boolean isRecoverableError(Throwable t) {
+ Status status = Status.fromThrowable(t);
+ return status.getCode() == Status.Code.UNAVAILABLE;
+ }
+
+ @Override
+ public void onStart(StreamController controller) {
+ // no-op
+ }
+
+ private void abortInflightRequests(Throwable t) {
+ synchronized (this.inflightBatches) {
+ while (!this.inflightBatches.isEmpty()) {
+ this.inflightBatches
+ .poll()
+ .onFailure(
+ new AbortedException(
+ "Request aborted due to previous failures",
+ t,
+ GrpcStatusCode.of(Status.Code.ABORTED),
+ true));
+ }
+ }
+ }
+
+ @Override
+ public void onResponse(AppendRowsResponse response) {
+ InflightBatch inflightBatch = null;
+ synchronized (this.inflightBatches) {
+ inflightBatch = this.inflightBatches.poll();
+ }
+ try {
+ streamWriter.currentRetries = 0;
+ if (response == null) {
+ inflightBatch.onFailure(new IllegalStateException("Response is null"));
+ return;
+ }
+ // TODO: Deal with in stream errors.
+ if (response.hasError()) {
+ StatusRuntimeException exception =
+ new StatusRuntimeException(
+ Status.fromCodeValue(response.getError().getCode())
+ .withDescription(response.getError().getMessage()));
+ inflightBatch.onFailure(exception);
+ }
+ if (inflightBatch.getExpectedOffset() > 0
+ && response.getOffset() != inflightBatch.getExpectedOffset()) {
+ IllegalStateException exception =
+ new IllegalStateException(
+ String.format(
+ "The append result offset %s does not match " + "the expected offset %s.",
+ response.getOffset(), inflightBatch.getExpectedOffset()));
+ inflightBatch.onFailure(exception);
+ abortInflightRequests(exception);
+ } else {
+ inflightBatch.onSuccess(response);
+ }
+ } finally {
+ synchronized (streamWriter.messagesWaiter) {
+ streamWriter.messagesWaiter.incrementPendingCount(-1);
+ streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
+ streamWriter.messagesWaiter.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ LOG.info("OnComplete called");
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.info("OnError called");
+ if (streamWriter.shutdown.get()) {
+ return;
+ }
+ InflightBatch inflightBatch = null;
+ synchronized (this.inflightBatches) {
+ if (inflightBatches.isEmpty()) {
+ // The batches could have been aborted.
+ return;
+ }
+ inflightBatch = this.inflightBatches.poll();
+ }
+ if (isRecoverableError(t)) {
+ try {
+ if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
+ && !streamWriter.shutdown.get()) {
+ streamWriter.refreshAppend();
+ // Currently there is a bug where it takes the reconnected stream 5 seconds to pick up the
+ // stream count. So wait at least 5 seconds before sending a new request.
+ Thread.sleep(
+ Math.min(
+ streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(),
+ Duration.ofSeconds(5).toMillis()));
+ streamWriter.writeBatch(inflightBatch);
+ synchronized (streamWriter.currentRetries) {
+ streamWriter.currentRetries++;
+ }
+ } else {
+ synchronized (streamWriter.currentRetries) {
+ streamWriter.currentRetries = 0;
+ }
+ inflightBatch.onFailure(t);
+ }
+ } catch (IOException | InterruptedException e) {
+ streamWriter.currentRetries = 0;
+ inflightBatch.onFailure(e);
+ synchronized (streamWriter.messagesWaiter) {
+ streamWriter.messagesWaiter.incrementPendingCount(-1);
+ streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
+ streamWriter.messagesWaiter.notifyAll();
+ }
+ }
+ } else {
+ LOG.info("Set error response");
+ try {
+ synchronized (streamWriter.currentRetries) {
+ streamWriter.currentRetries = 0;
+ }
+ inflightBatch.onFailure(t);
+ try {
+ // Establish a new connection.
+ streamWriter.refreshAppend();
+ } catch (IOException e) {
+ LOG.info("Failed to establish a new connection, shutdown writer");
+ streamWriter.shutdown();
+ }
+ } finally {
+ synchronized (streamWriter.messagesWaiter) {
+ streamWriter.messagesWaiter.incrementPendingCount(-1);
+ streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
+ streamWriter.messagesWaiter.notifyAll();
+ }
+ }
+ }
+ }
+ };
+
+ // This class controls how many messages are going to be sent out in a batch.
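+ // For example (hypothetical thresholds): with an element count threshold of 2, every second
+ // message completes a batch; with a request byte threshold of 1 KiB, a message that would push
+ // the buffered batch past 1 KiB causes the buffered batch to be sent first.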
+ private static class MessagesBatch {
+ private List<AppendRequestAndFutureResponse> messages;
+ private int batchedBytes;
+ private final BatchingSettings batchingSettings;
+ private Boolean attachSchema = true;
+
+ private MessagesBatch(BatchingSettings batchingSettings) {
+ this.batchingSettings = batchingSettings;
+ reset();
+ }
+
+ // Get all the messages out in a batch.
+ private InflightBatch popBatch() {
+ InflightBatch batch = new InflightBatch(messages, batchedBytes, this.attachSchema);
+ this.attachSchema = false;
+ reset();
+ return batch;
+ }
+
+ private void reset() {
+ messages = new LinkedList<>();
+ batchedBytes = 0;
+ }
+
+ private void resetAttachSchema() {
+ attachSchema = true;
+ }
+
+ private boolean isEmpty() {
+ return messages.isEmpty();
+ }
+
+ private int getBatchedBytes() {
+ return batchedBytes;
+ }
+
+ private int getMessagesCount() {
+ return messages.size();
+ }
+
+ private boolean hasBatchingBytes() {
+ return getMaxBatchBytes() > 0;
+ }
+
+ private long getMaxBatchBytes() {
+ return batchingSettings.getRequestByteThreshold();
+ }
+
+ // The returned batch could contain the previous batch of messages plus the current message,
+ // if the current message makes the batch exceed its max byte size.
+ private List<InflightBatch> add(AppendRequestAndFutureResponse outstandingAppend) {
+ List batchesToSend = new ArrayList<>();
+ // Check if the next message makes the current batch exceed the max batch byte size.
+ if (!isEmpty()
+ && hasBatchingBytes()
+ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
+ batchesToSend.add(popBatch());
+ }
+
+ messages.add(outstandingAppend);
+ batchedBytes += outstandingAppend.messageSize;
+
+ // Border case: if the message to send is greater than or equal to the max batch size, send
+ // it immediately.
+ // Alternatively, if adding the message reaches the batch's max message count, we also have a
+ // batch to send.
+ if ((hasBatchingBytes() && outstandingAppend.messageSize >= getMaxBatchBytes())
+ || getMessagesCount() == batchingSettings.getElementCountThreshold()) {
+ batchesToSend.add(popBatch());
+ }
+
+ return batchesToSend;
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
new file mode 100644
index 0000000000..0e15d6c726
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.UnimplementedException;
+import io.grpc.Status;
+import java.util.logging.Logger;
+
+/**
+ * A barrier-like object that helps keep track of pending actions and synchronously wait until
+ * all have completed.
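+ *
+ * <p>A usage sketch mirroring how {@code StreamWriter} drives it (values are illustrative):
+ *
+ * <pre>{@code
+ * Waiter waiter = new Waiter(flowControlSettings);
+ * waiter.waitOnElementCount();          // block until below the element limit
+ * waiter.waitOnSizeLimit(messageSize);  // block until below the byte limit
+ * synchronized (waiter) {
+ *   waiter.incrementPendingCount(1);
+ *   waiter.incrementPendingSize(messageSize);
+ * }
+ * // ... after the response arrives, decrement with negative deltas and notifyAll().
+ * }</pre>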
+ */
+class Waiter {
+ private static final Logger LOG = Logger.getLogger(Waiter.class.getName());
+
+ private int pendingCount;
+ private int pendingSize;
+ FlowControlSettings flowControlSettings;
+
+ Waiter(FlowControlSettings flowControlSettings) {
+ pendingCount = 0;
+ pendingSize = 0;
+ this.flowControlSettings = flowControlSettings;
+ }
+
+ public synchronized void incrementPendingCount(int delta) {
+ this.pendingCount += delta;
+ if (pendingCount == 0) {
+ notifyAll();
+ }
+ }
+
+ public synchronized void incrementPendingSize(int delta) {
+ this.pendingSize += delta;
+ }
+
+ private void wait(String message) {
+ try {
+ LOG.fine("Wait on: " + message);
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void handleOverLimit(String message) {
+ switch (this.flowControlSettings.getLimitExceededBehavior()) {
+ case Block:
+ wait(message);
+ break;
+ case ThrowException:
+ throw new IllegalStateException("FlowControl limit exceeded: " + message);
+ case Ignore:
+ return;
+ default:
+ throw new UnimplementedException(
+ "Unknown behavior setting: "
+ + this.flowControlSettings.getLimitExceededBehavior().toString(),
+ null,
+ GrpcStatusCode.of(Status.Code.UNIMPLEMENTED),
+ false);
+ }
+ }
+
+ public synchronized void waitOnElementCount() {
+ LOG.finer(
+ "Waiting on element count "
+ + this.pendingCount
+ + " "
+ + this.flowControlSettings.getMaxOutstandingElementCount());
+ while (this.pendingCount >= this.flowControlSettings.getMaxOutstandingElementCount()) {
+ handleOverLimit("Element count");
+ }
+ }
+
+ public synchronized void waitOnSizeLimit(int incomingSize) {
+ LOG.finer(
+ "Waiting on size limit "
+ + (this.pendingSize + incomingSize)
+ + " "
+ + this.flowControlSettings.getMaxOutstandingRequestBytes());
+ while (this.pendingSize + incomingSize
+ >= this.flowControlSettings.getMaxOutstandingRequestBytes()) {
+ handleOverLimit("Byte size");
+ }
+ }
+
+ public synchronized void waitComplete() {
+ boolean interrupted = false;
+ try {
+ while (pendingCount > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Ignored, uninterruptibly.
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @InternalApi
+ public int pendingCount() {
+ return pendingCount;
+ }
+
+ @InternalApi
+ public int pendingSize() {
+ return pendingSize;
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java
new file mode 100644
index 0000000000..5298e80ae4
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
+import com.google.protobuf.AbstractMessage;
+import io.grpc.ServerServiceDefinition;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import org.threeten.bp.Duration;
+
+/**
+ * A fake implementation of {@link MockGrpcService} that can be used to test clients of a
+ * StreamWriter. It forwards calls to the real implementation, {@link FakeBigQueryWriteImpl}.
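+ *
+ * <p>A test sketch (assumes a channel wired to this fake via {@code MockServiceHelper}):
+ *
+ * <pre>{@code
+ * FakeBigQueryWrite fake = new FakeBigQueryWrite();
+ * fake.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+ * // ... run a StreamWriter against the fake, then inspect what was sent:
+ * List<AppendRowsRequest> sent = fake.getAppendRequests();
+ * }</pre>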
+ */
+public class FakeBigQueryWrite implements MockGrpcService {
+ private final FakeBigQueryWriteImpl serviceImpl;
+
+ public FakeBigQueryWrite() {
+ serviceImpl = new FakeBigQueryWriteImpl();
+ }
+
+ @Override
+ public List<AbstractMessage> getRequests() {
+ return new LinkedList<AbstractMessage>(serviceImpl.getCapturedRequests());
+ }
+
+ public List<AppendRowsRequest> getAppendRequests() {
+ return serviceImpl.getCapturedRequests();
+ }
+
+ @Override
+ public void addResponse(AbstractMessage response) {
+ if (response instanceof AppendRowsResponse) {
+ serviceImpl.addResponse((AppendRowsResponse) response);
+ } else {
+ throw new IllegalStateException("Unsupported service");
+ }
+ }
+
+ @Override
+ public void addException(Exception exception) {
+ serviceImpl.addConnectionError(exception);
+ }
+
+ @Override
+ public ServerServiceDefinition getServiceDefinition() {
+ return serviceImpl.bindService();
+ }
+
+ @Override
+ public void reset() {
+ serviceImpl.reset();
+ }
+
+ public void setResponseDelay(Duration delay) {
+ serviceImpl.setResponseDelay(delay);
+ }
+
+ public void setExecutor(ScheduledExecutorService executor) {
+ serviceImpl.setExecutor(executor);
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java
new file mode 100644
index 0000000000..aa3f7e734d
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
+import com.google.common.base.Optional;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+import org.threeten.bp.Duration;
+
+/**
+ * A fake implementation of {@link BigQueryWriteGrpc.BigQueryWriteImplBase} that can act as the
+ * server in StreamWriter unit tests.
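+ *
+ * <p>Responses are consumed from a queue in order: each incoming AppendRowsRequest pops the next
+ * queued response (or error), sent either immediately or after {@code responseDelay}.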
+ */
+class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
+ private static final Logger LOG = Logger.getLogger(FakeBigQueryWriteImpl.class.getName());
+
+ private final LinkedBlockingQueue<AppendRowsRequest> requests = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue<Response> responses = new LinkedBlockingQueue<>();
+ private final AtomicInteger nextMessageId = new AtomicInteger(1);
+ private boolean autoPublishResponse;
+ private ScheduledExecutorService executor = null;
+ private Duration responseDelay = Duration.ZERO;
+
+ /** Class used to save the state of a possible response. */
+ private static class Response {
+ Optional<AppendRowsResponse> appendResponse;
+ Optional<Throwable> error;
+
+ public Response(AppendRowsResponse appendResponse) {
+ this.appendResponse = Optional.of(appendResponse);
+ this.error = Optional.absent();
+ }
+
+ public Response(Throwable exception) {
+ this.appendResponse = Optional.absent();
+ this.error = Optional.of(exception);
+ }
+
+ public AppendRowsResponse getResponse() {
+ return appendResponse.get();
+ }
+
+ public Throwable getError() {
+ return error.get();
+ }
+
+ boolean isError() {
+ return error.isPresent();
+ }
+
+ @Override
+ public String toString() {
+ if (isError()) {
+ return error.get().toString();
+ }
+ return appendResponse.get().toString();
+ }
+ }
+
+ @Override
+ public StreamObserver<AppendRowsRequest> appendRows(
+ final StreamObserver<AppendRowsResponse> responseObserver) {
+ StreamObserver<AppendRowsRequest> requestObserver =
+ new StreamObserver<AppendRowsRequest>() {
+ @Override
+ public void onNext(AppendRowsRequest value) {
+ LOG.info("Get request:" + value.toString());
+ final Response response = responses.remove();
+ requests.add(value);
+ if (responseDelay == Duration.ZERO) {
+ sendResponse(response, responseObserver);
+ } else {
+ final Response responseToSend = response;
+ LOG.info("Schedule a response to be sent at delay");
+ executor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ sendResponse(responseToSend, responseObserver);
+ }
+ },
+ responseDelay.toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ responseObserver.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+ };
+ return requestObserver;
+ }
+
+ private void sendResponse(
+ Response response, StreamObserver<AppendRowsResponse> responseObserver) {
+ LOG.info("Sending response: " + response.toString());
+ if (response.isError()) {
+ responseObserver.onError(response.getError());
+ } else {
+ responseObserver.onNext(response.getResponse());
+ }
+ }
+
+ /** Sets an executor to use to delay sending responses. */
+ public FakeBigQueryWriteImpl setExecutor(ScheduledExecutorService executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ /** Sets an amount of time by which to delay sending responses. */
+ public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) {
+ this.responseDelay = responseDelay;
+ return this;
+ }
+
+ public FakeBigQueryWriteImpl addResponse(AppendRowsResponse appendRowsResponse) {
+ responses.add(new Response(appendRowsResponse));
+ return this;
+ }
+
+ public FakeBigQueryWriteImpl addResponse(AppendRowsResponse.Builder appendResponseBuilder) {
+ return addResponse(appendResponseBuilder.build());
+ }
+
+ public FakeBigQueryWriteImpl addConnectionError(Throwable error) {
+ responses.add(new Response(error));
+ return this;
+ }
+
+ public List<AppendRowsRequest> getCapturedRequests() {
+ return new ArrayList<AppendRowsRequest>(requests);
+ }
+
+ public void reset() {
+ requests.clear();
+ responses.clear();
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java
new file mode 100644
index 0000000000..ee8ee3221b
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.api.core.ApiClock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A Clock to help with testing time-based logic. */
+public class FakeClock implements ApiClock {
+
+ private final AtomicLong millis = new AtomicLong();
+
+ /** Advances the clock value by {@code time} in {@code timeUnit}. */
+ public void advance(long time, TimeUnit timeUnit) {
+ millis.addAndGet(timeUnit.toMillis(time));
+ }
+
+ @Override
+ public long nanoTime() {
+ return millisTime() * 1_000_000L;
+ }
+
+ @Override
+ public long millisTime() {
+ return millis.get();
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java
new file mode 100644
index 0000000000..8ee37cc0ba
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import com.google.api.core.ApiClock;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+import org.threeten.bp.Duration;
+import org.threeten.bp.Instant;
+
+/**
+ * Fake implementation of {@link ScheduledExecutorService} that allows tests to control the
+ * reference time of the executor and to decide when to execute any outstanding task.
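+ *
+ * <p>A sketch of intended use (task and delay are illustrative):
+ *
+ * <pre>{@code
+ * FakeScheduledExecutorService executor = new FakeScheduledExecutorService();
+ * executor.schedule(task, 100, TimeUnit.MILLISECONDS); // nothing runs yet
+ * executor.advanceTime(Duration.ofMillis(100));        // runs task on the calling thread
+ * }</pre>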
+ */
+public class FakeScheduledExecutorService extends AbstractExecutorService
+ implements ScheduledExecutorService {
+ private static final Logger LOG = Logger.getLogger(FakeScheduledExecutorService.class.getName());
+
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final PriorityQueue<PendingCallable<?>> pendingCallables = new PriorityQueue<>();
+ private final FakeClock clock = new FakeClock();
+ private final Deque<Duration> expectedWorkQueue = new LinkedList<>();
+
+ public ApiClock getClock() {
+ return clock;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return schedulePendingCallable(
+ new PendingCallable<>(
+ Duration.ofMillis(unit.toMillis(delay)), command, PendingCallableType.NORMAL));
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return schedulePendingCallable(
+ new PendingCallable<>(
+ Duration.ofMillis(unit.toMillis(delay)), callable, PendingCallableType.NORMAL));
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return schedulePendingCallable(
+ new PendingCallable<>(
+ Duration.ofMillis(unit.toMillis(initialDelay)),
+ command,
+ PendingCallableType.FIXED_RATE));
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return schedulePendingCallable(
+ new PendingCallable<>(
+ Duration.ofMillis(unit.toMillis(initialDelay)),
+ command,
+ PendingCallableType.FIXED_DELAY));
+ }
+
+ /**
+ * Advances the reference time of the executor and executes (in the same thread) any
+ * outstanding callable whose execution time has passed.
+ */
+ public void advanceTime(Duration toAdvance) {
+ LOG.info(
+ "Advance to time to:"
+ + Instant.ofEpochMilli(clock.millisTime() + toAdvance.toMillis()).toString());
+ clock.advance(toAdvance.toMillis(), TimeUnit.MILLISECONDS);
+ work();
+ }
+
+ private void work() {
+ for (; ; ) {
+ PendingCallable<?> callable = null;
+ Instant cmpTime = Instant.ofEpochMilli(clock.millisTime());
+ if (!pendingCallables.isEmpty()) {
+ LOG.info(
+ "Going to call: Current time: "
+ + cmpTime.toString()
+ + " Scheduled time: "
+ + pendingCallables.peek().getScheduledTime().toString()
+ + " Creation time:"
+ + pendingCallables.peek().getCreationTime().toString());
+ }
+ synchronized (pendingCallables) {
+ if (pendingCallables.isEmpty()
+ || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
+ break;
+ }
+ callable = pendingCallables.poll();
+ }
+ if (callable != null) {
+ try {
+ callable.call();
+ } catch (Exception e) {
+ // We ignore any callable exception; it is set on the future and is not relevant to
+ // advanceTime.
+ }
+ }
+ }
+
+ synchronized (pendingCallables) {
+ if (shutdown.get() && pendingCallables.isEmpty()) {
+ pendingCallables.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (shutdown.getAndSet(true)) {
+ throw new IllegalStateException("This executor has been shutdown already");
+ }
+ }
+
+ @Override
+ public List shutdownNow() {
+ if (shutdown.getAndSet(true)) {
+ throw new IllegalStateException("This executor has been shutdown already");
+ }
+ List pending = new ArrayList<>();
+ for (final PendingCallable<?> pendingCallable : pendingCallables) {
+ pending.add(
+ new Runnable() {
+ @Override
+ public void run() {
+ pendingCallable.call();
+ }
+ });
+ }
+ synchronized (pendingCallables) {
+ pendingCallables.notifyAll();
+ pendingCallables.clear();
+ }
+ return pending;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return shutdown.get();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return pendingCallables.isEmpty();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ synchronized (pendingCallables) {
+ if (pendingCallables.isEmpty()) {
+ return true;
+ }
+ LOG.info("Wating on pending callables" + pendingCallables.size());
+ pendingCallables.wait(unit.toMillis(timeout));
+ return pendingCallables.isEmpty();
+ }
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (shutdown.get()) {
+ throw new IllegalStateException("This executor has been shutdown");
+ }
+ command.run();
+ }
+
+ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
+ LOG.info(
+ "Schedule pending callable called " + callable.delay + " " + callable.getScheduledTime());
+ if (shutdown.get()) {
+ throw new IllegalStateException("This executor has been shutdown");
+ }
+ synchronized (pendingCallables) {
+ pendingCallables.add(callable);
+ }
+ work();
+ synchronized (expectedWorkQueue) {
+ // We compare by the callable delay in order to decide when to remove expectations from the
+ // expected work queue, i.e. only the expected work that matches the delay of the scheduled
+ // callable is removed from the queue.
+ if (!expectedWorkQueue.isEmpty() && expectedWorkQueue.peek().equals(callable.delay)) {
+ expectedWorkQueue.poll();
+ }
+ expectedWorkQueue.notifyAll();
+ }
+
+ return callable.getScheduledFuture();
+ }
+
+ enum PendingCallableType {
+ NORMAL,
+ FIXED_RATE,
+ FIXED_DELAY
+ }
+
+ /** Class that saves the state of a scheduled pending callable. */
+ class PendingCallable<T> implements Comparable<PendingCallable<T>> {
+ Instant creationTime = Instant.ofEpochMilli(clock.millisTime());
+ Duration delay;
+ Callable<T> pendingCallable;
+ SettableFuture<T> future = SettableFuture.create();
+ AtomicBoolean cancelled = new AtomicBoolean(false);
+ AtomicBoolean done = new AtomicBoolean(false);
+ PendingCallableType type;
+
+ PendingCallable(Duration delay, final Runnable runnable, PendingCallableType type) {
+ pendingCallable =
+ new Callable<T>() {
+ @Override
+ public T call() {
+ runnable.run();
+ return null;
+ }
+ };
+ this.type = type;
+ this.delay = delay;
+ }
+
+ PendingCallable(Duration delay, Callable<T> callable, PendingCallableType type) {
+ pendingCallable = callable;
+ this.type = type;
+ this.delay = delay;
+ }
+
+ private Instant getScheduledTime() {
+ return creationTime.plus(delay);
+ }
+
+ private Instant getCreationTime() {
+ return creationTime;
+ }
+
+ ScheduledFuture<T> getScheduledFuture() {
+ return new ScheduledFuture<T>() {
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(
+ getScheduledTime().toEpochMilli() - clock.millisTime(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ return Ints.saturatedCast(
+ getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ synchronized (this) {
+ cancelled.set(true);
+ return !done.get();
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ @Override
+ public boolean isDone() {
+ return done.get();
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return future.get();
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return future.get(timeout, unit);
+ }
+ };
+ }
+
+ T call() {
+ T result = null;
+ synchronized (this) {
+ if (cancelled.get()) {
+ return null;
+ }
+ try {
+ result = pendingCallable.call();
+ future.set(result);
+ } catch (Exception e) {
+ future.setException(e);
+ } finally {
+ switch (type) {
+ case NORMAL:
+ done.set(true);
+ break;
+ case FIXED_DELAY:
+ this.creationTime = Instant.ofEpochMilli(clock.millisTime());
+ schedulePendingCallable(this);
+ break;
+ case FIXED_RATE:
+ this.creationTime = this.creationTime.plus(delay);
+ schedulePendingCallable(this);
+ break;
+ default:
+ // Nothing to do
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public int compareTo(PendingCallable<T> other) {
+ return getScheduledTime().compareTo(other.getScheduledTime());
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java
index de1e7f4888..1cf7263628 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java
@@ -26,7 +26,7 @@ public void convertSimple() {
ProtoBufProto.ProtoSchema protoSchema =
ProtoSchemaConverter.convert(testProto.getDescriptorForType());
Assert.assertEquals(
- "name: \"__S0\"\n"
+ "name: \"__ROOT__\"\n"
+ "field {\n"
+ " name: \"int32_value\"\n"
+ " number: 1\n"
@@ -102,32 +102,32 @@ public void convertNested() {
ProtoBufProto.ProtoSchema protoSchema =
ProtoSchemaConverter.convert(testProto.getDescriptorForType());
Assert.assertEquals(
- "name: \"__S0\"\n"
+ "name: \"__ROOT__\"\n"
+ "field {\n"
+ " name: \"nested_repeated_type\"\n"
+ " number: 1\n"
+ " label: LABEL_REPEATED\n"
+ " type: TYPE_MESSAGE\n"
- + " type_name: \"__S1\"\n"
+ + " type_name: \"__S2\"\n"
+ "}\n"
+ "field {\n"
+ " name: \"inner_type\"\n"
+ " number: 2\n"
+ " label: LABEL_OPTIONAL\n"
+ " type: TYPE_MESSAGE\n"
- + " type_name: \"__S3\"\n"
+ + " type_name: \"__S4\"\n"
+ "}\n"
+ "nested_type {\n"
- + " name: \"__S1\"\n"
+ + " name: \"__S2\"\n"
+ " field {\n"
+ " name: \"inner_type\"\n"
+ " number: 1\n"
+ " label: LABEL_REPEATED\n"
+ " type: TYPE_MESSAGE\n"
- + " type_name: \"__S2\"\n"
+ + " type_name: \"__S3\"\n"
+ " }\n"
+ " nested_type {\n"
- + " name: \"__S2\"\n"
+ + " name: \"__S3\"\n"
+ " field {\n"
+ " name: \"value\"\n"
+ " number: 1\n"
@@ -137,7 +137,7 @@ public void convertNested() {
+ " }\n"
+ "}\n"
+ "nested_type {\n"
- + " name: \"__S3\"\n"
+ + " name: \"__S4\"\n"
+ " field {\n"
+ " name: \"value\"\n"
+ " number: 1\n"
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
new file mode 100644
index 0000000000..324599a05a
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
@@ -0,0 +1,802 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.ExecutorProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.api.gax.rpc.DataLossException;
+import com.google.cloud.bigquery.storage.test.Test.FooType;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Int64Value;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.threeten.bp.Duration;
+
+@RunWith(JUnit4.class)
+public class StreamWriterTest {
+ private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName());
+ private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
+ private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
+ InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
+ private static LocalChannelProvider channelProvider;
+ private FakeScheduledExecutorService fakeExecutor;
+ private FakeBigQueryWrite testBigQueryWrite;
+ private static MockServiceHelper serviceHelper;
+
+ @Before
+ public void setUp() throws Exception {
+ testBigQueryWrite = new FakeBigQueryWrite();
+ serviceHelper =
+ new MockServiceHelper(
+ UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
+ serviceHelper.start();
+ channelProvider = serviceHelper.createChannelProvider();
+ fakeExecutor = new FakeScheduledExecutorService();
+ testBigQueryWrite.setExecutor(fakeExecutor);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("tearDown called");
+ serviceHelper.stop();
+ }
+
+ private StreamWriter.Builder getTestStreamWriterBuilder() {
+ return StreamWriter.newBuilder(TEST_STREAM)
+ .setChannelProvider(channelProvider)
+ .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
+ .setCredentialsProvider(NoCredentialsProvider.create());
+ }
+
+ private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
+ AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+ AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(
+ ProtoBufProto.ProtoSchema.newBuilder()
+ .setProtoDescriptor(
+ DescriptorProtos.DescriptorProto.newBuilder()
+ .setName("Message")
+ .addField(
+ DescriptorProtos.FieldDescriptorProto.newBuilder()
+ .setName("foo")
+ .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
+ .setNumber(1)
+ .build())
+ .build()));
+ ProtoBufProto.ProtoRows.Builder rows = ProtoBufProto.ProtoRows.newBuilder();
+ for (String message : messages) {
+ FooType foo = FooType.newBuilder().setFoo(message).build();
+ rows.addSerializedRows(foo.toByteString());
+ }
+ if (offset > 0) {
+ requestBuilder.setOffset(Int64Value.of(offset));
+ }
+ return requestBuilder
+ .setProtoRows(dataBuilder.setRows(rows.build()).build())
+ .setWriteStream(TEST_STREAM)
+ .build();
+ }
+
+ private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriter writer, String[] messages) {
+ return writer.append(createAppendRequest(messages, -1));
+ }
+
+ @Test
+ public void testAppendByDuration() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(Duration.ofSeconds(5))
+ .setElementCountThreshold(10L)
+ .build())
+ .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+
+ assertFalse(appendFuture1.isDone());
+ assertFalse(appendFuture2.isDone());
+ fakeExecutor.advanceTime(Duration.ofSeconds(10));
+
+ assertEquals(0L, appendFuture1.get().getOffset());
+ assertEquals(1L, appendFuture2.get().getOffset());
+
+ assertEquals(1, testBigQueryWrite.getAppendRequests().size());
+
+ assertEquals(
+ 2,
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(0)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRowsCount());
+ assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+ writer.shutdown();
+ }
+
+ @Test
+ public void testAppendByNumBatchedMessages() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(2L)
+ .setDelayThreshold(Duration.ofSeconds(100))
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
+
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+ ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
+
+ assertEquals(0L, appendFuture1.get().getOffset());
+ assertEquals(1L, appendFuture2.get().getOffset());
+
+ assertFalse(appendFuture3.isDone());
+
+ ApiFuture<AppendRowsResponse> appendFuture4 = sendTestMessage(writer, new String[] {"D"});
+
+ assertEquals(2L, appendFuture3.get().getOffset());
+ assertEquals(3L, appendFuture4.get().getOffset());
+
+ assertEquals(2, testBigQueryWrite.getAppendRequests().size());
+ assertEquals(
+ 2,
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(0)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRowsCount());
+ assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+ assertEquals(
+ 2,
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(1)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRowsCount());
+ assertFalse(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
+ writer.shutdown();
+ }
+
+ @Test
+ public void testAppendByNumBytes() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ // Each message is 32 bytes; a 70-byte batch threshold allows 2 messages per batch.
+ .setRequestByteThreshold(70L)
+ .setDelayThreshold(Duration.ofSeconds(100000))
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
+
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+ ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
+
+ assertEquals(0L, appendFuture1.get().getOffset());
+ assertEquals(1L, appendFuture2.get().getOffset());
+ assertFalse(appendFuture3.isDone());
+
+ // This message is big enough to trigger a send of the previous message and itself.
+ ApiFuture<AppendRowsResponse> appendFuture4 =
+ sendTestMessage(writer, new String[] {StringUtils.repeat('A', 100)});
+ assertEquals(2L, appendFuture3.get().getOffset());
+ assertEquals(3L, appendFuture4.get().getOffset());
+
+ assertEquals(3, testBigQueryWrite.getAppendRequests().size());
+
+ writer.shutdown();
+ }
+
+ @Test
+ public void testWriteByShutdown() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(Duration.ofSeconds(100))
+ .setElementCountThreshold(10L)
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
+
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+
+ // Note we are not advancing time or reaching the count threshold, but messages should
+ // still get written by the call to shutdown().
+
+ writer.shutdown();
+ LOG.info("Wait for termination");
+ writer.awaitTermination(10, TimeUnit.SECONDS);
+
+ // Verify the appends completed
+ assertTrue(appendFuture1.isDone());
+ assertTrue(appendFuture2.isDone());
+ assertEquals(0L, appendFuture1.get().getOffset());
+ assertEquals(1L, appendFuture2.get().getOffset());
+ }
+
+ @Test
+ public void testWriteMixedSizeAndDuration() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(2L)
+ .setDelayThreshold(Duration.ofSeconds(5))
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build());
+
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+
+ fakeExecutor.advanceTime(Duration.ofSeconds(2));
+ assertFalse(appendFuture1.isDone());
+
+ ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B", "C"});
+
+ // Write triggered by batch size
+ assertEquals(0L, appendFuture1.get().getOffset());
+ assertEquals(1L, appendFuture2.get().getOffset());
+
+ ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"D"});
+
+ assertFalse(appendFuture3.isDone());
+
+ // Write triggered by time
+ fakeExecutor.advanceTime(Duration.ofSeconds(5));
+
+ assertEquals(2L, appendFuture3.get().getOffset());
+
+ assertEquals(
+ 3,
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(0)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRowsCount());
+ assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+ assertEquals(
+ 1,
+ testBigQueryWrite
+ .getAppendRequests()
+ .get(1)
+ .getProtoRows()
+ .getRows()
+ .getSerializedRowsCount());
+ assertFalse(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
+ writer.shutdown();
+ }
+
+ @Test
+ public void testFlowControlBehaviorBlock() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(1L)
+ .setFlowControlSettings(
+ StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
+ .toBuilder()
+ .setMaxOutstandingRequestBytes(40L)
+ .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+ .build())
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build());
+ testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
+
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ final StreamWriter writer1 = writer;
+ Runnable runnable =
+ new Runnable() {
+ @Override
+ public void run() {
+ ApiFuture<AppendRowsResponse> appendFuture2 =
+ sendTestMessage(writer1, new String[] {"B"});
+ }
+ };
+ Thread t = new Thread(runnable);
+ t.start();
+ assertTrue(t.isAlive());
+ assertFalse(appendFuture1.isDone());
+ // Wait is necessary for response to be scheduled before timer is advanced.
+ Thread.sleep(5000L);
+ fakeExecutor.advanceTime(Duration.ofSeconds(10));
+ // The first request comes back while the second one is blocked.
+ assertEquals(2L, appendFuture1.get().getOffset());
+ // Wait is necessary for response to be scheduled before timer is advanced.
+ Thread.sleep(5000L);
+ fakeExecutor.advanceTime(Duration.ofSeconds(10));
+ t.join();
+ writer.shutdown();
+ }
+
+ @Test
+ public void testFlowControlBehaviorException() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(1L)
+ .setFlowControlSettings(
+ StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
+ .toBuilder()
+ .setMaxOutstandingElementCount(1L)
+ .setLimitExceededBehavior(
+ FlowController.LimitExceededBehavior.ThrowException)
+ .build())
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ try {
+ ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+ fail("This should fail");
+ } catch (IllegalStateException e) {
+ assertEquals("FlowControl limit exceeded: Element count", e.getMessage());
+ }
+ assertEquals(1L, appendFuture1.get().getOffset());
+ writer.shutdown();
+ }
+
+ @Test
+ public void testStreamReconnection() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(Duration.ofSeconds(100000))
+ .setElementCountThreshold(1L)
+ .build())
+ .build();
+
+ // Case 1: Request succeeded after retry since the error is transient.
+ StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
+ testBigQueryWrite.addException(transientError);
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+ ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
+ assertFalse(future1.isDone());
+ // Retry is scheduled to be 5 seconds later.
+ assertEquals(0L, future1.get().getOffset());
+
+ LOG.info("======CASE II");
+ // Case 2: Request failed since the error is not transient.
+ StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT);
+ testBigQueryWrite.addException(permanentError);
+ ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m2"});
+ try {
+ future2.get();
+ fail("This should fail.");
+ } catch (ExecutionException e) {
+ assertEquals(permanentError.toString(), e.getCause().getCause().toString());
+ }
+
+ LOG.info("======CASE III");
+ // Writer needs to be recreated since the previous error is not recoverable.
+ writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(Duration.ofSeconds(100000))
+ .setElementCountThreshold(1L)
+ .build())
+ .build();
+ // Case 3: Failed after retried max retry times.
+ testBigQueryWrite.addException(transientError);
+ testBigQueryWrite.addException(transientError);
+ testBigQueryWrite.addException(transientError);
+ testBigQueryWrite.addException(transientError);
+ ApiFuture<AppendRowsResponse> future3 = sendTestMessage(writer, new String[] {"m3"});
+ try {
+ future3.get();
+ fail("This should fail.");
+ } catch (ExecutionException e) {
+ assertEquals(transientError.toString(), e.getCause().getCause().toString());
+ }
+ }
+
+ @Test
+ public void testOffset() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(2L)
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build());
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build());
+ AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
+ ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
+ AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L);
+ ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(request2);
+ AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L);
+ ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(request3);
+ AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L);
+ ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(request4);
+ assertEquals(10L, appendFuture1.get().getOffset());
+ assertEquals(11L, appendFuture2.get().getOffset());
+ assertEquals(13L, appendFuture3.get().getOffset());
+ assertEquals(15L, appendFuture4.get().getOffset());
+ writer.shutdown();
+ }
+
+ @Test
+ public void testOffsetMismatch() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(1L)
+ .build())
+ .build();
+
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build());
+ AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
+ ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
+ try {
+ appendFuture1.get();
+ fail("Should throw exception");
+ } catch (Exception e) {
+ assertEquals(
+ "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.",
+ e.getCause().toString());
+ }
+ writer.shutdown();
+ }
+
+ @Test
+ public void testErrorPropagation() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder()
+ .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(1L)
+ .setDelayThreshold(Duration.ofSeconds(5))
+ .build())
+ .build();
+ testBigQueryWrite.addException(Status.DATA_LOSS.asException());
+ try {
+ sendTestMessage(writer, new String[] {"A"}).get();
+ fail("should throw exception");
+ } catch (ExecutionException e) {
+ assertThat(e.getCause()).isInstanceOf(DataLossException.class);
+ }
+ }
+
+ @Test
+ public void testWriterGetters() throws Exception {
+ StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
+ builder.setChannelProvider(channelProvider);
+ builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
+ builder.setBatchingSettings(
+ BatchingSettings.newBuilder()
+ .setRequestByteThreshold(10L)
+ .setDelayThreshold(Duration.ofMillis(11))
+ .setElementCountThreshold(12L)
+ .setFlowControlSettings(
+ FlowControlSettings.newBuilder()
+ .setMaxOutstandingElementCount(100L)
+ .setMaxOutstandingRequestBytes(1000L)
+ .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+ .build())
+ .build());
+ builder.setCredentialsProvider(NoCredentialsProvider.create());
+ StreamWriter writer = builder.build();
+
+ assertEquals(TEST_STREAM, writer.getStreamNameString());
+ assertEquals(10, (long) writer.getBatchingSettings().getRequestByteThreshold());
+ assertEquals(Duration.ofMillis(11), writer.getBatchingSettings().getDelayThreshold());
+ assertEquals(12, (long) writer.getBatchingSettings().getElementCountThreshold());
+ assertEquals(
+ FlowController.LimitExceededBehavior.Block,
+ writer.getBatchingSettings().getFlowControlSettings().getLimitExceededBehavior());
+ assertEquals(
+ 100L,
+ writer
+ .getBatchingSettings()
+ .getFlowControlSettings()
+ .getMaxOutstandingElementCount()
+ .longValue());
+ assertEquals(
+ 1000L,
+ writer
+ .getBatchingSettings()
+ .getFlowControlSettings()
+ .getMaxOutstandingRequestBytes()
+ .longValue());
+ writer.shutdown();
+ }
+
+ @Test
+ public void testBuilderParametersAndDefaults() {
+ StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
+ assertEquals(StreamWriter.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
+ assertEquals(
+ StreamWriter.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,
+ builder.batchingSettings.getRequestByteThreshold().longValue());
+ assertEquals(
+ StreamWriter.Builder.DEFAULT_DELAY_THRESHOLD, builder.batchingSettings.getDelayThreshold());
+ assertEquals(
+ StreamWriter.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD,
+ builder.batchingSettings.getElementCountThreshold().longValue());
+ assertEquals(StreamWriter.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
+ assertEquals(Duration.ofMillis(100), builder.retrySettings.getInitialRetryDelay());
+ assertEquals(3, builder.retrySettings.getMaxAttempts());
+ }
+
+ @Test
+ public void testBuilderInvalidArguments() {
+ StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
+
+ try {
+ builder.setChannelProvider(null);
+ fail("Should have thrown an NullPointerException");
+ } catch (NullPointerException expected) {
+ // Expected
+ }
+
+ try {
+ builder.setExecutorProvider(null);
+ fail("Should have thrown an NullPointerException");
+ } catch (NullPointerException expected) {
+ // Expected
+ }
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setRequestByteThreshold(null)
+ .build());
+ fail("Should have thrown an NullPointerException");
+ } catch (NullPointerException expected) {
+ // Expected
+ }
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setRequestByteThreshold(0L)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setRequestByteThreshold(-1L)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(Duration.ofMillis(1))
+ .build());
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(null)
+ .build());
+ fail("Should have thrown an NullPointerException");
+ } catch (NullPointerException expected) {
+ // Expected
+ }
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setDelayThreshold(Duration.ofMillis(-1))
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(1L)
+ .build());
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(null)
+ .build());
+ fail("Should have thrown an NullPointerException");
+ } catch (NullPointerException expected) {
+ // Expected
+ }
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(0L)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+ try {
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setElementCountThreshold(-1L)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+
+ try {
+ FlowControlSettings flowControlSettings =
+ FlowControlSettings.newBuilder().setMaxOutstandingElementCount(-1L).build();
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setFlowControlSettings(flowControlSettings)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+
+ try {
+ FlowControlSettings flowControlSettings =
+ FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(-1L).build();
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setFlowControlSettings(flowControlSettings)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+
+ try {
+ FlowControlSettings flowControlSettings =
+ FlowControlSettings.newBuilder()
+ .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
+ .build();
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setFlowControlSettings(flowControlSettings)
+ .build());
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+
+ try {
+ FlowControlSettings flowControlSettings =
+ FlowControlSettings.newBuilder().setLimitExceededBehavior(null).build();
+ builder.setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setFlowControlSettings(flowControlSettings)
+ .build());
+ fail("Should have thrown an NullPointerException");
+ } catch (NullPointerException expected) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testAwaitTermination() throws Exception {
+ StreamWriter writer =
+ getTestStreamWriterBuilder().setExecutorProvider(SINGLE_THREAD_EXECUTOR).build();
+ testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
+ ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ writer.shutdown();
+ // TODO: for some reason, awaitTermination always returns false.
+ // assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ StreamWriter writer = getTestStreamWriterBuilder().build();
+ writer.close();
+ try {
+ writer.shutdown();
+ fail("Should throw");
+ } catch (IllegalStateException e) {
+ LOG.info(e.toString());
+ assertEquals("Cannot shut down a writer already shut-down.", e.getMessage());
+ }
+ }
+}
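
Taken together, testAppendByDuration, testAppendByNumBatchedMessages, and testAppendByNumBytes cover the writer's three flush triggers: delay, element count, and request bytes. As a rough usage sketch outside the test harness (the stream name and thresholds are placeholders; the builder calls mirror the ones exercised above, and this is not the library's documented quickstart):

    import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter;
    import org.threeten.bp.Duration;

    final class WriterDemo {
      public static void main(String[] args) throws Exception {
        StreamWriter writer =
            StreamWriter.newBuilder("projects/p/datasets/d/tables/t/streams/s")
                .setBatchingSettings(
                    StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
                        .toBuilder()
                        .setElementCountThreshold(100L) // flush after 100 rows,
                        .setRequestByteThreshold(1024 * 1024L) // or 1 MB,
                        .setDelayThreshold(Duration.ofMillis(10)) // or 10 ms
                        .build())
                .build();
        // append() returns an ApiFuture<AppendRowsResponse> that resolves once the
        // batch containing the request has been flushed and acknowledged.
        writer.shutdown();
      }
    }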
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
new file mode 100644
index 0000000000..4b5976f45b
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2.it;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.bigquery.*;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.storage.test.Test.*;
+import com.google.cloud.bigquery.storage.v1alpha2.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto;
+import com.google.cloud.bigquery.storage.v1alpha2.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
+import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
+import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import com.google.protobuf.Int64Value;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.threeten.bp.Duration;
+
+/** Integration tests for BigQuery Storage API. */
+public class ITBigQueryWriteManualClientTest {
+ private static final Logger LOG =
+ Logger.getLogger(ITBigQueryWriteManualClientTest.class.getName());
+ private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
+ private static final String TABLE = "testtable";
+ private static final String TABLE2 = "complicatedtable";
+ private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
+
+ private static BigQueryWriteClient client;
+ private static TableInfo tableInfo;
+ private static TableInfo tableInfo2;
+ private static String tableId;
+ private static String tableId2;
+ private static BigQuery bigquery;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ client = BigQueryWriteClient.create();
+
+ RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
+ bigquery = bigqueryHelper.getOptions().getService();
+ DatasetInfo datasetInfo =
+ DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
+ bigquery.create(datasetInfo);
+ LOG.info("Created test dataset: " + DATASET);
+ tableInfo =
+ TableInfo.newBuilder(
+ TableId.of(DATASET, TABLE),
+ StandardTableDefinition.of(
+ Schema.of(
+ com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
+ .build())))
+ .build();
+ com.google.cloud.bigquery.Field.Builder innerTypeFieldBuilder =
+ com.google.cloud.bigquery.Field.newBuilder(
+ "inner_type",
+ LegacySQLTypeName.RECORD,
+ com.google.cloud.bigquery.Field.newBuilder("value", LegacySQLTypeName.STRING)
+ .setMode(Field.Mode.REPEATED)
+ .build());
+
+ tableInfo2 =
+ TableInfo.newBuilder(
+ TableId.of(DATASET, TABLE2),
+ StandardTableDefinition.of(
+ Schema.of(
+ Field.newBuilder(
+ "nested_repeated_type",
+ LegacySQLTypeName.RECORD,
+ innerTypeFieldBuilder.setMode(Field.Mode.REPEATED).build())
+ .setMode(Field.Mode.REPEATED)
+ .build(),
+ innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build())))
+ .build();
+ bigquery.create(tableInfo);
+ bigquery.create(tableInfo2);
+ tableId =
+ String.format(
+ "projects/%s/datasets/%s/tables/%s",
+ ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
+ tableId2 =
+ String.format(
+ "projects/%s/datasets/%s/tables/%s",
+ ServiceOptions.getDefaultProjectId(), DATASET, TABLE2);
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ if (client != null) {
+ client.close();
+ }
+
+ if (bigquery != null) {
+ RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
+ LOG.info("Deleted test dataset: " + DATASET);
+ }
+ }
+
+ private AppendRowsRequest.Builder createAppendRequest(String streamName, String[] messages) {
+ AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+
+ AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()));
+
+ ProtoBufProto.ProtoRows.Builder rows = ProtoBufProto.ProtoRows.newBuilder();
+ for (String message : messages) {
+ FooType foo = FooType.newBuilder().setFoo(message).build();
+ rows.addSerializedRows(foo.toByteString());
+ }
+ dataBuilder.setRows(rows.build());
+ return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
+ }
+
+ private AppendRowsRequest.Builder createAppendRequestComplicateType(
+ String streamName, String[] messages) {
+ AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+
+ AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()));
+
+ ProtoBufProto.ProtoRows.Builder rows = ProtoBufProto.ProtoRows.newBuilder();
+ for (String message : messages) {
+ ComplicateType foo =
+ ComplicateType.newBuilder()
+ .setInnerType(InnerType.newBuilder().addValue(message).addValue(message).build())
+ .build();
+ rows.addSerializedRows(foo.toByteString());
+ }
+ dataBuilder.setRows(rows.build());
+ return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
+ }
+
+ @Test
+ public void testBatchWriteWithCommittedStream()
+ throws IOException, InterruptedException, ExecutionException {
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(tableId)
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ try (StreamWriter streamWriter =
+ StreamWriter.newBuilder(writeStream.getName())
+ .setBatchingSettings(
+ StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+ .toBuilder()
+ .setRequestByteThreshold(1024 * 1024L) // 1 MB
+ .setElementCountThreshold(2L)
+ .setDelayThreshold(Duration.ofSeconds(2))
+ .build())
+ .build()) {
+ LOG.info("Sending one message");
+ ApiFuture<AppendRowsResponse> response =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+ assertEquals(0, response.get().getOffset());
+
+ LOG.info("Sending two more messages");
+ ApiFuture<AppendRowsResponse> response1 =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build());
+ ApiFuture<AppendRowsResponse> response2 =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build());
+ assertEquals(1, response1.get().getOffset());
+ assertEquals(3, response2.get().getOffset());
+ }
+
+ TableResult result =
+ bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+ Iterator<FieldValueList> iter = result.getValues().iterator();
+ assertEquals("aaa", iter.next().get(0).getStringValue());
+ assertEquals("bbb", iter.next().get(0).getStringValue());
+ assertEquals("ccc", iter.next().get(0).getStringValue());
+ assertEquals("ddd", iter.next().get(0).getStringValue());
+ assertEquals(false, iter.hasNext());
+
+ LOG.info("Waiting for termination");
+ // awaitTermination always returns false.
+ // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testComplicateSchemaWithPendingStream()
+ throws IOException, InterruptedException, ExecutionException {
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(tableId2)
+ .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
+ .build());
+ try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+ LOG.info("Sending two messages");
+ ApiFuture<AppendRowsResponse> response =
+ streamWriter.append(
+ createAppendRequestComplicateType(writeStream.getName(), new String[] {"aaa"})
+ .setOffset(Int64Value.of(0L))
+ .build());
+ assertEquals(0, response.get().getOffset());
+
+ ApiFuture<AppendRowsResponse> response2 =
+ streamWriter.append(
+ createAppendRequestComplicateType(writeStream.getName(), new String[] {"bbb"})
+ .setOffset(Int64Value.of(1L))
+ .build());
+ assertEquals(1, response2.get().getOffset());
+ }
+
+ // Nothing showed up since rows are not committed.
+ TableResult result =
+ bigquery.listTableData(
+ tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+ Iterator<FieldValueList> iter = result.getValues().iterator();
+ assertEquals(false, iter.hasNext());
+
+ FinalizeWriteStreamResponse finalizeResponse =
+ client.finalizeWriteStream(
+ FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
+ // Finalize row count is not populated.
+ // assertEquals(1, finalizeResponse.getRowCount());
+ BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
+ client.batchCommitWriteStreams(
+ BatchCommitWriteStreamsRequest.newBuilder()
+ .setParent(tableId2)
+ .addWriteStreams(writeStream.getName())
+ .build());
+ assertEquals(true, batchCommitWriteStreamsResponse.hasCommitTime());
+
+ LOG.info("Waiting for termination");
+ // awaitTermination always returns false.
+ // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS));
+
+ // Data showed up.
+ result =
+ bigquery.listTableData(
+ tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+ iter = result.getValues().iterator();
+ assertEquals(
+ "[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=aaa}, FieldValue{attribute=PRIMITIVE, value=aaa}]}]",
+ iter.next().get(1).getRepeatedValue().toString());
+ assertEquals(
+ "[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=bbb}, FieldValue{attribute=PRIMITIVE, value=bbb}]}]",
+ iter.next().get(1).getRepeatedValue().toString());
+ assertEquals(false, iter.hasNext());
+ }
+
+ @Test
+ public void testStreamError() throws IOException, InterruptedException, ExecutionException {
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(tableId)
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+
+ ApiFuture<AppendRowsResponse> response =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+ assertEquals(0L, response.get().getOffset());
+ // Sending a bogus stream name should cause a connection error.
+ ApiFuture<AppendRowsResponse> response2 =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"aaa"})
+ .setWriteStream("blah")
+ .build());
+ try {
+ response2.get().getOffset();
+ Assert.fail("Should fail");
+ } catch (ExecutionException e) {
+ assertEquals(
+ true,
+ e.getCause()
+ .getMessage()
+ .startsWith(
+ "INVALID_ARGUMENT: Stream name `blah` in the request doesn't match the one already specified"));
+ }
+ // We can keep sending requests on the same stream.
+ ApiFuture<AppendRowsResponse> response3 =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+ assertEquals(1L, response3.get().getOffset());
+ }
+ }
+
+ @Test
+ public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(tableId)
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+ ApiFuture<AppendRowsResponse> response =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"aaa"})
+ .setOffset(Int64Value.of(0L))
+ .build());
+ assertEquals(0L, response.get().getOffset());
+ }
+
+ try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+ // Currently there is a bug where reconnection must wait 5 seconds to get the real row count.
+ Thread.sleep(5000L);
+ ApiFuture<AppendRowsResponse> response =
+ streamWriter.append(
+ createAppendRequest(writeStream.getName(), new String[] {"bbb"})
+ .setOffset(Int64Value.of(1L))
+ .build());
+ assertEquals(1L, response.get().getOffset());
+ }
+ }
+}
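
Compressed to its essentials, testComplicateSchemaWithPendingStream above exercises the PENDING-stream commit protocol. A condensed sketch, assuming a client and table path set up as in the test (error handling elided):

    import com.google.cloud.bigquery.storage.v1alpha2.BigQueryWriteClient;
    import com.google.cloud.bigquery.storage.v1alpha2.Storage.BatchCommitWriteStreamsRequest;
    import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
    import com.google.cloud.bigquery.storage.v1alpha2.Storage.FinalizeWriteStreamRequest;
    import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;

    final class PendingCommitDemo {
      // Rows appended to a PENDING stream stay invisible until the stream is
      // finalized and batch-committed.
      static void writeAndCommit(BigQueryWriteClient client, String tableId) {
        WriteStream stream =
            client.createWriteStream(
                CreateWriteStreamRequest.newBuilder()
                    .setParent(tableId)
                    .setWriteStream(
                        WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                    .build());
        // ... append rows via a StreamWriter bound to stream.getName();
        //     nothing is visible in the table yet ...
        client.finalizeWriteStream(
            FinalizeWriteStreamRequest.newBuilder().setName(stream.getName()).build());
        client.batchCommitWriteStreams(
            BatchCommitWriteStreamsRequest.newBuilder()
                .setParent(tableId)
                .addWriteStreams(stream.getName())
                .build()); // rows become visible atomically after the commit succeeds
      }
    }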
diff --git a/google-cloud-bigquerystorage/src/test/proto/test.proto b/google-cloud-bigquerystorage/src/test/proto/test.proto
index 2b1a988ea6..34ef52b265 100644
--- a/google-cloud-bigquerystorage/src/test/proto/test.proto
+++ b/google-cloud-bigquerystorage/src/test/proto/test.proto
@@ -17,8 +17,6 @@ syntax = "proto2";
package com.google.cloud.bigquery.storage.test;
-import "google/protobuf/descriptor.proto";
-
enum TestEnum {
TestEnum0 = 0;
TestEnum1 = 1;
@@ -39,6 +37,7 @@ message AllSupportedTypes {
message InnerType {
repeated string value = 1;
}
+
message NestedType {
repeated InnerType inner_type = 1;
}
@@ -51,6 +50,11 @@ message ComplicateType {
message ContainsRecursive {
optional RecursiveType field = 1;
}
+
message RecursiveType {
optional ContainsRecursive field = 2;
}
+
+message FooType {
+ optional string foo = 1;
+}
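
The new FooType message is the row type the unit tests serialize. A minimal sketch of packing it into ProtoRows for an append, mirroring the createAppendRequest helpers above (illustrative, assuming the generated test protos are on the classpath):

    import com.google.cloud.bigquery.storage.test.Test.FooType;
    import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto;

    final class RowsDemo {
      public static void main(String[] args) {
        ProtoBufProto.ProtoRows rows =
            ProtoBufProto.ProtoRows.newBuilder()
                .addSerializedRows(
                    FooType.newBuilder().setFoo("aaa").build().toByteString())
                .build();
        System.out.println(rows.getSerializedRowsCount()); // 1
      }
    }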
diff --git a/pom.xml b/pom.xml
index 04ad6291a2..904c140215 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
<threeten.version>1.4.1</threeten.version>
<javax.annotations.version>1.3.2</javax.annotations.version>
<animal-sniffer.version>1.18</animal-sniffer.version>
+ <commons-lang3.version>3.5</commons-lang3.version>
@@ -208,6 +209,11 @@
<artifactId>animal-sniffer-annotations</artifactId>
<version>${animal-sniffer.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
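
The new commons-lang3 property and dependency appear to exist only to support the tests: StreamWriterTest uses StringUtils.repeat to build an oversized row that trips the request-byte threshold. For example:

    import org.apache.commons.lang3.StringUtils;

    final class BigRowDemo {
      // The oversized payload used in testAppendByNumBytes to exceed the 70-byte threshold.
      static final String BIG_ROW = StringUtils.repeat('A', 100);
    }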