Skip to content

Commit

Permalink
fix: also shutdown the stream connection in case the timeout exceptio…
Browse files Browse the repository at this point in the history
…n is

triggered.
  • Loading branch information
agrawal-siddharth committed Mar 9, 2024
1 parent 4b1ff23 commit cf57b8b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public void run() {
} finally {
lock.unlock();
}
cleanupInflightRequests();
cleanup();
});
this.appendThread.start();
}
Expand Down Expand Up @@ -812,7 +812,10 @@ private void appendLoop() {
this.streamConnection.send(originalRequestBuilder.build());
}
}
cleanup();
}

private void cleanup() {
log.info(
"Cleanup starts. Stream: "
+ streamName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.MaximumRequestCallbackWaitTimeExceededException;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
Expand All @@ -49,6 +51,7 @@
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -1612,4 +1615,70 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi
assertEquals("50", queryIter.next().get(0).getStringValue());
}
}

@Test
public void testTimeoutException() throws IOException, InterruptedException, ExecutionException {
try {
JsonStreamWriter.setMaxRequestCallbackWaitTime(
Duration.ofMillis(10)); // induce timeout exception
String tableName = "TestTimeoutExceptionTable";
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"foo", StandardSQLTypeName.STRING)
.build())))
.build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build();

// Create initial set of requests; these will go to the inflight queue
List<ApiFuture<AppendRowsResponse>> futureResponses = new ArrayList<>();
final int MAX_INITIAL_REQUESTS = 10;
for (int i = 0; i < MAX_INITIAL_REQUESTS; i++) {
futureResponses.add(streamWriter.append(CreateProtoRows(new String[] {"aaa"}), -1));
}
Thread.sleep(
50); // Delay enough time for background thread to schedule existing requests, but not
// enough for all the responses to arrive

// Now generate one more request; once this is scheduled the timeout exception will be
// triggered
futureResponses.add(streamWriter.append(CreateProtoRows(new String[] {"ddd"}), -1));

boolean gotFirstException =
false; // We don't know how many responses will have arrived by the time the last request
// is scheduled; however, we expect not all responses will have arrived.
boolean gotSubsequentException = false;
for (int i = 0; i < MAX_INITIAL_REQUESTS + 1; i++) {
try {
AppendRowsResponse actualResponse = futureResponses.get(i).get();
} catch (Throwable t) {
assertTrue(t instanceof ExecutionException);
t = t.getCause();
if (!gotFirstException) {
gotFirstException = true;
assertTrue(t instanceof MaximumRequestCallbackWaitTimeExceededException);
} else {
gotSubsequentException = true;
assertTrue(t instanceof StreamWriterClosedException);
assertEquals(Code.ABORTED, Status.fromThrowable(t).getCode());
}
}
}
assertTrue(gotFirstException);
assertTrue(gotSubsequentException);
} finally {
JsonStreamWriter.setMaxRequestCallbackWaitTime(
Duration.ofMinutes(
5)); // restore timeout exception as this is a static setting and will affect other
// tests
}
}
}

0 comments on commit cf57b8b

Please sign in to comment.