Skip to content

Commit

Permalink
fix: Populate final stauts to initial request during connection shutd…
Browse files Browse the repository at this point in the history
…own (#2228)

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 14, 2023
1 parent b85c562 commit 9b9b5c0
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.21.0')
implementation platform('com.google.cloud:libraries-bom:26.22.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.41.0'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.41.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.41.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.41.1"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -220,7 +220,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.41.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.41.1
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,24 @@ private void cleanupInflightRequests() {
+ streamName
+ " id: "
+ writerId);
while (!localQueue.isEmpty()) {
localQueue.pollFirst().appendResult.setException(finalStatus);
int sizeOfQueue = localQueue.size();
for (int i = 0; i < sizeOfQueue; i++) {
if (i == 0) {
localQueue.pollFirst().appendResult.setException(finalStatus);
} else {
localQueue
.pollFirst()
.appendResult
.setException(
new Exceptions.StreamWriterClosedException(
Status.fromCode(Code.ABORTED)
.withDescription(
"Connection is aborted due to an unrecoverable failure of "
+ "another request sharing the connection. Please retry this "
+ "request."),
streamName,
writerId));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,11 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
if (i == 0) {
assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
} else {
assertThat(ex.getCause()).hasMessageThat().contains("Connection is aborted due to ");
}
}

// The future append will directly fail.
Expand Down Expand Up @@ -654,7 +658,11 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
if (i == 0) {
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
} else {
assertThat(ex.getCause()).hasMessageThat().contains("Connection is aborted due to ");
}
}

// The future append will directly fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -666,8 +667,12 @@ public void serverCloseWhileRequestsInflight() throws Exception {

// Server close should properly handle all inflight requests.
for (int i = 0; i < appendCount; i++) {
ApiException actualError = assertFutureException(ApiException.class, futures.get(i));
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode());
if (i == 0) {
ApiException actualError = assertFutureException(ApiException.class, futures.get(i));
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatusCode().getCode());
} else {
assertFutureException(StreamWriterClosedException.class, futures.get(i));
}
}

writer.close();
Expand Down Expand Up @@ -988,7 +993,13 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
if (i == 0) {
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
} else {
assertThat(ex.getCause())
.hasMessageThat()
.contains("Connection is aborted due to an unrecoverable");
}
}
}

Expand Down Expand Up @@ -1027,7 +1038,11 @@ public void testAppendWithResetNeverSuccess() throws Exception {
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
// after 5 seconds, the requests will bail out.
for (int i = 1; i < appendCount; i++) {
assertFutureException(AbortedException.class, futures.get(i));
if (i == 1) {
assertFutureException(AbortedException.class, futures.get(i));
} else {
assertFutureException(StreamWriterClosedException.class, futures.get(i));
}
}
}
}
Expand All @@ -1048,7 +1063,11 @@ public void testAppendWithResetNeverSuccessWithMultiplexing() throws Exception {
assertEquals(futures.get(0).get().getAppendResult().getOffset().getValue(), 0);
// after 5 seconds, the requests will bail out.
for (int i = 1; i < appendCount; i++) {
assertFutureException(AbortedException.class, futures.get(i));
if (i == 1) {
assertFutureException(AbortedException.class, futures.get(i));
} else {
assertFutureException(StreamWriterClosedException.class, futures.get(i));
}
}
}
}
Expand Down

0 comments on commit 9b9b5c0

Please sign in to comment.