Skip to content

Commit

Permalink
fix: retry certain RESOURCE_EXHAUSTED errors
Browse files Browse the repository at this point in the history
Handle certain RESOURCE_EXHAUSTED errors and report the retry attempts.
  • Loading branch information
esert-g committed Aug 24, 2021
1 parent 7755bf7 commit 8b99da1
Show file tree
Hide file tree
Showing 17 changed files with 731 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,53 @@
*/
package com.google.cloud.bigquery.storage.util;

import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/** Static utility methods for working with Errors returned from the service. */
public class Errors {
private Errors() {};

public static class IsRetryableStatusResult {
public boolean isRetryable = false;
public Duration retryDelay = null;
}

private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

/**
* Returns true iff the Status indicates an error that is retryable.
*
* <p>Generally, internal errors are not considered retryable, however there are certain transient
* network issues that appear as internal but are in fact retryable.
*
* <p>Resource exhausted errors are only considered retryable if metadata contains a serialized
* RetryInfo object.
*/
public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) {
IsRetryableStatusResult result = new IsRetryableStatusResult();

result.isRetryable = isRetryableInternalStatus(status);
if (!result.isRetryable
&& status.getCode() == Status.Code.RESOURCE_EXHAUSTED
&& metadata != null
&& metadata.containsKey(KEY_RETRY_INFO)) {
RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO);
if (retryInfo.hasRetryDelay()) {
result.isRetryable = true;
result.retryDelay =
Duration.ofSeconds(
retryInfo.getRetryDelay().getSeconds(), retryInfo.getRetryDelay().getNanos());
}
}

return result;
}

/**
* Returns true iff the Status indicates and internal error that is retryable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) {
*/
protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
this.settings = settings;
this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings());
this.stub =
EnhancedBigQueryReadStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -69,6 +71,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

public static interface RetryAttemptListener {
public void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt
* function before a failed ReadRows request is retried. This can be used as negative feedback
* mechanism for future decision to split read streams because some retried failures are due to
* resource exhaustion that increased parallelism only makes it worse.
*/
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

public RetryAttemptListener getReadRowsRetryAttemptListener() {
return readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to splitReadStream. */
public UnaryCallSettings<SplitReadStreamRequest, SplitReadStreamResponse>
splitReadStreamSettings() {
Expand Down Expand Up @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

public Builder setReadRowsRetryAttemptListener(
RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
Expand All @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods(

@Override
public BigQueryReadSettings build() throws IOException {
return new BigQueryReadSettings(this);
BigQueryReadSettings settings = new BigQueryReadSettings(this);
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
Expand All @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryReadStub stub;
private final BigQueryReadStubSettings stubSettings;
private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
throws IOException {
return create(settings, null);
}

public static EnhancedBigQueryReadStub create(
EnhancedBigQueryReadStubSettings settings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryReadStubSettings.Builder baseSettingsBuilder =
BigQueryReadStubSettings.newBuilder()
Expand Down Expand Up @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryReadStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryReadStub(
GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
GrpcBigQueryReadStub stub,
BigQueryReadStubSettings stubSettings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

Expand Down Expand Up @@ -123,7 +137,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import org.threeten.bp.Duration;

Expand All @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener;

public ApiResultRetryAlgorithm() {
this(null);
}

public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) {
super();
this.retryAttemptListener = retryAttemptListener;
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata);
if (result.isRetryable) {
// If result.retryDelay isn't null, we know exactly how long we must wait, so both regular
// and randomized delays are the same.
Duration retryDelay = result.retryDelay;
Duration randomizedRetryDelay = result.retryDelay;
if (retryDelay == null) {
retryDelay = prevSettings.getRetryDelay();
randomizedRetryDelay = DEADLINE_SLEEP_DURATION;
}
if (retryAttemptListener != null) {
retryAttemptListener.onRetryAttempt(status, metadata);
}
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
.setRetryDelay(retryDelay)
.setRpcTimeout(prevSettings.getRpcTimeout())
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
.setRandomizedRetryDelay(randomizedRetryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
Expand All @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
if (Errors.isRetryableStatus(status, metadata).isRetryable) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu
*/
protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException {
this.settings = settings;
this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings());
this.stub =
EnhancedBigQueryStorageStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings;
import com.google.protobuf.Empty;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -78,6 +80,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

public static interface RetryAttemptListener {
public void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt
* function before a failed ReadRows request is retried. This can be used as negative feedback
* mechanism for future decision to split read streams because some retried failures are due to
* resource exhaustion that increased parallelism only makes it worse.
*/
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

public RetryAttemptListener getReadRowsRetryAttemptListener() {
return readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */
public UnaryCallSettings<
BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse>
Expand Down Expand Up @@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

public Builder setReadRowsRetryAttemptListener(
RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
Expand Down Expand Up @@ -229,7 +259,9 @@ public UnaryCallSettings.Builder<FinalizeStreamRequest, Empty> finalizeStreamSet

@Override
public BigQueryStorageSettings build() throws IOException {
return new BigQueryStorageSettings(this);
BigQueryStorageSettings settings = new BigQueryStorageSettings(this);
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
Expand Down Expand Up @@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryStorageStub stub;
private final BigQueryStorageStubSettings stubSettings;
private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings)
throws IOException {
return create(settings, null);
}

public static EnhancedBigQueryStorageStub create(
EnhancedBigQueryStorageStubSettings settings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryStorageStubSettings.Builder baseSettingsBuilder =
BigQueryStorageStubSettings.newBuilder()
Expand Down Expand Up @@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett
BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryStorageStub(
GrpcBigQueryStorageStub stub,
BigQueryStorageStubSettings stubSettings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

Expand Down Expand Up @@ -145,7 +157,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down
Loading

0 comments on commit 8b99da1

Please sign in to comment.