append(JSONArray jsonArr, long offset) {
}
}
- /**
- * Refreshes the connection for a JsonStreamWriter by first flushing all remaining rows, then
- * recreating the stream writer, and finally setting the descriptor. All of these actions need
- * to be performed atomically to avoid synchronization issues with append(). Flushing all rows
- * first is necessary because if rows remained when the connection refreshed, they would be sent
- * with the old writer schema instead of the new one.
- */
- void refreshConnection()
- throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
- synchronized (this) {
- this.streamWriter.close();
- this.streamWriter = streamWriterBuilder.build();
- this.descriptor =
- BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
- }
- }
-
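For context, a minimal sketch of the append() side of this locking contract, assuming the surrounding JsonStreamWriter fields; convertJsonToProtoRows and buildRequest are hypothetical helpers, not the library's API:

    // Sketch: append() holds the same monitor as refreshConnection(), so the
    // stream writer and descriptor can never be swapped mid-append.
    public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
      synchronized (this) {
        ProtoRows rows = convertJsonToProtoRows(jsonArr, this.descriptor); // hypothetical
        return this.streamWriter.append(buildRequest(rows, offset));       // hypothetical
      }
    }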
/**
* Gets streamName
*
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java
deleted file mode 100644
index dc2f855d0c..0000000000
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigquery.storage.v1beta2;
-
-/**
- * An abstract class that implements the Runnable interface and provides access to the current
- * StreamWriter and updatedSchema. This runnable is only invoked when an updated schema has been
- * passed back through the AppendRowsResponse. Users should only implement the run() function.
- *
- * @deprecated
- */
-public abstract class OnSchemaUpdateRunnable implements Runnable {
- private StreamWriter streamWriter;
- private TableSchema updatedSchema;
-
- /**
- * Setter for the updatedSchema
- *
- * @param updatedSchema
- */
- void setUpdatedSchema(TableSchema updatedSchema) {
- this.updatedSchema = updatedSchema;
- }
-
- /**
- * Setter for the streamWriter
- *
- * @param streamWriter
- */
- void setStreamWriter(StreamWriter streamWriter) {
- this.streamWriter = streamWriter;
- }
-
- /** Getter for the updatedSchema */
- TableSchema getUpdatedSchema() {
- return this.updatedSchema;
- }
-
- /** Getter for the streamWriter */
- StreamWriter getStreamWriter() {
- return this.streamWriter;
- }
-}
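For reference, a minimal sketch of how this now-deleted class was meant to be subclassed (per its javadoc, users implement only run()); the body of run() here is illustrative only:

    // Sketch: react to a schema update passed back through AppendRowsResponse.
    // setUpdatedSchema()/setStreamWriter() are called by the writer before this
    // runnable is scheduled on the writer's executor.
    OnSchemaUpdateRunnable onUpdate =
        new OnSchemaUpdateRunnable() {
          @Override
          public void run() {
            TableSchema newSchema = getUpdatedSchema();
            System.out.println(
                "Schema updated on " + getStreamWriter().getStreamNameString()
                    + ", field count: " + newSchema.getFieldsCount());
          }
        };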
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java
deleted file mode 100644
index 4a937b6be5..0000000000
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.bigquery.storage.v1beta2;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.SettableApiFuture;
-import com.google.api.gax.batching.BatchingSettings;
-import com.google.api.gax.batching.FlowControlSettings;
-import com.google.api.gax.batching.FlowController;
-import com.google.api.gax.core.BackgroundResource;
-import com.google.api.gax.core.BackgroundResourceAggregation;
-import com.google.api.gax.core.CredentialsProvider;
-import com.google.api.gax.core.ExecutorAsBackgroundResource;
-import com.google.api.gax.core.ExecutorProvider;
-import com.google.api.gax.core.InstantiatingExecutorProvider;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.api.gax.rpc.AbortedException;
-import com.google.api.gax.rpc.BidiStreamingCallable;
-import com.google.api.gax.rpc.ClientStream;
-import com.google.api.gax.rpc.ResponseObserver;
-import com.google.api.gax.rpc.StreamController;
-import com.google.api.gax.rpc.TransportChannelProvider;
-import com.google.api.gax.rpc.UnimplementedException;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Int64Value;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.concurrent.GuardedBy;
-import org.threeten.bp.Duration;
-
-/**
- * This is to be used to manage streaming writes when you are working with PENDING streams or want
- * to explicitly manage offsets. In the most common case of writing with a COMMITTED stream without
- * offsets, please use the simpler writer {@code DirectWriter}.
- *
- * <p>A {@link StreamWriter} provides built-in capabilities to: handle batching of messages;
- * control memory utilization (through flow control); and clean up requests (only the first
- * request in the stream keeps the write schema).
- *
- * <p>With customizable options that control:
- *
- * <ul>
- *   <li>Message batching: such as number of messages or max batch byte size, and batching deadline
- *   <li>Inflight message control: such as number of messages or max batch byte size
- * </ul>
- *
- * <p>{@link StreamWriter} will use the credentials set on the channel, which uses application
- * default credentials through {@link GoogleCredentials#getApplicationDefault} by default.
- *
- * @deprecated use {@link StreamWriterV2} instead.
- */
-@Deprecated
-public class StreamWriter implements AutoCloseable {
- private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName());
-
- private static String streamPatternString =
- "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
- private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
-
- private static Pattern streamPattern = Pattern.compile(streamPatternString);
- private static Pattern tablePattern = Pattern.compile(tablePatternString);
-
- private final String streamName;
- private final String tableName;
-
- private final String traceId;
-
- private final BatchingSettings batchingSettings;
- private final RetrySettings retrySettings;
- private BigQueryWriteSettings stubSettings;
-
- private final Lock messagesBatchLock;
- private final Lock appendAndRefreshAppendLock;
-
- @GuardedBy("appendAndRefreshAppendLock")
- private final MessagesBatch messagesBatch;
-
- // Indicates whether a non-recoverable exception has occurred on the stream.
- private AtomicReference<Throwable> streamException;
-
- private BackgroundResource backgroundResources;
- private List<BackgroundResource> backgroundResourceList;
-
- private BigQueryWriteClient stub;
- BidiStreamingCallable<AppendRowsRequest, AppendRowsResponse> bidiStreamingCallable;
-
- @GuardedBy("appendAndRefreshAppendLock")
- ClientStream<AppendRowsRequest> clientStream;
-
- private final AppendResponseObserver responseObserver;
-
- private final ScheduledExecutorService executor;
-
- @GuardedBy("appendAndRefreshAppendLock")
- private boolean shutdown;
-
- private final Waiter messagesWaiter;
-
- @GuardedBy("appendAndRefreshAppendLock")
- private boolean activeAlarm;
-
- private ScheduledFuture<?> currentAlarmFuture;
-
- private Integer currentRetries = 0;
-
- // Used for schema updates
- private OnSchemaUpdateRunnable onSchemaUpdateRunnable;
-
- /** The maximum size of one request. Defined by the API. */
- public static long getApiMaxRequestBytes() {
- return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
- }
-
- /** The maximum size of in flight requests. Defined by the API. */
- public static long getApiMaxInflightRequests() {
- return 5000L;
- }
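Requests over this cap fail with FAILED_PRECONDITION when wrapped (see AppendRequestAndFutureResponse later in this file), so a caller-side guard is one option; a minimal sketch:

    // Sketch: reject an oversized request before handing it to the writer.
    static void checkRequestSize(AppendRowsRequest request) {
      long size = request.getProtoRows().getSerializedSize();
      if (size > StreamWriter.getApiMaxRequestBytes()) {
        throw new IllegalArgumentException(
            "Request is " + size + " bytes, over the API cap of "
                + StreamWriter.getApiMaxRequestBytes() + " bytes");
      }
    }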
-
- private StreamWriter(Builder builder)
- throws IllegalArgumentException, IOException, InterruptedException {
- if (builder.createDefaultStream) {
- Matcher matcher = tablePattern.matcher(builder.streamOrTableName);
- if (!matcher.matches()) {
- throw new IllegalArgumentException("Invalid table name: " + builder.streamOrTableName);
- }
- streamName = builder.streamOrTableName + "/_default";
- tableName = builder.streamOrTableName;
- } else {
- Matcher matcher = streamPattern.matcher(builder.streamOrTableName);
- if (!matcher.matches()) {
- throw new IllegalArgumentException("Invalid stream name: " + builder.streamOrTableName);
- }
- streamName = builder.streamOrTableName;
- tableName = matcher.group(1);
- }
-
- this.traceId = builder.traceId;
- this.batchingSettings = builder.batchingSettings;
- this.retrySettings = builder.retrySettings;
- this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
- messagesBatchLock = new ReentrantLock();
- appendAndRefreshAppendLock = new ReentrantLock();
- activeAlarm = false;
- this.streamException = new AtomicReference<Throwable>(null);
-
- executor = builder.executorProvider.getExecutor();
- backgroundResourceList = new ArrayList<>();
- if (builder.executorProvider.shouldAutoClose()) {
- backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
- }
- messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings());
- responseObserver = new AppendResponseObserver(this);
-
- if (builder.client == null) {
- stubSettings =
- BigQueryWriteSettings.newBuilder()
- .setCredentialsProvider(builder.credentialsProvider)
- .setTransportChannelProvider(builder.channelProvider)
- .setEndpoint(builder.endpoint)
- .build();
- stub = BigQueryWriteClient.create(stubSettings);
- backgroundResourceList.add(stub);
- } else {
- stub = builder.client;
- }
- backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
- shutdown = false;
- if (builder.onSchemaUpdateRunnable != null) {
- this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable;
- this.onSchemaUpdateRunnable.setStreamWriter(this);
- }
-
- bidiStreamingCallable = stub.appendRowsCallable();
- clientStream = bidiStreamingCallable.splitCall(responseObserver);
- try {
- while (!clientStream.isSendReady()) {
- Thread.sleep(10);
- }
- } catch (InterruptedException e) {
- // Restore the interrupt flag; the stream may still become ready.
- Thread.currentThread().interrupt();
- }
- }
-
- /** Stream name we are writing to. */
- public String getStreamNameString() {
- return streamName;
- }
-
- /** Table name we are writing to. */
- public String getTableNameString() {
- return tableName;
- }
-
- /** OnSchemaUpdateRunnable for this streamWriter. */
- OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
- return this.onSchemaUpdateRunnable;
- }
-
- /**
- * Schedules the writing of a message. The write of the message may occur immediately or be
- * delayed based on the writer batching options.
- *
- * <p>Example of writing a message.
- *
- * <pre>{@code
- * AppendRowsRequest message;
- * ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
- * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
- *   public void onSuccess(AppendRowsResponse response) {
- *     if (response.getAppendResult().hasOffset()) {
- *       System.out.println("written with offset: " + response.getAppendResult().getOffset());
- *     } else {
- *       System.out.println("received an in stream error: " + response.getError().toString());
- *     }
- *   }
- *
- *   public void onFailure(Throwable t) {
- *     System.out.println("failed to write: " + t);
- *   }
- * }, MoreExecutors.directExecutor());
- * }</pre>
- *
- * @param message the message in serialized format to write to BigQuery.
- * @return the message ID wrapped in a future.
- */
- public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
- appendAndRefreshAppendLock.lock();
-
- try {
- Preconditions.checkState(!shutdown, "Cannot append on a shut-down writer.");
- Preconditions.checkNotNull(message, "Message is null.");
- Preconditions.checkState(streamException.get() == null, "Stream already failed.");
- final AppendRequestAndFutureResponse outstandingAppend =
- new AppendRequestAndFutureResponse(message);
- List<InflightBatch> batchesToSend = messagesBatch.add(outstandingAppend);
- // Set up the next duration-based delivery alarm if there are messages batched.
- if (batchingSettings.getDelayThreshold() != null) {
- setupAlarm();
- }
- if (!batchesToSend.isEmpty()) {
- for (final InflightBatch batch : batchesToSend) {
- LOG.fine("Scheduling a batch for immediate sending");
- writeBatch(batch);
- }
- }
- return outstandingAppend.appendResult;
- } finally {
- appendAndRefreshAppendLock.unlock();
- }
- }
-
- /**
- * Re-establishes a stream connection.
- *
- * @throws InterruptedException
- */
- public void refreshAppend() throws InterruptedException {
- throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false);
- }
-
- @GuardedBy("appendAndRefreshAppendLock")
- private void setupAlarm() {
- if (!messagesBatch.isEmpty()) {
- if (!activeAlarm) {
- long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
- LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs);
- currentAlarmFuture =
- executor.schedule(
- new Runnable() {
- @Override
- public void run() {
- LOG.fine("Sending messages based on schedule");
- appendAndRefreshAppendLock.lock();
- activeAlarm = false;
- try {
- writeBatch(messagesBatch.popBatch());
- } finally {
- appendAndRefreshAppendLock.unlock();
- }
- }
- },
- delayThresholdMs,
- TimeUnit.MILLISECONDS);
- }
- } else if (currentAlarmFuture != null) {
- LOG.log(Level.FINER, "Cancelling alarm, no more messages");
- currentAlarmFuture.cancel(false);
- activeAlarm = false;
- }
- }
-
- /**
- * Write any outstanding batches if non-empty. This method sends buffered messages, but does not
- * wait for the send operations to complete. To wait for messages to send, call {@code get} on the
- * futures returned from {@code append}.
- */
- @GuardedBy("appendAndRefreshAppendLock")
- public void writeAllOutstanding() {
- if (!messagesBatch.isEmpty()) {
- writeBatch(messagesBatch.popBatch());
- }
- messagesBatch.reset();
- }
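A short usage sketch of the wait pattern described above, assuming the caller kept the futures returned by append() in a hypothetical pendingAppends list (checked exceptions elided):

    // Sketch: flush buffered batches, then block until every append is acknowledged.
    writer.writeAllOutstanding();
    for (ApiFuture<AppendRowsResponse> pending : pendingAppends) {
      // get() throws ExecutionException if the corresponding append failed.
      AppendRowsResponse response = pending.get();
      System.out.println("acked offset: " + response.getAppendResult().getOffset().getValue());
    }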
-
- @GuardedBy("appendAndRefreshAppendLock")
- private void writeBatch(final InflightBatch inflightBatch) {
- if (inflightBatch != null) {
- AppendRowsRequest request = inflightBatch.getMergedRequest();
- try {
- appendAndRefreshAppendLock.unlock();
- messagesWaiter.acquire(inflightBatch.getByteSize());
- appendAndRefreshAppendLock.lock();
- if (shutdown || streamException.get() != null) {
- appendAndRefreshAppendLock.unlock();
- messagesWaiter.release(inflightBatch.getByteSize());
- appendAndRefreshAppendLock.lock();
- inflightBatch.onFailure(
- new AbortedException(
- shutdown
- ? "Stream closed, abort append."
- : "Stream has previous errors, abort append.",
- null,
- GrpcStatusCode.of(Status.Code.ABORTED),
- true));
- return;
- }
- responseObserver.addInflightBatch(inflightBatch);
- clientStream.send(request);
- } catch (FlowController.FlowControlException ex) {
- appendAndRefreshAppendLock.lock();
- inflightBatch.onFailure(ex);
- }
- }
- }
-
- /** Close the stream writer. Shut down all resources. */
- @Override
- public void close() {
- LOG.info("Closing stream writer:" + streamName);
- shutdown();
- try {
- awaitTermination(1, TimeUnit.MINUTES);
- } catch (InterruptedException ignored) {
- }
- }
-
- // The batch of messages that is being sent/processed.
- private static final class InflightBatch {
- // List of requests that are going to be batched.
- final List<AppendRequestAndFutureResponse> inflightRequests;
- // A list that tracks the expected offset for each AppendRequest; used to reconstruct the
- // response future.
- private final ArrayList<Long> offsetList;
- private final long creationTime;
- private int attempt;
- private long batchSizeBytes;
- private long expectedOffset;
- private Boolean attachSchema;
- private String streamName;
- private final AtomicBoolean failed;
- private final StreamWriter streamWriter;
-
- InflightBatch(
- List<AppendRequestAndFutureResponse> inflightRequests,
- long batchSizeBytes,
- String streamName,
- Boolean attachSchema,
- StreamWriter streamWriter) {
- this.inflightRequests = inflightRequests;
- this.offsetList = new ArrayList<Long>(inflightRequests.size());
- for (AppendRequestAndFutureResponse request : inflightRequests) {
- if (request.message.hasOffset()) {
- offsetList.add(request.message.getOffset().getValue());
- } else {
- offsetList.add(-1L);
- }
- }
- this.expectedOffset = offsetList.get(0).longValue();
- attempt = 1;
- creationTime = System.currentTimeMillis();
- this.batchSizeBytes = batchSizeBytes;
- this.attachSchema = attachSchema;
- this.streamName = streamName;
- this.failed = new AtomicBoolean(false);
- this.streamWriter = streamWriter;
- }
-
- int count() {
- return inflightRequests.size();
- }
-
- long getByteSize() {
- return this.batchSizeBytes;
- }
-
- long getExpectedOffset() {
- return expectedOffset;
- }
-
- private AppendRowsRequest getMergedRequest() throws IllegalStateException {
- if (inflightRequests.size() == 0) {
- throw new IllegalStateException("Unexpected empty message batch");
- }
- ProtoRows.Builder rowsBuilder =
- inflightRequests.get(0).message.getProtoRows().getRows().toBuilder();
- for (int i = 1; i < inflightRequests.size(); i++) {
- rowsBuilder.addAllSerializedRows(
- inflightRequests.get(i).message.getProtoRows().getRows().getSerializedRowsList());
- }
- AppendRowsRequest.ProtoData.Builder data =
- inflightRequests.get(0).message.getProtoRows().toBuilder().setRows(rowsBuilder.build());
- AppendRowsRequest.Builder requestBuilder = inflightRequests.get(0).message.toBuilder();
- if (!attachSchema) {
- data.clearWriterSchema();
- requestBuilder.clearWriteStream();
- } else {
- if (!data.hasWriterSchema()) {
- throw new IllegalStateException(
- "The first message on the connection must have writer schema set");
- }
- requestBuilder.setWriteStream(streamName);
- if (!inflightRequests.get(0).message.getTraceId().isEmpty()) {
- requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId());
- } else if (streamWriter.traceId != null) {
- requestBuilder.setTraceId(streamWriter.traceId);
- }
- }
- return requestBuilder.setProtoRows(data.build()).build();
- }
-
- private void onFailure(Throwable t) {
- if (failed.getAndSet(true)) {
- // Error has been set already.
- LOG.warning("Ignore " + t.toString() + " since error has already been set");
- return;
- }
-
- for (AppendRequestAndFutureResponse request : inflightRequests) {
- request.appendResult.setException(t);
- }
- }
-
- // Disassembles the batched response and sets the future on each individual request.
- private void onSuccess(AppendRowsResponse response) {
- for (int i = 0; i < inflightRequests.size(); i++) {
- AppendRowsResponse.Builder singleResponse = response.toBuilder();
- if (response.getAppendResult().hasOffset()) {
- long actualOffset = response.getAppendResult().getOffset().getValue();
- for (int j = 0; j < i; j++) {
- actualOffset +=
- inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount();
- }
- singleResponse.setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(actualOffset)));
- }
- inflightRequests.get(i).appendResult.set(singleResponse.build());
- }
- }
- }
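The offset bookkeeping in onSuccess is easier to follow with numbers: the response carries the offset of the first row of the merged batch, and each request's offset is that base plus the row counts of the requests merged before it. A worked sketch, assuming three appends of 2, 3, and 1 rows acknowledged at base offset 10:

    // Sketch: per-request offsets recovered from a single batched response.
    long base = 10;              // offset returned for the merged batch
    int[] rowCounts = {2, 3, 1}; // rows contributed by each merged append
    long offset = base;
    for (int i = 0; i < rowCounts.length; i++) {
      System.out.println("request " + i + " starts at offset " + offset); // 10, 12, 15
      offset += rowCounts[i];
    }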
-
- // Class that wraps AppendRowsRequest and its corresponding response future.
- private static final class AppendRequestAndFutureResponse {
- final SettableApiFuture<AppendRowsResponse> appendResult;
- final AppendRowsRequest message;
- final int messageSize;
-
- AppendRequestAndFutureResponse(AppendRowsRequest message) {
- this.appendResult = SettableApiFuture.create();
- this.message = message;
- this.messageSize = message.getProtoRows().getSerializedSize();
- if (this.messageSize > getApiMaxRequestBytes()) {
- throw new StatusRuntimeException(
- Status.fromCode(Status.Code.FAILED_PRECONDITION)
- .withDescription("Message exceeded max size limit: " + getApiMaxRequestBytes()));
- }
- }
- }
-
- /** The batching settings configured on this {@code StreamWriter}. */
- public BatchingSettings getBatchingSettings() {
- return batchingSettings;
- }
-
- /** The retry settings configured on this {@code StreamWriter}. */
- public RetrySettings getRetrySettings() {
- return retrySettings;
- }
-
- /**
- * Schedules immediate flush of any outstanding messages and waits until all are processed.
- *
- * <p>Sends remaining outstanding messages and prevents future calls to publish. This method
- * should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no
- * pending messages are lost.
- */
- protected void shutdown() {
- appendAndRefreshAppendLock.lock();
- try {
- if (shutdown) {
- LOG.fine("Already shutdown.");
- return;
- }
- shutdown = true;
- LOG.info("Shutdown called on writer: " + streamName);
- if (currentAlarmFuture != null && activeAlarm) {
- currentAlarmFuture.cancel(false);
- activeAlarm = false;
- }
- // Wait for current inflight to drain.
- try {
- appendAndRefreshAppendLock.unlock();
- messagesWaiter.waitComplete(0);
- } catch (InterruptedException e) {
- LOG.warning("Failed to wait for messages to return " + e.toString());
- }
- appendAndRefreshAppendLock.lock();
- // Try to send out what's left in batch.
- if (!messagesBatch.isEmpty()) {
- InflightBatch inflightBatch = messagesBatch.popBatch();
- AppendRowsRequest request = inflightBatch.getMergedRequest();
- if (streamException.get() != null) {
- inflightBatch.onFailure(
- new AbortedException(
- shutdown
- ? "Stream closed, abort append."
- : "Stream has previous errors, abort append.",
- null,
- GrpcStatusCode.of(Status.Code.ABORTED),
- true));
- } else {
- try {
- appendAndRefreshAppendLock.unlock();
- messagesWaiter.acquire(inflightBatch.getByteSize());
- appendAndRefreshAppendLock.lock();
- responseObserver.addInflightBatch(inflightBatch);
- clientStream.send(request);
- } catch (FlowController.FlowControlException ex) {
- appendAndRefreshAppendLock.lock();
- LOG.warning(
- "Unexpected flow control exception when sending batch leftover: " + ex.toString());
- }
- }
- }
- // Close the stream.
- try {
- appendAndRefreshAppendLock.unlock();
- messagesWaiter.waitComplete(0);
- } catch (InterruptedException e) {
- LOG.warning("Failed to wait for messages to return " + e.toString());
- }
- appendAndRefreshAppendLock.lock();
- if (clientStream.isSendReady()) {
- clientStream.closeSend();
- }
- backgroundResources.shutdown();
- } finally {
- appendAndRefreshAppendLock.unlock();
- }
- }
-
- /**
- * Waits until all work has completed execution after a {@link #shutdown()} request, or the
- * timeout occurs, or the current thread is interrupted.
- *
- * <p>Call this method to make sure all resources are freed properly.
- */
- protected boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
- return backgroundResources.awaitTermination(duration, unit);
- }
-
- /**
- * Constructs a new {@link Builder} using the given stream. If the builder has
- * createDefaultStream set to true, then the user should pass in a table name here.
- *
- * <p>Example of creating a {@code WriteStream}.
- *
- * <pre>{@code
- * String table = "projects/my_project/datasets/my_dataset/tables/my_table";
- * String stream;
- * try (BigQueryWriteClient bigqueryWriteClient = BigQueryWriteClient.create()) {
- *   CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder().setParent(table).build();
- *   WriteStream response = bigqueryWriteClient.createWriteStream(request);
- *   stream = response.getName();
- * }
- * try (StreamWriter writer = StreamWriter.newBuilder(stream).build()) {
- *   //...
- * }
- * }</pre>
- *
- * <p>Example of creating a default {@code WriteStream}, which is COMMITTED-only and does not
- * support offsets, but supports higher throughput per stream and is not subject to
- * CreateWriteStream quotas.
- *
- * <pre>{@code
- * String table = "projects/my_project/datasets/my_dataset/tables/my_table";
- * try (StreamWriter writer = StreamWriter.newBuilder(table).createDefaultStream().build()) {
- *   //...
- * }
- * }</pre>
- */
- public static Builder newBuilder(String streamOrTableName) {
- Preconditions.checkNotNull(streamOrTableName, "streamOrTableName is null.");
- return new Builder(streamOrTableName, null);
- }
-
- /**
- * Constructs a new {@link Builder} using the given stream and an existing BigQueryWriteClient.
- */
- public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient client) {
- Preconditions.checkNotNull(streamOrTableName, "streamOrTableName is null.");
- Preconditions.checkNotNull(client, "Client is null.");
- return new Builder(streamOrTableName, client);
- }
-
- /** A builder of {@link StreamWriter}s. */
- public static final class Builder {
- static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10);
- static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10);
-
- // Meaningful defaults.
- static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
- FlowControlSettings.newBuilder()
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
- .setMaxOutstandingElementCount(1000L)
- .setMaxOutstandingRequestBytes(100 * 1024 * 1024L) // 100 Mb
- .build();
- public static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
- BatchingSettings.newBuilder()
- .setDelayThreshold(Duration.ofMillis(10))
- .setRequestByteThreshold(100 * 1024L) // 100 kb
- .setElementCountThreshold(100L)
- .setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS)
- .build();
- public static final RetrySettings DEFAULT_RETRY_SETTINGS =
- RetrySettings.newBuilder()
- .setMaxRetryDelay(Duration.ofSeconds(60))
- .setInitialRetryDelay(Duration.ofMillis(100))
- .setMaxAttempts(3)
- .build();
- private static final int THREADS_PER_CPU = 5;
- static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
- InstantiatingExecutorProvider.newBuilder()
- .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
- .build();
-
- private String streamOrTableName;
- private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
-
- private String traceId;
-
- private BigQueryWriteClient client = null;
-
- // Batching options
- BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
-
- RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
-
- private TransportChannelProvider channelProvider =
- BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
-
- ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
- private CredentialsProvider credentialsProvider =
- BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
-
- private OnSchemaUpdateRunnable onSchemaUpdateRunnable;
-
- private boolean createDefaultStream = false;
-
- private Builder(String streamOrTableName, BigQueryWriteClient client) {
- this.streamOrTableName = Preconditions.checkNotNull(streamOrTableName);
- this.client = client;
- }
-
- /**
- * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
- * API endpoint.
- *
- * <p>For performance, this client benefits from having multiple underlying connections. See
- * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
- */
- public Builder setChannelProvider(TransportChannelProvider channelProvider) {
- this.channelProvider =
- Preconditions.checkNotNull(channelProvider, "ChannelProvider is null.");
- return this;
- }
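A hedged sketch of the multi-connection setup the comment recommends, using the same default transport builder this file already uses (its default is one channel per CPU); streamName is a placeholder and exception handling is elided:

    // Sketch: more underlying gRPC connections for higher throughput.
    TransportChannelProvider pooled =
        BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
            .setChannelsPerCpu(2)
            .build();
    StreamWriter writer =
        StreamWriter.newBuilder(streamName).setChannelProvider(pooled).build();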
-
- /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
- public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
- this.credentialsProvider =
- Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null.");
- return this;
- }
-
- /**
- * Sets the {@code BatchSettings} on the writer.
- *
- * @param batchingSettings
- * @return
- */
- public Builder setBatchingSettings(BatchingSettings batchingSettings) {
- Preconditions.checkNotNull(batchingSettings, "BatchingSettings is null.");
-
- BatchingSettings.Builder builder = batchingSettings.toBuilder();
- Preconditions.checkNotNull(batchingSettings.getElementCountThreshold());
- Preconditions.checkArgument(batchingSettings.getElementCountThreshold() > 0);
- Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold());
- Preconditions.checkArgument(batchingSettings.getRequestByteThreshold() > 0);
- if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) {
- builder.setRequestByteThreshold(getApiMaxRequestBytes());
- }
- LOG.info("here" + batchingSettings.getFlowControlSettings());
- if (batchingSettings.getFlowControlSettings() == null) {
- builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS);
- } else {
- Long elementCount =
- batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount();
- if (elementCount == null || elementCount > getApiMaxInflightRequests()) {
- elementCount = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount();
- }
- Long elementSize =
- batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes();
- if (elementSize == null || elementSize < 0) {
- elementSize = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes();
- }
- FlowController.LimitExceededBehavior behavior =
- batchingSettings.getFlowControlSettings().getLimitExceededBehavior();
- if (behavior == null || behavior == FlowController.LimitExceededBehavior.Ignore) {
- behavior = DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior();
- }
- builder.setFlowControlSettings(
- FlowControlSettings.newBuilder()
- .setMaxOutstandingElementCount(elementCount)
- .setMaxOutstandingRequestBytes(elementSize)
- .setLimitExceededBehavior(behavior)
- .build());
- }
- this.batchingSettings = builder.build();
- return this;
- }
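For reference, a minimal sketch of custom settings that stay inside the clamps applied above (byte threshold under the 10 MB request cap, element count under the 5000 inflight cap); streamName is a placeholder and exception handling is elided:

    // Sketch: custom batching layered on the defaults.
    BatchingSettings custom =
        StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
            .toBuilder()
            .setElementCountThreshold(500L)           // messages per batch
            .setRequestByteThreshold(1024 * 1024L)    // 1 MB per request
            .setDelayThreshold(Duration.ofMillis(50)) // flush at least every 50 ms
            .build();
    StreamWriter writer =
        StreamWriter.newBuilder(streamName).setBatchingSettings(custom).build();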
-
- /**
- * Sets the {@code RetrySettings} on the writer.
- *
- * @param retrySettings
- * @return
- */
- public Builder setRetrySettings(RetrySettings retrySettings) {
- this.retrySettings = Preconditions.checkNotNull(retrySettings, "RetrySettings is null.");
- return this;
- }
-
- /** Gives the ability to set a custom executor to be used by the library. */
- public Builder setExecutorProvider(ExecutorProvider executorProvider) {
- this.executorProvider =
- Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
- return this;
- }
-
- /** Gives the ability to override the gRPC endpoint. */
- public Builder setEndpoint(String endpoint) {
- this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null.");
- return this;
- }
-
- /** Gives the ability to set action on schema update. */
- public Builder setOnSchemaUpdateRunnable(OnSchemaUpdateRunnable onSchemaUpdateRunnable) {
- this.onSchemaUpdateRunnable =
- Preconditions.checkNotNull(onSchemaUpdateRunnable, "onSchemaUpdateRunnable is null.");
- return this;
- }
-
- /** Use the table's default stream ("_default") instead of an explicitly created stream. */
- public Builder createDefaultStream() {
- this.createDefaultStream = true;
- return this;
- }
-
- /** Mark the request as coming from Dataflow. */
- public Builder setDataflowTraceId() {
- this.traceId = "Dataflow";
- return this;
- }
-
- /** Builds the {@code StreamWriter}. */
- public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
- return new StreamWriter(this);
- }
- }
-
- private static final class AppendResponseObserver
- implements ResponseObserver<AppendRowsResponse> {
- private Queue<InflightBatch> inflightBatches = new LinkedList<InflightBatch>();
- private StreamWriter streamWriter;
-
- public void addInflightBatch(InflightBatch batch) {
- synchronized (this.inflightBatches) {
- this.inflightBatches.add(batch);
- }
- }
-
- public AppendResponseObserver(StreamWriter streamWriter) {
- this.streamWriter = streamWriter;
- }
-
- private boolean isRecoverableError(Throwable t) {
- Status status = Status.fromThrowable(t);
- return status.getCode() == Status.Code.UNAVAILABLE;
- }
-
- @Override
- public void onStart(StreamController controller) {
- // no-op
- }
-
- private void abortInflightRequests(Throwable t) {
- LOG.fine("Aborting all inflight requests");
- synchronized (this.inflightBatches) {
- boolean firstError = true;
- while (!this.inflightBatches.isEmpty()) {
- InflightBatch inflightBatch = this.inflightBatches.poll();
- if (firstError || t.getCause().getClass() == AbortedException.class) {
- inflightBatch.onFailure(t);
- firstError = false;
- } else {
- inflightBatch.onFailure(
- new AbortedException(
- "Request aborted due to previous failures",
- t,
- GrpcStatusCode.of(Status.Code.ABORTED),
- true));
- }
- streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
- }
- }
- }
-
- @Override
- public void onResponse(AppendRowsResponse response) {
- InflightBatch inflightBatch = null;
- synchronized (this.inflightBatches) {
- inflightBatch = this.inflightBatches.poll();
- }
- try {
- streamWriter.currentRetries = 0;
- if (response == null) {
- inflightBatch.onFailure(new IllegalStateException("Response is null"));
- // Without this return, the null response would be dereferenced below.
- return;
- }
- if (response.hasUpdatedSchema()) {
- if (streamWriter.getOnSchemaUpdateRunnable() != null) {
- streamWriter.getOnSchemaUpdateRunnable().setUpdatedSchema(response.getUpdatedSchema());
- streamWriter.executor.schedule(
- streamWriter.getOnSchemaUpdateRunnable(), 0L, TimeUnit.MILLISECONDS);
- }
- }
- // Currently there is nothing retryable. If the error is ALREADY_EXISTS, it can be ignored.
- if (response.hasError()) {
- StatusRuntimeException exception =
- new StatusRuntimeException(
- Status.fromCodeValue(response.getError().getCode())
- .withDescription(response.getError().getMessage()));
- inflightBatch.onFailure(exception);
- } else {
- if (inflightBatch.getExpectedOffset() > 0
- && (response.getAppendResult().hasOffset()
- && response.getAppendResult().getOffset().getValue()
- != inflightBatch.getExpectedOffset())) {
- IllegalStateException exception =
- new IllegalStateException(
- String.format(
- "The append result offset %s does not match the expected offset %s.",
- response.getAppendResult().getOffset().getValue(),
- inflightBatch.getExpectedOffset()));
- inflightBatch.onFailure(exception);
- abortInflightRequests(
- new AbortedException(
- "Request aborted due to previous failures",
- exception,
- GrpcStatusCode.of(Status.Code.ABORTED),
- true));
- } else {
- inflightBatch.onSuccess(response);
- }
- }
- } finally {
- streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
- }
- }
-
- @Override
- public void onComplete() {
- LOG.info("OnComplete called");
- }
-
- @Override
- public void onError(Throwable t) {
- LOG.info("OnError called: " + t.toString());
- streamWriter.streamException.set(t);
- abortInflightRequests(t);
- }
- };
-
- // This class controls how many messages are going to be sent out in a batch.
- private static class MessagesBatch {
- private List<AppendRequestAndFutureResponse> messages;
- private long batchedBytes;
- private final BatchingSettings batchingSettings;
- private Boolean attachSchema = true;
- private final String streamName;
- private final StreamWriter streamWriter;
-
- private MessagesBatch(
- BatchingSettings batchingSettings, String streamName, StreamWriter streamWriter) {
- this.batchingSettings = batchingSettings;
- this.streamName = streamName;
- this.streamWriter = streamWriter;
- reset();
- }
-
- // Get all the messages out in a batch.
- @GuardedBy("appendAndRefreshAppendLock")
- private InflightBatch popBatch() {
- InflightBatch batch =
- new InflightBatch(
- messages, batchedBytes, this.streamName, this.attachSchema, this.streamWriter);
- this.attachSchema = false;
- reset();
- return batch;
- }
-
- private void reset() {
- messages = new LinkedList<>();
- batchedBytes = 0;
- }
-
- private void resetAttachSchema() {
- attachSchema = true;
- }
-
- private boolean isEmpty() {
- return messages.isEmpty();
- }
-
- private long getBatchedBytes() {
- return batchedBytes;
- }
-
- private int getMessagesCount() {
- return messages.size();
- }
-
- private boolean hasBatchingBytes() {
- return getMaxBatchBytes() > 0;
- }
-
- private long getMaxBatchBytes() {
- return batchingSettings.getRequestByteThreshold();
- }
-
- // The returned message batch could contain the previous batch of messages plus the current
- // message, if the current message is too large.
- @GuardedBy("appendAndRefreshAppendLock")
- private List<InflightBatch> add(AppendRequestAndFutureResponse outstandingAppend) {
- List batchesToSend = new ArrayList<>();
- // Check if the next message makes the current batch exceed the max batch byte size.
- if (!isEmpty()
- && hasBatchingBytes()
- && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
- batchesToSend.add(popBatch());
- }
-
- messages.add(outstandingAppend);
- batchedBytes += outstandingAppend.messageSize;
-
- // Border case: if the message to send is greater than or equal to the max batch size, then
- // send it immediately.
- // Alternatively, if after adding the message we have reached the batch's max message count,
- // then we have a batch to send.
- if ((hasBatchingBytes() && outstandingAppend.messageSize >= getMaxBatchBytes())
- || getMessagesCount() == batchingSettings.getElementCountThreshold()) {
- batchesToSend.add(popBatch());
- }
- return batchesToSend;
- }
- }
-}
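To make the MessagesBatch.add thresholds concrete, a trace with elementCountThreshold=2 and requestByteThreshold=70 over 32-byte messages (the same numbers the deleted tests below use):

    // Sketch: which calls to add() pop a batch for sending.
    // add(A, 32B): batch={A}           -> nothing popped
    // add(B, 32B): count reaches 2     -> pop {A, B}
    // add(C, 32B): batch={C}           -> nothing popped
    // add(D, 100B): 32 + 100 >= 70     -> pop {C} first,
    //               then D alone >= 70 -> pop {D} as well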
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
index ad104c497b..1d83af111e 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
@@ -172,6 +172,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.client = builder.client;
this.ownsBigQueryWriteClient = false;
}
+
this.streamConnection =
new StreamConnection(
this.client,
@@ -492,7 +493,11 @@ private AppendRequestAndResponse pollInflightRequestQueue() {
return requestWrapper;
}
- /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */
+ /**
+ * Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. AppendRows
+ * needs special headers to be added to the client, so a passed-in client will not work. This
+ * should be used by tests only.
+ */
public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWriteClient client) {
return new StreamWriterV2.Builder(streamName, client);
}
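A hedged sketch of the test-only path this javadoc now calls out; production code should let the writer create its own client so the AppendRows headers are attached (streamName is a placeholder, exception handling elided):

    // Sketch (tests only): inject a pre-built client into StreamWriterV2.
    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      StreamWriterV2 writer = StreamWriterV2.newBuilder(streamName, client).build();
      // ... append test rows ...
      writer.close();
    }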
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java
deleted file mode 100644
index 7821cbae2f..0000000000
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java
+++ /dev/null
@@ -1,1398 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigquery.storage.v1beta2;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.gax.batching.BatchingSettings;
-import com.google.api.gax.batching.FlowControlSettings;
-import com.google.api.gax.batching.FlowController;
-import com.google.api.gax.core.ExecutorProvider;
-import com.google.api.gax.core.FixedExecutorProvider;
-import com.google.api.gax.core.InstantiatingExecutorProvider;
-import com.google.api.gax.core.NoCredentialsProvider;
-import com.google.api.gax.grpc.testing.LocalChannelProvider;
-import com.google.api.gax.grpc.testing.MockGrpcService;
-import com.google.api.gax.grpc.testing.MockServiceHelper;
-import com.google.api.gax.rpc.AbortedException;
-import com.google.api.gax.rpc.DataLossException;
-import com.google.api.gax.rpc.UnknownException;
-import com.google.cloud.bigquery.storage.test.Test.FooType;
-import com.google.common.base.Strings;
-import com.google.protobuf.DescriptorProtos;
-import com.google.protobuf.Int64Value;
-import com.google.protobuf.Timestamp;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.threeten.bp.Duration;
-import org.threeten.bp.Instant;
-
-@RunWith(JUnit4.class)
-public class StreamWriterTest {
- private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName());
- private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
- private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
- private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
- InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
- private static LocalChannelProvider channelProvider;
- private FakeScheduledExecutorService fakeExecutor;
- private FakeBigQueryWrite testBigQueryWrite;
- private static MockServiceHelper serviceHelper;
-
- @Before
- public void setUp() throws Exception {
- testBigQueryWrite = new FakeBigQueryWrite();
- serviceHelper =
- new MockServiceHelper(
- UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
- serviceHelper.start();
- channelProvider = serviceHelper.createChannelProvider();
- fakeExecutor = new FakeScheduledExecutorService();
- testBigQueryWrite.setExecutor(fakeExecutor);
- Instant time = Instant.now();
- Timestamp timestamp =
- Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build();
- // Add enough GetWriteStream responses.
- for (int i = 0; i < 4; i++) {
- testBigQueryWrite.addResponse(
- WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build());
- }
- }
-
- @After
- public void tearDown() throws Exception {
- LOG.info("tearDown called");
- serviceHelper.stop();
- }
-
- private StreamWriter.Builder getTestStreamWriterBuilder(String testStream) {
- return StreamWriter.newBuilder(testStream)
- .setChannelProvider(channelProvider)
- .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
- .setCredentialsProvider(NoCredentialsProvider.create());
- }
-
- private StreamWriter.Builder getTestStreamWriterBuilder() {
- return getTestStreamWriterBuilder(TEST_STREAM);
- }
-
- private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
- AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
- AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
- dataBuilder.setWriterSchema(
- ProtoSchema.newBuilder()
- .setProtoDescriptor(
- DescriptorProtos.DescriptorProto.newBuilder()
- .setName("Message")
- .addField(
- DescriptorProtos.FieldDescriptorProto.newBuilder()
- .setName("foo")
- .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
- .setNumber(1)
- .build())
- .build()));
- ProtoRows.Builder rows = ProtoRows.newBuilder();
- for (String message : messages) {
- FooType foo = FooType.newBuilder().setFoo(message).build();
- rows.addSerializedRows(foo.toByteString());
- }
- if (offset > 0) {
- requestBuilder.setOffset(Int64Value.of(offset));
- }
- return requestBuilder
- .setProtoRows(dataBuilder.setRows(rows.build()).build())
- .setWriteStream(TEST_STREAM)
- .build();
- }
-
- private ApiFuture<AppendRowsResponse> sendTestMessage(
- StreamWriter writer, String[] messages, int offset) {
- return writer.append(createAppendRequest(messages, offset));
- }
-
- private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriter writer, String[] messages) {
- return writer.append(createAppendRequest(messages, -1));
- }
-
- @Test
- public void testTableName() throws Exception {
- try (StreamWriter writer = getTestStreamWriterBuilder().build()) {
- assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
- }
- }
-
- @Test
- public void testDefaultStream() throws Exception {
- try (StreamWriter writer =
- StreamWriter.newBuilder(TEST_TABLE)
- .createDefaultStream()
- .setChannelProvider(channelProvider)
- .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
- .setCredentialsProvider(NoCredentialsProvider.create())
- .build()) {
- assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString());
- assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamNameString());
- }
- }
-
- @Test
- public void testAppendByDuration() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setDelayThreshold(Duration.ofSeconds(5))
- .setElementCountThreshold(10L)
- .build())
- .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
- .build();
-
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
- .build());
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
-
- assertFalse(appendFuture1.isDone());
- assertFalse(appendFuture2.isDone());
- fakeExecutor.advanceTime(Duration.ofSeconds(10));
-
- assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
- assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
- appendFuture1.get();
- appendFuture2.get();
- assertEquals(1, testBigQueryWrite.getAppendRequests().size());
-
- assertEquals(
- 2,
- testBigQueryWrite
- .getAppendRequests()
- .get(0)
- .getProtoRows()
- .getRows()
- .getSerializedRowsCount());
- assertEquals(
- true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
- writer.close();
- }
-
- @Test
- public void testAppendByNumBatchedMessages() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(2L)
- .setDelayThreshold(Duration.ofSeconds(100))
- .build())
- .build();
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
- .build());
-
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
- ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
-
- assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
- assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
-
- assertFalse(appendFuture3.isDone());
-
- ApiFuture<AppendRowsResponse> appendFuture4 = sendTestMessage(writer, new String[] {"D"});
-
- assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
- assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
-
- assertEquals(2, testBigQueryWrite.getAppendRequests().size());
- assertEquals(
- 2,
- testBigQueryWrite
- .getAppendRequests()
- .get(0)
- .getProtoRows()
- .getRows()
- .getSerializedRowsCount());
- assertEquals(
- true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
- assertEquals(
- 2,
- testBigQueryWrite
- .getAppendRequests()
- .get(1)
- .getProtoRows()
- .getRows()
- .getSerializedRowsCount());
- assertEquals(
- false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
- writer.close();
- }
-
- @Test
- public void testAppendByNumBytes() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- // Each message is 32 bytes, setting batch size to 70 bytes allows 2 messages.
- .setRequestByteThreshold(70L)
- .setDelayThreshold(Duration.ofSeconds(100000))
- .build())
- .build();
-
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
- .build());
-
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
- ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
-
- assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
- assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
-
- assertFalse(appendFuture3.isDone());
-
- // This message is big enough to trigger send on the previous message and itself.
- ApiFuture<AppendRowsResponse> appendFuture4 =
- sendTestMessage(writer, new String[] {Strings.repeat("A", 100)});
- assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
- assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
-
- assertEquals(3, testBigQueryWrite.getAppendRequests().size());
-
- writer.close();
- }
-
- @Test
- public void testShutdownFlushBatch() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setDelayThreshold(Duration.ofSeconds(100))
- .setElementCountThreshold(10L)
- .build())
- .build();
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
- .build());
-
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
-
- // Note we are not advancing time or reaching the count threshold, but messages should
- // still get written by the call to shutdown.
-
- writer.close();
-
- // Verify the appends completed
- assertTrue(appendFuture1.isDone());
- assertTrue(appendFuture2.isDone());
- assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
- assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
- }
-
- @Test
- public void testWriteMixedSizeAndDuration() throws Exception {
- try (StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(2L)
- .setDelayThreshold(Duration.ofSeconds(1))
- .build())
- .build()) {
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
- .build());
-
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- assertFalse(appendFuture1.isDone());
-
- ApiFuture<AppendRowsResponse> appendFuture2 =
- sendTestMessage(writer, new String[] {"B", "C"});
-
- // Write triggered by batch size
- assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
- assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
-
- ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"D"});
- assertFalse(appendFuture3.isDone());
- // Eventually will be triggered by time elapsed.
- assertEquals(3L, appendFuture3.get().getAppendResult().getOffset().getValue());
-
- assertEquals(
- 3,
- testBigQueryWrite
- .getAppendRequests()
- .get(0)
- .getProtoRows()
- .getRows()
- .getSerializedRowsCount());
- assertEquals(
- true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
- assertEquals(
- 1,
- testBigQueryWrite
- .getAppendRequests()
- .get(1) // this gives IndexOutOfBounds error at the moment
- .getProtoRows()
- .getRows()
- .getSerializedRowsCount());
- assertEquals(
- false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
- Thread.sleep(1005);
- assertTrue(appendFuture3.isDone());
- }
- }
-
- @Test
- public void testBatchIsFlushed() throws Exception {
- try (StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(2L)
- .setDelayThreshold(Duration.ofSeconds(1))
- .build())
- .build()) {
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
- .build());
-
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- assertFalse(appendFuture1.isDone());
- writer.shutdown();
- // Write triggered by shutdown.
- assertTrue(appendFuture1.isDone());
- assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
- }
- }
-
- @Test
- public void testBatchIsFlushedWithError() throws Exception {
- try (StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(2L)
- .setDelayThreshold(Duration.ofSeconds(1))
- .build())
- .build()) {
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
- .build());
-
- ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- try {
- ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
- ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
- try {
- appendFuture2.get();
- } catch (ExecutionException ex) {
- assertEquals(DataLossException.class, ex.getCause().getClass());
- }
- assertFalse(appendFuture3.isDone());
- writer.shutdown();
- try {
- appendFuture3.get();
- } catch (ExecutionException ex) {
- assertEquals(AbortedException.class, ex.getCause().getClass());
- }
- } catch (IllegalStateException ex) {
- assertEquals("Stream already failed.", ex.getMessage());
- }
- }
- }
-
- @Test
- public void testFlowControlBehaviorBlock() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .setFlowControlSettings(
- StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
- .toBuilder()
- .setMaxOutstandingElementCount(2L)
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
- .build())
- .build())
- .build();
-
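- // With Block behavior and at most two outstanding elements, a third append blocks until a response frees a slot.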
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(4)).build())
- .build());
- // Response will have a 10 second delay before server sends them back.
- testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
-
- ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2);
- final StreamWriter writer1 = writer;
- ExecutorService executor = Executors.newFixedThreadPool(2);
- Callable callable =
- new Callable() {
- @Override
- public Throwable call() {
- try {
- ApiFuture appendFuture2 =
- sendTestMessage(writer1, new String[] {"B"}, 3);
- ApiFuture appendFuture3 =
- sendTestMessage(writer1, new String[] {"C"}, 4);
- // This request will be sent out immediately because there is space in the inflight queue.
- // The time advance in the main thread will cause the response to be sent back.
- if (3 != appendFuture2.get().getAppendResult().getOffset().getValue()) {
- return new Exception(
- "expected 3 but got "
- + appendFuture2.get().getAppendResult().getOffset().getValue());
- }
- testBigQueryWrite.waitForResponseScheduled();
- // Wait a while so that close is called before we release the last response on the
- // other thread.
- Thread.sleep(500);
- // This triggers the last response to come back.
- fakeExecutor.advanceTime(Duration.ofSeconds(10));
- // This request will be waiting for the previous response to come back.
- if (4 != appendFuture3.get().getAppendResult().getOffset().getValue()) {
- return new Exception(
- "expected 4 but got "
- + appendFuture3.get().getAppendResult().getOffset().getValue());
- }
- return null;
- } catch (IllegalStateException ex) {
- // Sometimes the close will race before these calls.
- return null;
- } catch (Exception e) {
- return e;
- }
- }
- };
- Future future = executor.submit(callable);
- assertFalse(appendFuture1.isDone());
- testBigQueryWrite.waitForResponseScheduled();
- testBigQueryWrite.waitForResponseScheduled();
- // This will trigger the previous two responses to come back.
- fakeExecutor.advanceTime(Duration.ofSeconds(10));
- assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue());
- Thread.sleep(500);
- // When close is called, there should be one inflight request waiting.
- writer.close();
- if (future.get() != null) {
- future.get().printStackTrace();
- fail("Call got exception: " + future.get().toString());
- }
- // Everything should come back.
- executor.shutdown();
- }
-
- @Test
- public void testFlowControlBehaviorBlockWithError() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .setFlowControlSettings(
- StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
- .toBuilder()
- .setMaxOutstandingElementCount(2L)
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
- .build())
- .build())
- .build();
-
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
- .build());
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
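- // The second response is a DATA_LOSS error; the append blocked behind it is then aborted.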
-
- ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2);
- final StreamWriter writer1 = writer;
- ExecutorService executor = Executors.newFixedThreadPool(2);
- Callable callable =
- new Callable() {
- @Override
- public Throwable call() {
- try {
- ApiFuture appendFuture2 =
- sendTestMessage(writer1, new String[] {"B"}, 3);
- ApiFuture appendFuture3 =
- sendTestMessage(writer1, new String[] {"C"}, 4);
- try {
- // This request will be sent out immediately because there is space in the inflight
- // queue.
- assertEquals(3L, appendFuture2.get().getAppendResult().getOffset().getValue());
- return new Exception("future2 should have failed");
- } catch (ExecutionException e) {
- if (e.getCause().getClass() != DataLossException.class) {
- return e;
- }
- }
- try {
- // This request will be waiting for the previous response to come back.
- assertEquals(4L, appendFuture3.get().getAppendResult().getOffset().getValue());
- fail("future3 should have been aborted");
- } catch (ExecutionException e) {
- if (e.getCause().getClass() != AbortedException.class) {
- return e;
- }
- }
- return null;
- } catch (IllegalStateException ex) {
- // Sometimes the append will happen after the stream is shut down.
- ex.printStackTrace();
- return null;
- } catch (Exception e) {
- return e;
- }
- }
- };
- Future future = executor.submit(callable);
- // Wait is necessary for response to be scheduled before timer is advanced.
- testBigQueryWrite.waitForResponseScheduled();
- testBigQueryWrite.waitForResponseScheduled();
- // This will trigger the previous two responses to come back.
- fakeExecutor.advanceTime(Duration.ofSeconds(10));
- // The first request gets back while the second one is blocked.
- assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue());
- Thread.sleep(500);
- // When close is called, there should be one inflight request waiting.
- writer.close();
- if (future.get() != null) {
- future.get().printStackTrace();
- fail("Call got exception: " + future.get().toString());
- }
- // Everything should come back.
- executor.shutdown();
- }
-
- @Test
- public void testAppendWhileShutdownSuccess() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- // At shutdown, there should still be something in the batch.
- .setElementCountThreshold(3L)
- .setDelayThreshold(Duration.ofSeconds(1000))
- .setFlowControlSettings(
- StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
- .toBuilder()
- // At shutdown, there should still be something in flight.
- .setMaxOutstandingElementCount(5L)
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
- .build())
- .build())
- .build();
-
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
- .build());
- for (int i = 1; i < 13; i++) {
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder()
- .setOffset(Int64Value.of(i * 3 + 2))
- .build())
- .build());
- }
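- // Each queued response acknowledges a batch of three rows, so the returned offsets advance by three.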
- ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2);
- final StreamWriter writer1 = writer;
- ExecutorService executor = Executors.newFixedThreadPool(2);
- Callable callable =
- new Callable() {
- @Override
- public Throwable call() {
- try {
- LinkedList<ApiFuture<AppendRowsResponse>> responses =
- new LinkedList<ApiFuture<AppendRowsResponse>>();
- int last_count = 0;
- for (int i = 0; i < 20; i++) {
- try {
- responses.add(sendTestMessage(writer1, new String[] {"B"}, i + 3));
- } catch (IllegalStateException ex) {
- LOG.info("Stopped at " + i + " responses:" + responses.size());
- last_count = i;
- if ("Cannot append on a shut-down writer." != ex.getMessage()) {
- return new Exception("Got unexpected message:" + ex.getMessage());
- }
- break;
- } catch (AbortedException ex) {
- LOG.info("Stopped at " + i + " responses:" + responses.size());
- last_count = i;
- if ("Stream closed, abort append." != ex.getMessage()) {
- return new Exception("Got unexpected message:" + ex.getMessage());
- }
- break;
- }
- }
- // For all the requests that are sent in, we should get a finished callback.
- for (int i = 0; i < last_count; i++) {
- if (i + 3 != responses.get(i).get().getAppendResult().getOffset().getValue()) {
- return new Exception(
- "Got unexpected offset expect:"
- + (i + 3)
- + " actual:"
- + responses.get(i).get().getAppendResult().getOffset().getValue());
- }
- }
- return null;
- } catch (ExecutionException ex) {
- // Some weirdness in test presubmit runs: this thread seems to always start after
- // the main thread.
- if (ex.getCause().getClass() == AbortedException.class) {
- return null;
- } else {
- return ex;
- }
- } catch (Exception e) {
- return e;
- }
- }
- };
- Future future = executor.submit(callable);
- assertFalse(appendFuture1.isDone());
- // The first request gets back while the second one is blocked.
- assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue());
- // When close is called, there should be one inflight request waiting.
- writer.close();
- if (future.get() != null) {
- future.get().printStackTrace();
- fail("Call got exception: " + future.get().toString());
- }
- // Everything should come back.
- executor.shutdown();
- }
-
- @Test
- public void testAppendWhileShutdownFailed() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- // At shutdown, there should still be something in the batch.
- .setElementCountThreshold(3L)
- .setDelayThreshold(Duration.ofSeconds(10))
- .setFlowControlSettings(
- StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
- .toBuilder()
- // At shutdown, there should still be something in flight.
- .setMaxOutstandingElementCount(5L)
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
- .build())
- .build())
- .build();
-
- // The responses are for every 3 messages.
- for (int i = 0; i < 2; i++) {
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder()
- .setOffset(Int64Value.of(i * 3))
- .build())
- .build());
- }
- for (int i = 2; i < 6; i++) {
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setError(
- com.google.rpc.Status.newBuilder().setCode(3).setMessage("error " + i).build())
- .build());
- }
- // Stream failed at 7th request.
- for (int i = 6; i < 10; i++) {
- testBigQueryWrite.addException(new UnsupportedOperationException("Strange exception"));
- }
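- // In total: two successful responses, four in-stream errors, then stream-level exceptions from the 7th request on.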
- final StreamWriter writer1 = writer;
- ExecutorService executor = Executors.newFixedThreadPool(2);
- Callable callable =
- new Callable() {
- @Override
- public Throwable call() {
- try {
- LinkedList<ApiFuture<AppendRowsResponse>> responses =
- new LinkedList<ApiFuture<AppendRowsResponse>>();
- int last_count = 30;
- LOG.info(
- "Sending 30 messages; they will be batched into 10 requests, failing from the 7th request on");
- for (int i = 0; i < 30; i++) {
- try {
- responses.add(sendTestMessage(writer1, new String[] {"B"}, i));
- Thread.sleep(500);
- } catch (IllegalStateException ex) {
- LOG.info("Stopped at sending request no." + i + " ex: " + ex.toString());
- last_count = i;
- if ("Stream already failed." != ex.getMessage()
- && "Cannot append on a shut-down writer." != ex.getMessage()) {
- return new Exception("Got unexpected message:" + ex.getMessage());
- }
- break;
- }
- }
- // Verify sent responses.
- // For all the requests that are sent in, we should get a finished callback.
- for (int i = 0; i < 2 * 3; i++) {
- try {
- if (i != responses.get(i).get().getAppendResult().getOffset().getValue()) {
- return new Exception(
- "Got unexpected offset expect:"
- + i
- + " actual:"
- + responses.get(i).get().getAppendResult().getOffset().getValue());
- }
- } catch (Exception e) {
- return e;
- }
- }
- // Requests that hit the in-stream errors should complete exceptionally.
- for (int i = 2 * 3; i < 6 * 3; i++) {
- try {
- responses.get(i).get();
- return new Exception(
- "Expected the response to return an error after an in-stream exception");
- } catch (Exception e) {
- if (e.getCause().getClass() != StatusRuntimeException.class) {
- return new Exception(
- "Expected the first error after a stream exception to be the original exception but got "
- + e.getCause().toString());
- }
- }
- }
- LOG.info("Last count is:" + last_count);
- for (int i = 6 * 3; i < last_count; i++) {
- try {
- responses.get(i).get();
- return new Exception("Expect response return an error after a stream exception");
- } catch (Exception e) {
- if (e.getCause().getClass() != UnknownException.class
- && e.getCause().getClass() != AbortedException.class) {
- return new Exception("Unexpected stream exception:" + e.toString());
- }
- }
- }
- return null;
- } catch (Exception e) {
- return e;
- }
- }
- };
- Future future = executor.submit(callable);
- // Wait for at least 7 requests (after batching) to reach the server.
- for (int i = 0; i < 7; i++) {
- LOG.info("Wait for " + i + " response scheduled");
- testBigQueryWrite.waitForResponseScheduled();
- }
- Thread.sleep(500);
- writer.close();
- if (future.get() != null) {
- future.get().printStackTrace();
- fail("Callback got exception" + future.get().toString());
- }
- // Everything should come back.
- executor.shutdown();
- }
-
- @Test
- public void testFlowControlBehaviorException() throws Exception {
- try (StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .setFlowControlSettings(
- StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
- .toBuilder()
- .setMaxOutstandingElementCount(1L)
- .setLimitExceededBehavior(
- FlowController.LimitExceededBehavior.ThrowException)
- .build())
- .build())
- .build()) {
- assertEquals(
- 1L,
- writer
- .getBatchingSettings()
- .getFlowControlSettings()
- .getMaxOutstandingElementCount()
- .longValue());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
- .build());
- testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
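- // With a one-element limit and ThrowException behavior, the second append fails while the first is inflight.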
- ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
- // Wait is necessary for response to be scheduled before timer is advanced.
- testBigQueryWrite.waitForResponseScheduled();
- fakeExecutor.advanceTime(Duration.ofSeconds(10));
- try {
- appendFuture2.get();
- Assert.fail("This should fail");
- } catch (Exception e) {
- assertEquals(
- "java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.",
- e.toString());
- }
- assertEquals(1L, appendFuture1.get().getAppendResult().getOffset().getValue());
- }
- }
-
- @Test
- public void testStreamReconnectionPermanant() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setDelayThreshold(Duration.ofSeconds(100000))
- .setElementCountThreshold(1L)
- .build())
- .build();
- StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT);
- testBigQueryWrite.addException(permanentError);
- ApiFuture future2 = sendTestMessage(writer, new String[] {"m2"});
- try {
- future2.get();
- Assert.fail("This should fail.");
- } catch (ExecutionException e) {
- assertEquals(permanentError.toString(), e.getCause().getCause().toString());
- }
- writer.close();
- }
-
- @Test
- public void testOffset() throws Exception {
- try (StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(2L)
- .setDelayThreshold(Duration.ofSeconds(1000))
- .build())
- .build()) {
-
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(10)).build())
- .build());
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(13)).build())
- .build());
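- // Appends are batched two at a time; each append's expected offset is the batch offset plus the rows preceding it in the batch.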
-
- AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
- ApiFuture appendFuture1 = writer.append(request1);
- AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L);
- ApiFuture appendFuture2 = writer.append(request2);
- AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L);
- ApiFuture appendFuture3 = writer.append(request3);
- AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L);
- ApiFuture appendFuture4 = writer.append(request4);
- assertEquals(10L, appendFuture1.get().getAppendResult().getOffset().getValue());
- assertEquals(11L, appendFuture2.get().getAppendResult().getOffset().getValue());
- assertEquals(13L, appendFuture3.get().getAppendResult().getOffset().getValue());
- assertEquals(15L, appendFuture4.get().getAppendResult().getOffset().getValue());
- }
- }
-
- @Test
- public void testOffsetMismatch() throws Exception {
- try (StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .build())
- .build()) {
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(11)).build())
- .build());
- AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
- ApiFuture appendFuture1 = writer.append(request1);
-
- appendFuture1.get();
- fail("Should throw exception");
- } catch (Exception e) {
- assertEquals(
- "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.",
- e.getCause().toString());
- }
- }
-
- @Test
- public void testStreamAppendDirectException() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .setDelayThreshold(Duration.ofSeconds(5))
- .build())
- .build();
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- ApiFuture future1 = sendTestMessage(writer, new String[] {"A"});
- try {
- future1.get();
- fail("Expected furture1 to fail");
- } catch (ExecutionException ex) {
- assertEquals(DataLossException.class, ex.getCause().getClass());
- }
- try {
- sendTestMessage(writer, new String[] {"B"});
- fail("Expected furture2 to fail");
- } catch (IllegalStateException ex) {
- assertEquals("Stream already failed.", ex.getMessage());
- }
- writer.shutdown();
- try {
- sendTestMessage(writer, new String[] {"C"});
- fail("Expected furture3 to fail");
- } catch (IllegalStateException ex) {
- assertEquals("Cannot append on a shut-down writer.", ex.getMessage());
- }
- }
-
- @Test
- public void testErrorPropagation() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .setDelayThreshold(Duration.ofSeconds(5))
- .build())
- .build();
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- ApiFuture future1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture future2 = sendTestMessage(writer, new String[] {"B"});
- try {
- future1.get();
- fail("should throw exception");
- } catch (ExecutionException e) {
- assertThat(e.getCause()).isInstanceOf(DataLossException.class);
- }
- try {
- future2.get();
- fail("should throw exception");
- } catch (ExecutionException e) {
- assertThat(e.getCause()).isInstanceOf(AbortedException.class);
- }
- }
-
- @Test
- public void testWriterGetters() throws Exception {
- StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
- builder.setChannelProvider(channelProvider);
- builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
- builder.setBatchingSettings(
- BatchingSettings.newBuilder()
- .setRequestByteThreshold(10L)
- .setDelayThreshold(Duration.ofMillis(11))
- .setElementCountThreshold(12L)
- .setFlowControlSettings(
- FlowControlSettings.newBuilder()
- .setMaxOutstandingElementCount(100L)
- .setMaxOutstandingRequestBytes(1000L)
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
- .build())
- .build());
- builder.setCredentialsProvider(NoCredentialsProvider.create());
- StreamWriter writer = builder.build();
-
- assertEquals(TEST_STREAM, writer.getStreamNameString());
- assertEquals(10, (long) writer.getBatchingSettings().getRequestByteThreshold());
- assertEquals(Duration.ofMillis(11), writer.getBatchingSettings().getDelayThreshold());
- assertEquals(12, (long) writer.getBatchingSettings().getElementCountThreshold());
- assertEquals(
- FlowController.LimitExceededBehavior.Block,
- writer.getBatchingSettings().getFlowControlSettings().getLimitExceededBehavior());
- assertEquals(
- 100L,
- writer
- .getBatchingSettings()
- .getFlowControlSettings()
- .getMaxOutstandingElementCount()
- .longValue());
- assertEquals(
- 1000L,
- writer
- .getBatchingSettings()
- .getFlowControlSettings()
- .getMaxOutstandingRequestBytes()
- .longValue());
- writer.close();
- }
-
- @Test
- public void testBuilderParametersAndDefaults() {
- StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
- assertEquals(StreamWriter.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
- assertEquals(100 * 1024L, builder.batchingSettings.getRequestByteThreshold().longValue());
- assertEquals(Duration.ofMillis(10), builder.batchingSettings.getDelayThreshold());
- assertEquals(100L, builder.batchingSettings.getElementCountThreshold().longValue());
- assertEquals(StreamWriter.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
- assertEquals(Duration.ofMillis(100), builder.retrySettings.getInitialRetryDelay());
- assertEquals(3, builder.retrySettings.getMaxAttempts());
- }
-
- @Test
- public void testBuilderInvalidArguments() {
- StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
-
- try {
- builder.setChannelProvider(null);
- fail("Should have thrown an NullPointerException");
- } catch (NullPointerException expected) {
- // Expected
- }
-
- try {
- builder.setExecutorProvider(null);
- fail("Should have thrown an NullPointerException");
- } catch (NullPointerException expected) {
- // Expected
- }
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setRequestByteThreshold(null)
- .build());
- fail("Should have thrown an NullPointerException");
- } catch (NullPointerException expected) {
- // Expected
- }
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setRequestByteThreshold(0L)
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setRequestByteThreshold(-1L)
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
-
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setDelayThreshold(Duration.ofMillis(1))
- .build());
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setDelayThreshold(Duration.ofMillis(-1))
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
-
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .build());
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(null)
- .build());
- fail("Should have thrown an NullPointerException");
- } catch (NullPointerException expected) {
- // Expected
- }
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(0L)
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
- try {
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(-1L)
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
-
- try {
- FlowControlSettings flowControlSettings =
- FlowControlSettings.newBuilder().setMaxOutstandingElementCount(-1L).build();
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setFlowControlSettings(flowControlSettings)
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
-
- try {
- FlowControlSettings flowControlSettings =
- FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(-1L).build();
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setFlowControlSettings(flowControlSettings)
- .build());
- fail("Should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException expected) {
- // Expected
- }
- {
- FlowControlSettings flowControlSettings =
- FlowControlSettings.newBuilder()
- .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
- .build();
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setFlowControlSettings(flowControlSettings)
- .build());
- }
- try {
- FlowControlSettings flowControlSettings =
- FlowControlSettings.newBuilder().setLimitExceededBehavior(null).build();
- builder.setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setFlowControlSettings(flowControlSettings)
- .build());
- fail("Should have thrown an NullPointerException");
- } catch (NullPointerException expected) {
- // Expected
- }
- }
-
- @Test
- public void testExistingClient() throws Exception {
- BigQueryWriteSettings settings =
- BigQueryWriteSettings.newBuilder()
- .setTransportChannelProvider(channelProvider)
- .setCredentialsProvider(NoCredentialsProvider.create())
- .build();
- BigQueryWriteClient client = BigQueryWriteClient.create(settings);
- StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build();
- writer.close();
- assertFalse(client.isShutdown());
- client.shutdown();
- client.awaitTermination(1, TimeUnit.MINUTES);
- }
-
- @Test
- public void testDatasetTraceId() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .build())
- .setDataflowTraceId()
- .build();
- testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
- testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
-
- ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
- ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
- appendFuture1.get();
- appendFuture2.get();
- assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
- assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
- }
-
- @Test
- public void testShutdownWithConnectionError() throws Exception {
- StreamWriter writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setElementCountThreshold(1L)
- .build())
- .build();
- // Three requests will reach the server.
- testBigQueryWrite.addResponse(
- AppendRowsResponse.newBuilder()
- .setAppendResult(
- AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
- .build());
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- testBigQueryWrite.addException(Status.DATA_LOSS.asException());
- testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
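- // Delay the responses so that all three appends are inflight when the errors arrive.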
-
- ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 1);
- ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}, 2);
- ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}, 3);
- testBigQueryWrite.waitForResponseScheduled();
- testBigQueryWrite.waitForResponseScheduled();
- testBigQueryWrite.waitForResponseScheduled();
- fakeExecutor.advanceTime(Duration.ofSeconds(10));
- // This request will never be inflight and is aborted by the previous failure, because its
- // delay is set after the timer advance.
- Thread.sleep(500);
- try {
- ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}, 4);
- } catch (IllegalStateException ex) {
- assertEquals("Stream already failed.", ex.getMessage());
- }
- // Shut down the writer immediately; some errors will surface when the queue is flushed.
- writer.shutdown();
- assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());
- try {
- appendFuture2.get();
- fail("Should fail with exception future2");
- } catch (ExecutionException e) {
- assertThat(e.getCause()).isInstanceOf(DataLossException.class);
- }
- try {
- appendFuture3.get();
- fail("Should fail with exception future3");
- } catch (ExecutionException e) {
- assertThat(e.getCause()).isInstanceOf(AbortedException.class);
- }
- }
-}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
index b93eeeaf34..3c1eeef8fd 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
@@ -29,7 +29,6 @@
import com.google.cloud.bigquery.storage.v1beta2.*;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.Int64Value;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
@@ -145,28 +144,16 @@ public static void afterClass() {
}
}
- private AppendRowsRequest.Builder createAppendRequest(String streamName, String[] messages) {
- AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
-
- AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
- dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()));
-
+ ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
rows.addSerializedRows(foo.toByteString());
}
- dataBuilder.setRows(rows.build());
- return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
+ return rows.build();
}
- private AppendRowsRequest.Builder createAppendRequestComplicateType(
- String streamName, String[] messages) {
- AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
-
- AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
- dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()));
-
+ ProtoRows CreateProtoRowsComplex(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
ComplicateType foo =
@@ -175,16 +162,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType(
.build();
rows.addSerializedRows(foo.toByteString());
}
- dataBuilder.setRows(rows.build());
- return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
- }
-
- ProtoRows CreateProtoRows(String[] messages) {
- ProtoRows.Builder rows = ProtoRows.newBuilder();
- for (String message : messages) {
- FooType foo = FooType.newBuilder().setFoo(message).build();
- rows.addSerializedRows(foo.toByteString());
- }
return rows.build();
}
@@ -431,20 +408,17 @@ public void testComplicateSchemaWithPendingStream()
.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
.build());
FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance();
- try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+ try (StreamWriterV2 streamWriter =
+ StreamWriterV2.newBuilder(writeStream.getName())
+ .setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()))
+ .build()) {
LOG.info("Sending two messages");
ApiFuture response =
- streamWriter.append(
- createAppendRequestComplicateType(writeStream.getName(), new String[] {"aaa"})
- .setOffset(Int64Value.of(0L))
- .build());
+ streamWriter.append(CreateProtoRowsComplex(new String[] {"aaa"}), 0L);
assertEquals(0, response.get().getAppendResult().getOffset().getValue());
ApiFuture response2 =
- streamWriter.append(
- createAppendRequestComplicateType(writeStream.getName(), new String[] {"bbb"})
- .setOffset(Int64Value.of(1L))
- .build());
+ streamWriter.append(CreateProtoRowsComplex(new String[] {"bbb"}), 1L);
assertEquals(1, response2.get().getAppendResult().getOffset().getValue());
// Nothing showed up since rows are not committed.
@@ -460,10 +434,7 @@ public void testComplicateSchemaWithPendingStream()
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
ApiFuture response3 =
- streamWriter.append(
- createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"})
- .setOffset(Int64Value.of(2L))
- .build());
+ streamWriter.append(CreateProtoRows(new String[] {"ccc"}), 2L);
try {
response3.get();
Assert.fail("Append to finalized stream should fail.");
@@ -503,23 +474,16 @@ public void testStreamError() throws IOException, InterruptedException, Executio
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
- try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
- AppendRowsRequest request =
- createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build();
- request
- .toBuilder()
- .setProtoRows(request.getProtoRows().toBuilder().clearWriterSchema().build())
- .build();
+ try (StreamWriterV2 streamWriter =
+ StreamWriterV2.newBuilder(writeStream.getName())
+ .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
+ .build()) {
ApiFuture response =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+ streamWriter.append(CreateProtoRows(new String[] {"aaa"}), -1L);
assertEquals(0L, response.get().getAppendResult().getOffset().getValue());
// Sending a bogus offset should cause a connection error.
ApiFuture response2 =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"aaa"})
- .setOffset(Int64Value.of(100L))
- .build());
+ streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 100L);
try {
response2.get();
Assert.fail("Should fail");
@@ -529,8 +493,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio
}
// We can keep sending requests on the same stream.
ApiFuture response3 =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+ streamWriter.append(CreateProtoRows(new String[] {"aaa"}), -1L);
assertEquals(1L, response3.get().getAppendResult().getOffset().getValue());
} finally {
}
@@ -545,23 +508,23 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
- try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+ try (StreamWriterV2 streamWriter =
+ StreamWriterV2.newBuilder(writeStream.getName())
+ .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
+ .build()) {
ApiFuture response =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"aaa"})
- .setOffset(Int64Value.of(0L))
- .build());
+ streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0L);
assertEquals(0L, response.get().getAppendResult().getOffset().getValue());
}
- try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+ try (StreamWriterV2 streamWriter =
+ StreamWriterV2.newBuilder(writeStream.getName())
+ .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
+ .build()) {
// Currently there is a bug where reconnection must wait 5 seconds to get the real row count.
Thread.sleep(5000L);
ApiFuture response =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"bbb"})
- .setOffset(Int64Value.of(1L))
- .build());
+ streamWriter.append(CreateProtoRows(new String[] {"bbb"}), 1L);
assertEquals(1L, response.get().getAppendResult().getOffset().getValue());
}
}