From 087079728195e20f93701e8d5e1e59ba29a7d21b Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 2 Mar 2021 09:43:10 -0800 Subject: [PATCH] fix: Add unit test for concurrent issues we worried about, and fix some locking issues (#854) * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . * . --- .../storage/v1beta2/StreamWriter.java | 182 ++++-- .../bigquery/storage/v1beta2/Waiter.java | 2 + .../storage/v1beta2/FakeBigQueryWrite.java | 4 + .../v1beta2/FakeBigQueryWriteImpl.java | 14 +- .../storage/v1beta2/StreamWriterTest.java | 608 +++++++++++++++++- 5 files changed, 723 insertions(+), 87 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index b2da57e75b..652d87c6d0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -50,12 +50,14 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.concurrent.GuardedBy; import org.threeten.bp.Duration; /** @@ -100,25 +102,34 @@ public class StreamWriter implements AutoCloseable { private final Lock messagesBatchLock; private final Lock appendAndRefreshAppendLock; + + @GuardedBy("appendAndRefreshAppendLock") private final MessagesBatch messagesBatch; // Indicates if a stream has some non recoverable 
exception happened. - private final Lock exceptionLock; - private Throwable streamException; + private AtomicReference streamException; private BackgroundResource backgroundResources; private List backgroundResourceList; private BigQueryWriteClient stub; BidiStreamingCallable bidiStreamingCallable; + + @GuardedBy("appendAndRefreshAppendLock") ClientStream clientStream; + private final AppendResponseObserver responseObserver; private final ScheduledExecutorService executor; - private final AtomicBoolean shutdown; + @GuardedBy("appendAndRefreshAppendLock") + private boolean shutdown; + private final Waiter messagesWaiter; - private final AtomicBoolean activeAlarm; + + @GuardedBy("appendAndRefreshAppendLock") + private boolean activeAlarm; + private ScheduledFuture currentAlarmFuture; private Integer currentRetries = 0; @@ -160,9 +171,8 @@ private StreamWriter(Builder builder) this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this); messagesBatchLock = new ReentrantLock(); appendAndRefreshAppendLock = new ReentrantLock(); - activeAlarm = new AtomicBoolean(false); - this.exceptionLock = new ReentrantLock(); - this.streamException = null; + activeAlarm = false; + this.streamException = new AtomicReference(null); executor = builder.executorProvider.getExecutor(); backgroundResourceList = new ArrayList<>(); @@ -185,7 +195,7 @@ private StreamWriter(Builder builder) stub = builder.client; } backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); - shutdown = new AtomicBoolean(false); + shutdown = false; if (builder.onSchemaUpdateRunnable != null) { this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable; this.onSchemaUpdateRunnable.setStreamWriter(this); @@ -216,14 +226,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() { return this.onSchemaUpdateRunnable; } - private void setException(Throwable t) { - exceptionLock.lock(); - if (this.streamException == null) { - this.streamException = t; - } - 
exceptionLock.unlock(); - } - /** * Schedules the writing of a message. The write of the message may occur immediately or be * delayed based on the writer batching options. @@ -253,27 +255,27 @@ private void setException(Throwable t) { */ public ApiFuture append(AppendRowsRequest message) { appendAndRefreshAppendLock.lock(); - Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); - Preconditions.checkNotNull(message, "Message is null."); - final AppendRequestAndFutureResponse outstandingAppend = - new AppendRequestAndFutureResponse(message); - List batchesToSend; - messagesBatchLock.lock(); + try { + Preconditions.checkState(!shutdown, "Cannot append on a shut-down writer."); + Preconditions.checkNotNull(message, "Message is null."); + Preconditions.checkState(streamException.get() == null, "Stream already failed."); + final AppendRequestAndFutureResponse outstandingAppend = + new AppendRequestAndFutureResponse(message); + List batchesToSend; batchesToSend = messagesBatch.add(outstandingAppend); // Setup the next duration based delivery alarm if there are messages batched. 
setupAlarm(); if (!batchesToSend.isEmpty()) { for (final InflightBatch batch : batchesToSend) { - LOG.fine("Scheduling a batch for immediate sending."); + LOG.fine("Scheduling a batch for immediate sending"); writeBatch(batch); } } + return outstandingAppend.appendResult; } finally { - messagesBatchLock.unlock(); appendAndRefreshAppendLock.unlock(); } - return outstandingAppend.appendResult; } /** @@ -285,9 +287,10 @@ public void refreshAppend() throws InterruptedException { throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false); } + @GuardedBy("appendAndRefreshAppendLock") private void setupAlarm() { if (!messagesBatch.isEmpty()) { - if (!activeAlarm.getAndSet(true)) { + if (!activeAlarm) { long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs); currentAlarmFuture = @@ -296,12 +299,12 @@ private void setupAlarm() { @Override public void run() { LOG.fine("Sending messages based on schedule"); - activeAlarm.getAndSet(false); - messagesBatchLock.lock(); + appendAndRefreshAppendLock.lock(); + activeAlarm = false; try { writeBatch(messagesBatch.popBatch()); } finally { - messagesBatchLock.unlock(); + appendAndRefreshAppendLock.unlock(); } } }, @@ -310,9 +313,8 @@ public void run() { } } else if (currentAlarmFuture != null) { LOG.log(Level.FINER, "Cancelling alarm, no more messages"); - if (activeAlarm.getAndSet(false)) { - currentAlarmFuture.cancel(false); - } + currentAlarmFuture.cancel(false); + activeAlarm = false; } } @@ -321,27 +323,41 @@ public void run() { * wait for the send operations to complete. To wait for messages to send, call {@code get} on the * futures returned from {@code append}. 
*/ + @GuardedBy("appendAndRefreshAppendLock") public void writeAllOutstanding() { InflightBatch unorderedOutstandingBatch = null; - messagesBatchLock.lock(); - try { - if (!messagesBatch.isEmpty()) { - writeBatch(messagesBatch.popBatch()); - } - messagesBatch.reset(); - } finally { - messagesBatchLock.unlock(); + if (!messagesBatch.isEmpty()) { + writeBatch(messagesBatch.popBatch()); } + messagesBatch.reset(); } + @GuardedBy("appendAndRefreshAppendLock") private void writeBatch(final InflightBatch inflightBatch) { if (inflightBatch != null) { AppendRowsRequest request = inflightBatch.getMergedRequest(); try { + appendAndRefreshAppendLock.unlock(); messagesWaiter.acquire(inflightBatch.getByteSize()); + appendAndRefreshAppendLock.lock(); + if (shutdown || streamException.get() != null) { + appendAndRefreshAppendLock.unlock(); + messagesWaiter.release(inflightBatch.getByteSize()); + appendAndRefreshAppendLock.lock(); + inflightBatch.onFailure( + new AbortedException( + shutdown + ? "Stream closed, abort append." + : "Stream has previous errors, abort append.", + null, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); + return; + } responseObserver.addInflightBatch(inflightBatch); clientStream.send(request); } catch (FlowController.FlowControlException ex) { + appendAndRefreshAppendLock.lock(); inflightBatch.onFailure(ex); } } @@ -447,9 +463,6 @@ private void onFailure(Throwable t) { // Error has been set already. LOG.warning("Ignore " + t.toString() + " since error has already been set"); return; - } else { - LOG.info("Setting " + t.toString() + " on response"); - this.streamWriter.setException(t); } for (AppendRequestAndFutureResponse request : inflightRequests) { @@ -511,26 +524,68 @@ public RetrySettings getRetrySettings() { * pending messages are lost. 
*/ protected void shutdown() { - if (shutdown.getAndSet(true)) { - LOG.fine("Already shutdown."); - return; - } - LOG.fine("Shutdown called on writer"); - if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) { - currentAlarmFuture.cancel(false); - } - writeAllOutstanding(); + appendAndRefreshAppendLock.lock(); try { - synchronized (messagesWaiter) { + if (shutdown) { + LOG.fine("Already shutdown."); + return; + } + shutdown = true; + LOG.info("Shutdown called on writer: " + streamName); + if (currentAlarmFuture != null && activeAlarm) { + currentAlarmFuture.cancel(false); + activeAlarm = false; + } + // Wait for current inflight to drain. + try { + appendAndRefreshAppendLock.unlock(); messagesWaiter.waitComplete(0); + } catch (InterruptedException e) { + LOG.warning("Failed to wait for messages to return " + e.toString()); } - } catch (InterruptedException e) { - LOG.warning("Failed to wait for messages to return " + e.toString()); - } - if (clientStream.isSendReady()) { - clientStream.closeSend(); + appendAndRefreshAppendLock.lock(); + // Try to send out what's left in batch. + if (!messagesBatch.isEmpty()) { + InflightBatch inflightBatch = messagesBatch.popBatch(); + AppendRowsRequest request = inflightBatch.getMergedRequest(); + if (streamException.get() != null) { + inflightBatch.onFailure( + new AbortedException( + shutdown + ? "Stream closed, abort append." + : "Stream has previous errors, abort append.", + null, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); + } else { + try { + appendAndRefreshAppendLock.unlock(); + messagesWaiter.acquire(inflightBatch.getByteSize()); + appendAndRefreshAppendLock.lock(); + responseObserver.addInflightBatch(inflightBatch); + clientStream.send(request); + } catch (FlowController.FlowControlException ex) { + appendAndRefreshAppendLock.lock(); + LOG.warning( + "Unexpected flow control exception when sending batch leftover: " + ex.toString()); + } + } + } + // Close the stream. 
+ try { + appendAndRefreshAppendLock.unlock(); + messagesWaiter.waitComplete(0); + } catch (InterruptedException e) { + LOG.warning("Failed to wait for messages to return " + e.toString()); + } + appendAndRefreshAppendLock.lock(); + if (clientStream.isSendReady()) { + clientStream.closeSend(); + } + backgroundResources.shutdown(); + } finally { + appendAndRefreshAppendLock.unlock(); } - backgroundResources.shutdown(); } /** @@ -815,11 +870,12 @@ public void onStart(StreamController controller) { } private void abortInflightRequests(Throwable t) { + LOG.fine("Aborting all inflight requests"); synchronized (this.inflightBatches) { boolean first_error = true; while (!this.inflightBatches.isEmpty()) { InflightBatch inflightBatch = this.inflightBatches.poll(); - if (first_error) { + if (first_error || t.getCause().getClass() == AbortedException.class) { inflightBatch.onFailure(t); first_error = false; } else { @@ -894,7 +950,8 @@ public void onComplete() { @Override public void onError(Throwable t) { - LOG.fine("OnError called"); + LOG.info("OnError called: " + t.toString()); + streamWriter.streamException.set(t); abortInflightRequests(t); } }; @@ -917,6 +974,7 @@ private MessagesBatch( } // Get all the messages out in a batch. + @GuardedBy("appendAndRefreshAppendLock") private InflightBatch popBatch() { InflightBatch batch = new InflightBatch( @@ -958,6 +1016,7 @@ private long getMaxBatchBytes() { // The message batch returned could contain the previous batch of messages plus the current // message. // if the message is too large. + @GuardedBy("appendAndRefreshAppendLock") private List add(AppendRequestAndFutureResponse outstandingAppend) { List batchesToSend = new ArrayList<>(); // Check if the next message makes the current batch exceed the max batch byte size. 
@@ -978,7 +1037,6 @@ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) { || getMessagesCount() == batchingSettings.getElementCountThreshold()) { batchesToSend.add(popBatch()); } - return batchesToSend; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java index 4422f53b32..9c37a71fc0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java @@ -66,6 +66,7 @@ private void notifyNextAcquires() { public synchronized void release(long messageSize) throws IllegalStateException { lock.lock(); + LOG.fine("release: " + pendingCount + " to " + (pendingCount - 1)); --pendingCount; if (pendingCount < 0) { throw new IllegalStateException("pendingCount cannot be less than 0"); @@ -82,6 +83,7 @@ public synchronized void release(long messageSize) throws IllegalStateException public void acquire(long messageSize) throws FlowController.FlowControlException { lock.lock(); try { + LOG.fine("acquire " + pendingCount + " to " + (pendingCount + 1)); if (pendingCount >= countLimit && behavior == FlowController.LimitExceededBehavior.ThrowException) { throw new FlowController.MaxOutstandingElementCountReachedException(countLimit); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java index a333260529..69279f01e7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java @@ -39,6 +39,10 @@ public List 
getRequests() { return new LinkedList(serviceImpl.getCapturedRequests()); } + public void waitForResponseScheduled() throws InterruptedException { + serviceImpl.waitForResponseScheduled(); + } + public List getAppendRequests() { return serviceImpl.getCapturedRequests(); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java index b99dab99bd..e706f8198a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; @@ -45,7 +46,9 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private boolean autoPublishResponse; private ScheduledExecutorService executor = null; private Duration responseDelay = Duration.ZERO; + private Duration responseSleep = Duration.ZERO; + private Semaphore responseSemaphore = new Semaphore(0, true); /** Class used to save the state of a possible response. 
*/ private static class Response { @@ -113,6 +116,10 @@ public void flushRows( } } + public void waitForResponseScheduled() throws InterruptedException { + responseSemaphore.acquire(); + } + @Override public StreamObserver appendRows( final StreamObserver responseObserver) { @@ -120,7 +127,7 @@ public StreamObserver appendRows( new StreamObserver() { @Override public void onNext(AppendRowsRequest value) { - LOG.info("Get request:" + value.toString()); + LOG.fine("Get request:" + value.toString()); final Response response = responses.remove(); requests.add(value); if (responseSleep.compareTo(Duration.ZERO) > 0) { @@ -133,7 +140,7 @@ public void onNext(AppendRowsRequest value) { } else { final Response responseToSend = response; // TODO(yirutang): This is very wrong because it messes up response/complete ordering. - LOG.info("Schedule a response to be sent at delay"); + LOG.fine("Schedule a response to be sent at delay"); executor.schedule( new Runnable() { @Override @@ -144,6 +151,7 @@ public void run() { responseDelay.toMillis(), TimeUnit.MILLISECONDS); } + responseSemaphore.release(); } @Override @@ -161,7 +169,7 @@ public void onCompleted() { private void sendResponse( Response response, StreamObserver responseObserver) { - LOG.info("Sending response: " + response.toString()); + LOG.fine("Sending response: " + response.toString()); if (response.isError()) { responseObserver.onError(response.getError()); } else { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 664f21c35e..73bf22f122 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -34,6 +34,7 @@ import 
com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.DataLossException; +import com.google.api.gax.rpc.UnknownException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; import com.google.protobuf.DescriptorProtos; @@ -42,8 +43,13 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.util.Arrays; +import java.util.LinkedList; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import org.junit.After; @@ -133,6 +139,11 @@ private AppendRowsRequest createAppendRequest(String[] messages, long offset) { .build(); } + private ApiFuture sendTestMessage( + StreamWriter writer, String[] messages, int offset) { + return writer.append(createAppendRequest(messages, offset)); + } + private ApiFuture sendTestMessage(StreamWriter writer, String[] messages) { return writer.append(createAppendRequest(messages, -1)); } @@ -312,7 +323,7 @@ public void testAppendByNumBytes() throws Exception { } @Test - public void testWriteByShutdown() throws Exception { + public void testShutdownFlushBatch() throws Exception { StreamWriter writer = getTestStreamWriterBuilder() .setBatchingSettings( @@ -356,7 +367,7 @@ public void testWriteMixedSizeAndDuration() throws Exception { StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThreshold(Duration.ofSeconds(1)) .build()) .build()) { testBigQueryWrite.addResponse( @@ -405,6 +416,75 @@ public void testWriteMixedSizeAndDuration() throws Exception { .getSerializedRowsCount()); assertEquals( false, 
testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + Thread.sleep(1005); + assertTrue(appendFuture3.isDone()); + } + } + + @Test + public void testBatchIsFlushed() throws Exception { + try (StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(1)) + .build()) + .build()) { + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + assertFalse(appendFuture1.isDone()); + writer.shutdown(); + // Write triggered by shutdown. + assertTrue(appendFuture1.isDone()); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + } + } + + @Test + public void testBatchIsFlushedWithError() throws Exception { + try (StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(1)) + .build()) + .build()) { + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + try { + appendFuture2.get(); + } catch (ExecutionException ex) { + 
assertEquals(DataLossException.class, ex.getCause().getClass()); + } + assertFalse(appendFuture3.isDone()); + writer.shutdown(); + try { + appendFuture3.get(); + } catch (ExecutionException ex) { + assertEquals(AbortedException.class, ex.getCause().getClass()); + } } } @@ -419,7 +499,7 @@ public void testFlowControlBehaviorBlock() throws Exception { .setFlowControlSettings( StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS .toBuilder() - .setMaxOutstandingRequestBytes(40L) + .setMaxOutstandingElementCount(2L) .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .build()) .build()) @@ -435,32 +515,471 @@ public void testFlowControlBehaviorBlock() throws Exception { .setAppendResult( AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(4)).build()) + .build()); + // Response will have a 10 second delay before server sends them back. testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); final StreamWriter writer1 = writer; - Runnable runnable = - new Runnable() { + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable callable = + new Callable() { @Override - public void run() { - ApiFuture appendFuture2 = - sendTestMessage(writer1, new String[] {"B"}); + public Throwable call() { + try { + ApiFuture appendFuture2 = + sendTestMessage(writer1, new String[] {"B"}, 3); + ApiFuture appendFuture3 = + sendTestMessage(writer1, new String[] {"C"}, 4); + // This request will be send out immediately because there is space in inflight queue. + // The time advance in the main thread will cause it to be sent back. 
+ if (3 != appendFuture2.get().getAppendResult().getOffset().getValue()) { + return new Exception( + "expected 3 but got " + + appendFuture2.get().getAppendResult().getOffset().getValue()); + } + testBigQueryWrite.waitForResponseScheduled(); + // Wait a while so that the close is called before we release the last response on the + // other thread. + Thread.sleep(500); + // This triggers the last response to come back. + fakeExecutor.advanceTime(Duration.ofSeconds(10)); + // This request will be waiting for previous response to come back. + if (4 != appendFuture3.get().getAppendResult().getOffset().getValue()) { + return new Exception( + "expected 4 but got " + + appendFuture3.get().getAppendResult().getOffset().getValue()); + } + return null; + } catch (IllegalStateException ex) { + // Sometimes the close will race before these calls. + return null; + } catch (Exception e) { + return e; + } } }; - Thread t = new Thread(runnable); - t.start(); - assertEquals(true, t.isAlive()); + Future future = executor.submit(callable); assertEquals(false, appendFuture1.isDone()); + testBigQueryWrite.waitForResponseScheduled(); + testBigQueryWrite.waitForResponseScheduled(); + // This will trigger the previous two responses to come back. + fakeExecutor.advanceTime(Duration.ofSeconds(10)); + assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); + Thread.sleep(500); + // When close is called, there should be one inflight request waiting. + writer.close(); + if (future.get() != null) { + future.get().printStackTrace(); + fail("Call got exception: " + future.get().toString()); + } + // Everything should come back. 
+ executor.shutdown(); + } + + @Test + public void testFlowControlBehaviorBlockWithError() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setFlowControlSettings( + StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + .setMaxOutstandingElementCount(2L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()) + .build()) + .build(); + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); + final StreamWriter writer1 = writer; + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable callable = + new Callable() { + @Override + public Throwable call() { + try { + ApiFuture appendFuture2 = + sendTestMessage(writer1, new String[] {"B"}, 3); + ApiFuture appendFuture3 = + sendTestMessage(writer1, new String[] {"C"}, 4); + try { + // This request will be send out immediately because there is space in inflight + // queue. + assertEquals(3L, appendFuture2.get().getAppendResult().getOffset().getValue()); + return new Exception("Should have failure on future2"); + } catch (ExecutionException e) { + if (e.getCause().getClass() != DataLossException.class) { + return e; + } + } + try { + // This request will be waiting for previous response to come back. + assertEquals(4L, appendFuture3.get().getAppendResult().getOffset().getValue()); + fail("Should be aborted future3"); + } catch (ExecutionException e) { + if (e.getCause().getClass() != AbortedException.class) { + return e; + } + } + return null; + } catch (IllegalStateException ex) { + // Sometimes the append will happen after the stream is shutdown. 
+ ex.printStackTrace(); + return null; + } catch (Exception e) { + return e; + } + } + }; + Future future = executor.submit(callable); // Wait is necessary for response to be scheduled before timer is advanced. - Thread.sleep(5000L); + testBigQueryWrite.waitForResponseScheduled(); + testBigQueryWrite.waitForResponseScheduled(); + // This will trigger the previous two responses to come back. fakeExecutor.advanceTime(Duration.ofSeconds(10)); // The first requests gets back while the second one is blocked. assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); - Thread.sleep(5000L); + Thread.sleep(500); + // When close is called, there should be one inflight request waiting. + writer.close(); + if (future.get() != null) { + future.get().printStackTrace(); + fail("Call got exception: " + future.get().toString()); + } + // Everything should come back. + executor.shutdown(); + } + + @Test + public void testAppendWhileShutdownSuccess() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + // When shutdown, we should have something in batch. + .setElementCountThreshold(3L) + .setFlowControlSettings( + StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + // When shutdown, we should have something in flight. 
+ .setMaxOutstandingElementCount(5L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()) + .build()) + .build(); + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + for (int i = 1; i < 13; i++) { + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder() + .setOffset(Int64Value.of(i * 3 + 2)) + .build()) + .build()); + } + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); + final StreamWriter writer1 = writer; + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable callable = + new Callable() { + @Override + public Throwable call() { + try { + LinkedList> responses = + new LinkedList>(); + int last_count = 0; + for (int i = 0; i < 20; i++) { + try { + responses.add(sendTestMessage(writer1, new String[] {"B"}, i + 3)); + } catch (IllegalStateException ex) { + LOG.info("Stopped at " + i + " responses:" + responses.size()); + last_count = i; + if ("Cannot append on a shut-down writer." != ex.getMessage()) { + return new Exception("Got unexpected message:" + ex.getMessage()); + } + break; + } catch (AbortedException ex) { + LOG.info("Stopped at " + i + " responses:" + responses.size()); + last_count = i; + if ("Stream closed, abort append." != ex.getMessage()) { + return new Exception("Got unexpected message:" + ex.getMessage()); + } + break; + } + } + // For all the requests that are sent in, we should get a finished callback. 
+ for (int i = 0; i < last_count; i++) { + if (i + 3 != responses.get(i).get().getAppendResult().getOffset().getValue()) { + return new Exception( + "Got unexpected offset expect:" + + i + + " actual:" + + responses.get(i - 3).get().getAppendResult().getOffset().getValue()); + } + } + return null; + } catch (Exception e) { + return e; + } + } + }; + Future future = executor.submit(callable); + assertEquals(false, appendFuture1.isDone()); + // The first requests gets back while the second one is blocked. + assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); + // When close is called, there should be one inflight request waiting. + writer.close(); + if (future.get() != null) { + future.get().printStackTrace(); + fail("Call got exception: " + future.get().toString()); + } + // Everything should come back. + executor.shutdown(); + } + + @Test + public void testAppendWhileShutdownFailed() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + // When shutdown, we should have something in batch. + .setElementCountThreshold(3L) + .setDelayThreshold(Duration.ofSeconds(10)) + .setFlowControlSettings( + StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + // When shutdown, we should have something in flight. + .setMaxOutstandingElementCount(5L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()) + .build()) + .build(); + + // The responses are for every 3 messages. 
+ for (int i = 0; i < 2; i++) { + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder() + .setOffset(Int64Value.of(i * 3)) + .build()) + .build()); + } + for (int i = 2; i < 6; i++) { + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setError( + com.google.rpc.Status.newBuilder().setCode(3).setMessage("error " + i).build()) + .build()); + } + // Stream failed at 7th request. + for (int i = 6; i < 10; i++) { + testBigQueryWrite.addException(new UnsupportedOperationException("Strange exception")); + } + final StreamWriter writer1 = writer; + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable<Throwable> callable = + new Callable<Throwable>() { + @Override + public Throwable call() { + try { + LinkedList<ApiFuture<AppendRowsResponse>> responses = + new LinkedList<ApiFuture<AppendRowsResponse>>(); + int last_count = 30; + LOG.info( + "Send 30 messages, will be batched into 10 messages, start fail at 7th message"); + for (int i = 0; i < 30; i++) { + try { + responses.add(sendTestMessage(writer1, new String[] {"B"}, i)); + Thread.sleep(500); + } catch (IllegalStateException ex) { + LOG.info("Stopped at sending request no." + i + " ex: " + ex.toString()); + last_count = i; + if (!"Stream already failed.".equals(ex.getMessage()) + && !"Cannot append on a shut-down writer.".equals(ex.getMessage())) { + return new Exception("Got unexpected message:" + ex.getMessage()); + } + break; + } + } + // Verify sent responses. + // The first two batches (6 requests) succeed and must report offsets 0..5. + for (int i = 0; i < 2 * 3; i++) { + try { + if (i != responses.get(i).get().getAppendResult().getOffset().getValue()) { + return new Exception( + "Got unexpected offset expect:" + + i + + " actual:" + + responses.get(i).get().getAppendResult().getOffset().getValue()); + } + } catch (Exception e) { + return e; + } + } + // Requests answered with an in-stream error must fail with the original exception.
+ for (int i = 2 * 3; i < 6 * 3; i++) { + try { + responses.get(i).get(); + return new Exception( + "Expect response return an error after a in-stream exception"); + } catch (Exception e) { + if (e.getCause().getClass() != StatusRuntimeException.class) { + return new Exception( + "Expect first error of stream exception to be the original exception but got " + + e.getCause().toString()); + } + } + } + LOG.info("Last count is:" + last_count); + for (int i = 6 * 3; i < last_count; i++) { + try { + responses.get(i).get(); + return new Exception("Expect response return an error after a stream exception"); + } catch (Exception e) { + if (e.getCause().getClass() != UnknownException.class + && e.getCause().getClass() != AbortedException.class) { + return new Exception("Unexpected stream exception:" + e.toString()); + } + } + } + return null; + } catch (Exception e) { + return e; + } + } + }; + Future<Throwable> future = executor.submit(callable); + // Wait for at least 7 requests (after batch) to reach server. + for (int i = 0; i < 7; i++) { + LOG.info("Wait for " + i + " response scheduled"); + testBigQueryWrite.waitForResponseScheduled(); + } + Thread.sleep(500); + writer.close(); + if (future.get() != null) { + future.get().printStackTrace(); + fail("Callback got exception: " + future.get().toString()); + } + // Everything should come back.
+ executor.shutdown(); + } + + @Test + public void testFlowControlBehaviorBlockAbortOnShutdown() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setFlowControlSettings( + StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + .setMaxOutstandingElementCount(2L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()) + .build()) + .build(); + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(4)).build()) + .build()); + // Responses will have a 10 second delay before the server sends them back. + testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); + + ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); + final StreamWriter writer1 = writer; + ExecutorService executor = Executors.newFixedThreadPool(2); + Callable<Throwable> callable = + new Callable<Throwable>() { + @Override + public Throwable call() { + try { + ApiFuture<AppendRowsResponse> appendFuture2 = + sendTestMessage(writer1, new String[] {"B"}, 3); + ApiFuture<AppendRowsResponse> appendFuture3 = + sendTestMessage(writer1, new String[] {"C"}, 4); + + // This request will be sent out immediately because there is space in inflight queue.
+ if (3L != appendFuture2.get().getAppendResult().getOffset().getValue()) { + return new Exception( + "Expect offset to be 3 but got " + + appendFuture2.get().getAppendResult().getOffset().getValue()); + } + testBigQueryWrite.waitForResponseScheduled(); + // This triggers the last response to come back. + fakeExecutor.advanceTime(Duration.ofSeconds(10)); + // This request will be waiting for previous response to come back. + if (4L != appendFuture3.get().getAppendResult().getOffset().getValue()) { + return new Exception( + "Expect offset to be 4 but got " + + appendFuture3.get().getAppendResult().getOffset().getValue()); + } + } catch (InterruptedException e) { + return e; + } catch (ExecutionException e) { + return e; + } catch (IllegalStateException e) { + // In very rare cases, the stream is shutdown before the request is sent, ignore this + // error. + } + return null; + } + }; + Future<Throwable> future = executor.submit(callable); + assertEquals(false, appendFuture1.isDone()); // Wait is necessary for response to be scheduled before timer is advanced. + testBigQueryWrite.waitForResponseScheduled(); + testBigQueryWrite.waitForResponseScheduled(); + // This will trigger the previous two responses to come back. fakeExecutor.advanceTime(Duration.ofSeconds(10)); - t.join(); + // The first request gets back while the second one is blocked. + assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); + // When close is called, there should be one inflight request waiting. + Thread.sleep(500); writer.close(); + if (future.get() != null) { + future.get().printStackTrace(); + fail("Callback got exception: " + future.get().toString()); + } + // Everything should come back.
+ executor.shutdown(); } @Test @@ -496,7 +1015,7 @@ public void testFlowControlBehaviorException() throws Exception { ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"}); // Wait is necessary for response to be scheduled before timer is advanced. - Thread.sleep(5000L); + testBigQueryWrite.waitForResponseScheduled(); fakeExecutor.advanceTime(Duration.ofSeconds(10)); try { appendFuture2.get(); @@ -597,6 +1116,40 @@ public void testOffsetMismatch() throws Exception { } } + @Test + public void testStreamAppendDirectException() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .build()) + .build(); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"A"}); + try { + future1.get(); + fail("Expected future1 to fail"); + } catch (ExecutionException ex) { + assertEquals(DataLossException.class, ex.getCause().getClass()); + } + try { + sendTestMessage(writer, new String[] {"B"}); + fail("Expected future2 to fail"); + } catch (IllegalStateException ex) { + assertEquals("Stream already failed.", ex.getMessage()); + } + writer.shutdown(); + try { + sendTestMessage(writer, new String[] {"C"}); + fail("Expected future3 to fail"); + } catch (IllegalStateException ex) { + assertEquals("Cannot append on a shut-down writer.", ex.getMessage()); + } + } + @Test public void testErrorPropagation() throws Exception { StreamWriter writer = @@ -895,6 +1448,7 @@ public void testShutdownWithConnectionError() throws Exception { .setElementCountThreshold(1L) .build()) .build(); + // Three requests will reach the server.
testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() .setAppendResult( @@ -902,14 +1456,24 @@ public void testShutdownWithConnectionError() throws Exception { .build()); testBigQueryWrite.addException(Status.DATA_LOSS.asException()); testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"B"}); - Thread.sleep(5000L); - // Move the needle for responses to be sent. - fakeExecutor.advanceTime(Duration.ofSeconds(20)); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 1); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}, 2); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}, 3); + testBigQueryWrite.waitForResponseScheduled(); + testBigQueryWrite.waitForResponseScheduled(); + testBigQueryWrite.waitForResponseScheduled(); + fakeExecutor.advanceTime(Duration.ofSeconds(10)); + // This will will never be in inflight and aborted by previous failure, because its delay is set + // after timer advance. + Thread.sleep(500); + try { + ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}, 4); + } catch (IllegalStateException ex) { + assertEquals("Stream already failed.", ex.getMessage()); + } // Shutdown writer immediately and there will be some error happened when flushing the queue. writer.shutdown(); assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());