Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
fix: fix watchdog NPE red herring (#1344)
Browse files Browse the repository at this point in the history
* fix: fix watchdog NPE red herring

* adding a test

* checking a few more things in the test

* make innercallable volatile
  • Loading branch information
mutianf authored Apr 14, 2021
1 parent 32240eb commit 06dbf12
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
8 changes: 7 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT>
private boolean autoAutoFlowControl = true;

private final ResponseObserver<ResponseT> outerResponseObserver;
private StreamController innerController;
private volatile StreamController innerController;

@GuardedBy("lock")
private State state = State.IDLE;
Expand Down Expand Up @@ -296,6 +296,12 @@ public void onCompleteImpl() {
* @return True if the stream was canceled.
*/
boolean cancelIfStale() {
// If the stream hasn't started yet, innerController will be null. Skip the check this time
// and return false so the stream is still watched.
if (innerController == null) {
return false;
}

Throwable myError = null;

synchronized (lock) {
Expand Down
25 changes: 25 additions & 0 deletions gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,31 @@ public void testIdleTimeout() throws InterruptedException {
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testTimedOutBeforeStart() throws InterruptedException {
MockServerStreamingCallable<String, String> callable1 = new MockServerStreamingCallable<>();
AccumulatingObserver<String> downstreamObserver1 = new AccumulatingObserver<>();
ResponseObserver observer = watchdog.watch(downstreamObserver1, waitTime, idleTime);
clock.incrementNanoTime(idleTime.toNanos() + 1);
// This should not remove callable1 from watched list
watchdog.run();
assertThat(downstreamObserver1.done.isDone()).isFalse();

callable1.call("request", observer);
// This should cancel callable1
watchdog.run();
MockServerStreamingCall<String, String> call1 = callable1.popLastCall();
assertThat(call1.getController().isCancelled()).isTrue();
call1.getController().getObserver().onError(new CancellationException("User cancelled"));
Throwable error = null;
try {
downstreamObserver1.done.get();
} catch (ExecutionException t) {
error = t.getCause();
}
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void testMultiple() throws Exception {
// Start stream1
Expand Down

0 comments on commit 06dbf12

Please sign in to comment.