diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 67b4bc7dc5..e295320153 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -699,7 +699,10 @@ private void appendLoop() { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); // Check whether we should error out the current append loop. if (inflightRequestQueue.size() > 0) { - throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp); + Instant sendInstant = inflightRequestQueue.getFirst().requestSendTimeStamp; + if (sendInstant != null) { + throwIfWaitCallbackTooLong(sendInstant); + } } // Copy the streamConnectionIsConnected guarded by lock to a local variable. @@ -711,7 +714,9 @@ private void appendLoop() { // from inflightRequestQueue and prepent them onto the waitinRequestQueue. They need to be // prepended as they need to be sent before new requests. while (!inflightRequestQueue.isEmpty()) { - waitingRequestQueue.addFirst(inflightRequestQueue.pollLast()); + AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast(); + requestWrapper.requestSendTimeStamp = null; + waitingRequestQueue.addFirst(requestWrapper); } // If any of the inflight messages were meant to be ignored during requestCallback, they @@ -721,7 +726,6 @@ private void appendLoop() { while (!this.waitingRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); waitForBackoffIfNecessary(requestWrapper); - requestWrapper.trySetRequestInsertQueueTime(); this.inflightRequestQueue.add(requestWrapper); localQueue.addLast(requestWrapper); } @@ -760,6 +764,7 @@ private void appendLoop() { firstRequestForTableOrSchemaSwitch = true; } while (!localQueue.isEmpty()) { + localQueue.peekFirst().setRequestSendQueueTime(); AppendRowsRequest originalRequest = localQueue.pollFirst().message; AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder(); // Always respect the first writer schema seen by the loop. @@ -1217,6 +1222,7 @@ private void doneCallback(Throwable finalStatus) { private AppendRequestAndResponse pollInflightRequestQueue(boolean pollLast) { AppendRequestAndResponse requestWrapper = pollLast ? inflightRequestQueue.pollLast() : inflightRequestQueue.poll(); + requestWrapper.requestSendTimeStamp = null; --this.inflightRequests; this.inflightBytes -= requestWrapper.messageSize; this.inflightReduced.signal(); @@ -1256,7 +1262,9 @@ static final class AppendRequestAndResponse { TimedAttemptSettings attemptSettings; - Instant requestCreationTimeStamp; + // Time at which request was last sent over the network. + // If a response is no longer expected this is set back to null. + Instant requestSendTimeStamp; AppendRequestAndResponse( AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings) { @@ -1276,11 +1284,8 @@ static final class AppendRequestAndResponse { } } - void trySetRequestInsertQueueTime() { - // Only set the first time the caller tries to set the timestamp. - if (requestCreationTimeStamp == null) { - requestCreationTimeStamp = Instant.now(); - } + void setRequestSendQueueTime() { + requestSendTimeStamp = Instant.now(); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 2ded205822..06a558e658 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -1809,6 +1809,33 @@ public void testAppendSuccessAndInternalQuotaErrorRetrySuccess() throws Exceptio writer.close(); } + @Test + public void testInternalQuotaError_MaxWaitTimeExceed_RetrySuccess() throws Exception { + // In order for the test to succeed, the given request must complete successfully even after all + // the retries. The fake server is configured to fail 3 times with a quota error. This means the + // client will perform retry with exponential backoff. The fake server injects 1 second of delay + // for each response. In addition, the exponential backoff injects a couple of seconds of delay. + // This yields an overall delay of about 5 seconds before the request succeeds. If the request + // send timestamp was being set only once, this would eventually exceed the 4 second timeout + // limit, and throw an exception. With the current behavior, the request send timestamp is reset + // each time a retry is performed, so we never exceed the 4 second timeout limit. + StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(4)); + testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); + StreamWriter writer = getTestStreamWriterRetryEnabled(); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addStatusException( + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()); + testBigQueryWrite.addResponse(createAppendResponse(0)); + + ApiFuture appendFuture1 = + writer.append(createProtoRows(new String[] {"A"})); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + writer.close(); + } + @Test public void testAppendSuccessAndInternalErrorRetrySuccessExclusive() throws Exception { // Ensure we return an error from the fake server when a retry is in progress