Skip to content

Commit

Permalink
fix: retry watch on InternalException (#875)
Browse files Browse the repository at this point in the history
Fixes #866
  • Loading branch information
schmidt-sebastian committed Feb 22, 2022
1 parent 036f7f8 commit a76a0fd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,9 @@ private static Status getStatus(Throwable throwable) {
return ((StatusRuntimeException) throwable).getStatus();
} else if (throwable instanceof StatusException) {
return ((StatusException) throwable).getStatus();
} else if (throwable instanceof ApiException
&& ((ApiException) throwable).getStatusCode().getTransportCode() instanceof Code) {
return ((Code) ((ApiException) throwable).getStatusCode().getTransportCode()).toStatus();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.BidiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.InternalException;
import com.google.cloud.firestore.Query.Direction;
import com.google.cloud.firestore.WatchTest.SnapshotDocument.ChangeType;
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
Expand Down Expand Up @@ -393,6 +395,19 @@ public void queryWatchRetriesBasedOnErrorCode() throws InterruptedException {
}
}

@Test
public void queryWatchRetriesOnInternalException() throws InterruptedException {
addQueryListener();
awaitAddTarget();
send(addTarget());
destroy(new InternalException(null, GrpcStatusCode.of(Code.INTERNAL), true));
awaitAddTarget();
send(addTarget());
send(current());
send(snapshot());
awaitQuerySnapshot();
}

@Test
public void queryWatchHandlesDocumentChange() throws InterruptedException {
addQueryListener();
Expand Down Expand Up @@ -991,7 +1006,11 @@ private void send(ListenResponse response) {
}

private void destroy(Code code) {
streamObserverCapture.getValue().onError(new StatusException(io.grpc.Status.fromCode(code)));
destroy(new StatusException(io.grpc.Status.fromCode(code)));
}

private void destroy(Exception e) {
streamObserverCapture.getValue().onError(e);
}

private void close() {
Expand Down

0 comments on commit a76a0fd

Please sign in to comment.