Skip to content

Commit

Permalink
JCO-5 Retry failed queries
Browse files Browse the repository at this point in the history
Change-Id: I35c5f58f8d3ddf5e0386a69d91be91f4db18f4fc
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/214074
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: David Nault <david.nault@couchbase.com>
  • Loading branch information
dnault committed Aug 13, 2024
1 parent 338a0da commit aa310b0
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.couchbase.columnar.client.java;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static java.util.Collections.unmodifiableList;

/**
 * Computes randomized ("jittered") retry delays that grow exponentially with
 * the attempt number, doubling from an initial delay until a maximum is reached.
 * <p>
 * The un-jittered ceilings are precomputed once in the constructor; each call to
 * {@link #delayForAttempt(long)} then picks a random delay below the ceiling for
 * that attempt.
 */
class BackoffCalculator {
  private final long maxDelayMillis;
  private final List<Long> delayMillisLookupTable;

  /**
   * @param initialDelay ceiling of the delay for attempt 0. Values shorter than
   * one millisecond are treated as one millisecond.
   * @param maxDelay ceiling used once doubling would reach or exceed it. Values
   * shorter than one millisecond are treated as one millisecond.
   * @throws IllegalArgumentException if {@code maxDelay} is less than {@code initialDelay}
   * (after both have been clamped to at least one millisecond).
   */
  BackoffCalculator(Duration initialDelay, Duration maxDelay) {
    final long initialDelayMillis = Math.max(1, initialDelay.toMillis());
    this.maxDelayMillis = Math.max(1, maxDelay.toMillis());
    if (maxDelayMillis < initialDelayMillis) {
      throw new IllegalArgumentException(
        "maxDelay must be >= initialDelay, but got initialDelay=" + initialDelay + " and maxDelay=" + maxDelay
      );
    }

    // Precompute delays. Radically faster than calling Math.pow() every time,
    // simpler than twiddling bits, and doesn't rely on Guava's saturated math.
    // Resulting list always has fewer than 64 elements. In practice, it has ~10.
    List<Long> mutableDelays = new ArrayList<>();
    long d = initialDelayMillis;
    try {
      do {
        mutableDelays.add(d);
        d = Math.multiplyExact(d, 2);
      } while (d < maxDelayMillis);
    } catch (ArithmeticException done) {
      // overflow is definitely greater than maxDelay
    }
    this.delayMillisLookupTable = unmodifiableList(mutableDelays);
  }

  /**
   * Returns a random delay for the given attempt.
   * <p>
   * The result is at least zero and strictly less than the ceiling for the attempt,
   * where the ceiling is {@code initialDelay * 2^attempt} while that value is in the
   * lookup table, and {@code maxDelay} for every later attempt.
   *
   * @param attempt zero-based attempt number.
   * @return the jittered delay, with millisecond resolution.
   * @throws IllegalArgumentException if {@code attempt} is negative.
   */
  Duration delayForAttempt(long attempt) {
    if (attempt < 0) {
      throw new IllegalArgumentException("attempt must be non-negative, but got " + attempt);
    }

    // The narrowing cast is safe: it only happens when attempt is a valid list index.
    final long millis = attempt < delayMillisLookupTable.size()
      ? delayMillisLookupTable.get((int) attempt)
      : maxDelayMillis;

    // nextDouble() is in [0, 1), so the jittered value is always below the ceiling.
    long jitteredMillis = (long) (millis * ThreadLocalRandom.current().nextDouble());
    return Duration.ofMillis(jitteredMillis);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public QueryResult executeQuery(
String statement,
Consumer<QueryOptions> optionsCustomizer
) {
return queryExecutor.query(statement, optionsCustomizer, null);
return queryExecutor.queryBuffered(statement, optionsCustomizer, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,44 @@
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import com.couchbase.client.core.retry.RetryAction;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.CoreAsyncUtils;
import com.couchbase.client.core.util.Deadline;
import com.couchbase.columnar.client.java.codec.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.couchbase.client.core.retry.AuthErrorDecider.getTlsHandshakeFailure;
import static com.couchbase.client.core.retry.RetryReason.AUTHENTICATION_ERROR;
import static com.couchbase.client.core.retry.RetryReason.ENDPOINT_NOT_AVAILABLE;
import static com.couchbase.client.core.retry.RetryReason.GLOBAL_CONFIG_LOAD_IN_PROGRESS;
import static com.couchbase.client.core.util.CbObjects.defaultIfNull;
import static com.couchbase.client.core.util.Golang.encodeDurationToMs;
import static com.couchbase.columnar.client.java.internal.ReactorHelper.forEachBlocking;
import static com.couchbase.columnar.client.java.internal.ReactorHelper.propagateAsCancellation;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class QueryExecutor {
private static final Logger log = LoggerFactory.getLogger(QueryExecutor.class);

private static final double dispatchTimeoutFactor = 1.5; // of connectTimeout

private static final BackoffCalculator backoff = new BackoffCalculator(
Duration.ofMillis(100),
Duration.ofMinutes(1)
);

private final Core core;
private final Environment environment;
private final ColumnarRetryStrategy columnarRetryStrategy;
Expand All @@ -82,63 +92,136 @@ public QueryExecutor(
columnarRetryStrategy = new ColumnarRetryStrategy(dispatchTimeout, connectionString);
}

QueryResult query(
QueryResult queryBuffered(
String statement,
Consumer<QueryOptions> optionsCustomizer,
@Nullable CoreBucketAndScope scope
) {
QueryOptions builder = new QueryOptions();
optionsCustomizer.accept(builder);
QueryOptions.Unmodifiable builtOpts = builder.build();

Deserializer customDeserializer = builtOpts.deserializer();

try {
return CoreAsyncUtils.block(
analyticsQueryAsync(
core,
analyticsRequest(statement, builtOpts, scope),
customDeserializer != null ? customDeserializer : environment.deserializer()
)
);
} catch (RuntimeException t) {
throw translateException(t);
}
return doWithRetry(
optionsCustomizer,
(options, remainingTimeout) -> blockAndRewriteStackTrace(analyticsQueryAsync(
core,
analyticsRequest(statement, options, remainingTimeout, scope),
defaultIfNull(options.deserializer(), environment.deserializer())
))
);
}

/**
 * Runs a streaming query through {@code doWithRetry}, passing the row consumer
 * to {@code analyticsQueryBlockingStreaming} on every attempt.
 *
 * @param statement the statement to execute.
 * @param optionsCustomizer callback that configures the query options.
 * @param scope optional bucket and scope used to build the request; may be null.
 * @param rowConsumer receives the rows of the result.
 * @return the metadata returned by the successful attempt.
 */
QueryMetadata queryStreaming(
  String statement,
  Consumer<QueryOptions> optionsCustomizer,
  @Nullable CoreBucketAndScope scope,
  Consumer<Row> rowConsumer
) {
  // One attempt: build a fresh request with the time that is left, then stream it.
  final QueryStrategy<QueryMetadata> singleAttempt = (opts, timeLeft) -> {
    final AnalyticsRequest request = analyticsRequest(statement, opts, timeLeft, scope);
    return analyticsQueryBlockingStreaming(
      core,
      request,
      defaultIfNull(opts.deserializer(), environment.deserializer()),
      rowConsumer
    );
  };

  return doWithRetry(optionsCustomizer, singleAttempt);
}

/**
 * A little adapter that lets buffered and streaming queries
 * both use the same retry code.
 * <p>
 * Implementations perform a single query attempt. The retry loop may call
 * {@link #apply} more than once, each time with the same options and the
 * timeout it wants that attempt to use.
 *
 * @param <R> the result type of a successful attempt.
 */
@FunctionalInterface
private interface QueryStrategy<R> {
  /**
   * Executes one query attempt.
   *
   * @param options the built (unmodifiable) query options.
   * @param remainingTimeout the timeout to use for this attempt.
   * @return the result of the attempt.
   */
  R apply(
    QueryOptions.Unmodifiable options,
    Duration remainingTimeout
  );
}

/**
 * Builds the query options, then runs the strategy in a loop until it succeeds,
 * fails with something that is not retried, or the timeout budget runs out.
 * <p>
 * Only a {@code CoreErrorCodeAndMessageException} whose {@code retriable()} is true
 * triggers a retry. Between attempts the thread sleeps for a jittered backoff delay.
 * Any other {@code RuntimeException} is passed through {@code translateException},
 * with the most recent retried error (if any) attached as a suppressed exception.
 *
 * @param optionsCustomizer callback that configures the query options.
 * @param strategy performs a single query attempt.
 * @param <R> result type of the strategy.
 * @return the result of the first successful attempt.
 * @throws TimeoutException if a retriable failure occurs but the remaining time
 * is not greater than the backoff delay.
 */
private <R> R doWithRetry(
  Consumer<QueryOptions> optionsCustomizer,
  QueryStrategy<R> strategy
) {
  QueryOptions builder = new QueryOptions();
  optionsCustomizer.accept(builder);
  QueryOptions.Unmodifiable builtOpts = builder.build();

  Duration remainingTimeout = resolveTimeout(builtOpts);
  Deadline retryDeadline = Deadline.of(remainingTimeout);

  CoreErrorCodeAndMessageException prevError = null;
  int attempt = 0;

  while (true) {
    try {
      return strategy.apply(builtOpts, remainingTimeout);

    } catch (RuntimeException t) {
      if (t instanceof CoreErrorCodeAndMessageException) {
        CoreErrorCodeAndMessageException currentError = (CoreErrorCodeAndMessageException) t;

        if (currentError.retriable()) {
          // Post-increment: the delay uses the zero-based attempt index,
          // while messages below report the one-based attempt number.
          Duration delay = backoff.delayForAttempt(attempt++);
          remainingTimeout = retryDeadline.remaining().orElse(Duration.ZERO);

          if (remainingTimeout.compareTo(delay) <= 0) {
            throw notEnoughTimeToRetry(attempt, currentError);
          }

          log.debug("Query attempt {} failed; retrying after {}. {}", attempt, delay, context(currentError));
          sleep(delay);

          // The remaining time was measured before sleeping, so charge the sleep
          // against the budget. Otherwise the next attempt could overrun the
          // overall deadline by up to `delay`. The guard above guarantees the
          // result is still positive.
          remainingTimeout = remainingTimeout.minus(delay);

          prevError = currentError;
          continue; // retry!
        }
      }

      throw translateException(t, prevError);
    }
  }
}

/**
 * Creates the exception thrown when a retriable failure happens but the
 * remaining time does not allow another attempt.
 * <p>
 * The translated form of the failure is attached as a suppressed exception.
 *
 * @param attempt one-based number of the attempt that failed.
 * @param t the retriable failure of that attempt.
 * @return a new timeout exception; the caller is expected to throw it.
 */
private static TimeoutException notEnoughTimeToRetry(int attempt, CoreErrorCodeAndMessageException t) {
  // context(t) renders "{}" when the exception has no error context,
  // instead of failing with a NullPointerException while building the message.
  TimeoutException timeoutException = new TimeoutException(
    "Query attempt " + attempt + " failed, and there's not enough time left to try again. " +
      context(t)
  );
  timeoutException.addSuppressed(translateException(t));
  return timeoutException;
}

/**
 * Wraps the exception in an object whose {@code toString()} exports the
 * exception's error context as JSON, or {@code "{}"} if there is no context.
 * <p>
 * The export happens only when {@code toString()} is called, so passing the
 * result as a log argument costs nothing unless the message is actually logged.
 */
private static Object context(CouchbaseException e) {
  return new Object() {
    @Override
    public String toString() {
      final ErrorContext errorContext = e.context();
      if (errorContext == null) {
        return "{}";
      }
      return errorContext.exportAsString(Context.ExportFormat.JSON);
    }
  };
}

private static void sleep(Duration d) {
try {
return analyticsQueryBlockingStreaming(
core,
analyticsRequest(statement, builtOpts, scope),
customDeserializer != null ? customDeserializer : environment.deserializer(),
rowConsumer
);
} catch (RuntimeException t) {
throw translateException(t);
MILLISECONDS.sleep(d.toMillis());
} catch (InterruptedException e) {
throw propagateAsCancellation(e);
}
}

/**
 * Translates the exception via the single-argument overload and, when
 * {@code suppressMe} is not null, attaches it to the translated exception
 * as a suppressed exception.
 *
 * @param e the exception to translate.
 * @param suppressMe optional exception to attach; may be null.
 * @return the translated exception.
 */
private static RuntimeException translateException(RuntimeException e, @Nullable Exception suppressMe) {
  final RuntimeException translated = translateException(e);
  if (suppressMe == null) {
    return translated;
  }
  translated.addSuppressed(suppressMe);
  return translated;
}

private static RuntimeException translateException(RuntimeException e) {
if (e instanceof CoreErrorCodeAndMessageException) {
CoreErrorCodeAndMessageException t = (CoreErrorCodeAndMessageException) e;

if (t.errorCodeAndMessage().code() == 20000) {
if (t.hasCode(20000)) {
return new InvalidCredentialException(t.context());
}

if (t.errorCodeAndMessage().code() == 21002) {
if (t.hasCode(21002)) {
if (t.context() instanceof AnalyticsErrorContext) {
AnalyticsErrorContext ctx = (AnalyticsErrorContext) t.context();
if (ctx.requestContext().request().idempotent()) {
Expand All @@ -148,7 +231,7 @@ private static RuntimeException translateException(RuntimeException e) {
return newAmbiguousTimeoutException(t.context());
}

return new QueryException(t.errorCodeAndMessage(), t.context());
return new QueryException(t.errors().get(0), t.context());
}

if (e instanceof com.couchbase.client.core.error.TimeoutException) {
Expand All @@ -167,16 +250,14 @@ private static RuntimeException translateException(RuntimeException e) {
return hide((CouchbaseException) e);
}

if (CbThrowables.hasCause(e, InterruptedException.class)) {
// Satisfy executeQuery() contract.
CancellationException ce = new CancellationException("Thread was interrupted.");
ce.addSuppressed(e);
return ce;
}

return e;
}

/**
 * Picks the timeout for a query: the one set in the options if present,
 * otherwise the analytics timeout from the environment's timeout config.
 */
Duration resolveTimeout(QueryOptions.Unmodifiable opts) {
  final Duration perQueryTimeout = opts.timeout();
  if (perQueryTimeout != null) {
    return perQueryTimeout;
  }
  return environment.timeoutConfig().analyticsTimeout();
}

/**
* Helper method to craft an analytics request.
*
Expand All @@ -187,16 +268,19 @@ private static RuntimeException translateException(RuntimeException e) {
AnalyticsRequest analyticsRequest(
final String statement,
final QueryOptions.Unmodifiable opts,
final Duration timeout,
@Nullable final CoreBucketAndScope scope
) {
requireNonNull(statement);

Duration customTimeout = opts.timeout();
Duration timeout = customTimeout != null ? customTimeout : environment.timeoutConfig().analyticsTimeout();
// The server timeout doesn't _need_ to be different, but making it longer
// means it's more likely the user will consistently get timeout exceptions
// with client timeout error contexts instead of a mix of client- and server-side contexts.
Duration serverTimeout = timeout.plus(Duration.ofSeconds(5));

ObjectNode query = Mapper.createObjectNode();
query.put("statement", statement);
query.put("timeout", encodeDurationToMs(timeout));
query.put("timeout", encodeDurationToMs(serverTimeout));
if (scope != null) {
query.put("query_context", "default:`" + scope.bucketName() + "`.`" + scope.scopeName() + "`");
}
Expand Down Expand Up @@ -256,6 +340,7 @@ private static QueryMetadata analyticsQueryBlockingStreaming(
final Consumer<Row> callback
) {
Deadline deadline = Deadline.of(request.timeout());

AnalyticsResponse response = analyticsQueryInternal(core, request).blockOptional().get();

Mono<?> wholeStreamDeadlineAsMono = Mono.never().timeout(
Expand Down Expand Up @@ -385,7 +470,7 @@ private ErrorContext getErrorContext(Throwable t, Request<?> request) {
* is dispatched to the server.
* </ul>
*
* @see QueryOptions#readOnly(boolean)
* @see QueryOptions#readOnly(Boolean)
*/
private static TimeoutException newSafeTimeoutException(String message, Context context) {
return new TimeoutException(message + " " + context.exportAsString(Context.ExportFormat.JSON));
Expand Down Expand Up @@ -427,4 +512,37 @@ private static RuntimeException hide(CouchbaseException e) {

return r;
}

/**
 * Waits for the future and returns its value.
 * <p>
 * Similar to {@link com.couchbase.client.core.util.CoreAsyncUtils#block(CompletableFuture)}
 * but handles interruption differently: an {@link InterruptedException} is handed
 * to {@code propagateAsCancellation}. If the future failed with a
 * {@link RuntimeException}, that exception is rethrown after its stack trace has
 * been rewritten; any other cause is wrapped in a new {@link RuntimeException}.
 */
private static <T> T blockAndRewriteStackTrace(CompletableFuture<T> future) {
  try {
    return future.get();

  } catch (ExecutionException wrapper) {
    final Throwable failure = wrapper.getCause();
    if (!(failure instanceof RuntimeException)) {
      throw new RuntimeException(failure);
    }
    rewriteStackTrace(failure);
    throw (RuntimeException) failure;

  } catch (InterruptedException interrupted) {
    throw propagateAsCancellation(interrupted);
  }
}

/**
 * Replaces the throwable's stack trace with the current call stack, so it points
 * HERE rather than at the thread where the exception was actually thrown.
 * The original (async) stack trace is kept on a suppressed marker exception.
 */
private static void rewriteStackTrace(Throwable t) {
  // Capture the original trace first; fillInStackTrace() overwrites it.
  final StackTraceElement[] asyncTrace = t.getStackTrace();
  t.fillInStackTrace();

  final Exception marker = new Exception(
    "The above exception was originally thrown by another thread at the following location.");
  marker.setStackTrace(asyncTrace);
  t.addSuppressed(marker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public String name() {

@Override
public QueryResult executeQuery(String statement, Consumer<QueryOptions> options) {
return cluster.queryExecutor.query(
return cluster.queryExecutor.queryBuffered(
statement,
options,
new CoreBucketAndScope(database.name(), name)
Expand Down
Loading

0 comments on commit aa310b0

Please sign in to comment.