Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: base transaction retries on error codes #129

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ private FirestoreException(String reason, Status status, @Nullable Throwable cau
this.status = status;
}

private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
private FirestoreException(String reason, ApiException exception) {
super(
reason,
exception,
exception.getStatusCode().getCode().getHttpStatusCode(),
exception.isRetryable());
}

private FirestoreException(ApiException exception) {
super(exception);
private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
}

/**
Expand Down Expand Up @@ -91,7 +95,16 @@ static FirestoreException networkException(IOException exception, boolean retrya
* @return The FirestoreException
*/
static FirestoreException apiException(ApiException exception) {
return new FirestoreException(exception);
return new FirestoreException(exception.getMessage(), exception);
}

/**
* Creates a FirestoreException from an ApiException.
*
* @return The FirestoreException
*/
static FirestoreException apiException(ApiException exception, String message) {
return new FirestoreException(message, exception);
}

@InternalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package com.google.cloud.firestore;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand All @@ -29,16 +26,12 @@
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.BatchGetDocumentsRequest;
import com.google.firestore.v1.BatchGetDocumentsResponse;
import com.google.firestore.v1.DatabaseRootName;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Status;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
Expand All @@ -47,7 +40,6 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -62,10 +54,7 @@ class FirestoreImpl implements Firestore {
private static final String AUTO_ID_ALPHABET =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

private static final Logger LOGGER = Logger.getLogger("Firestore");
private static final Tracer tracer = Tracing.getTracer();
private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS =
io.opencensus.trace.Status.ABORTED.withDescription("too many retries");

private final FirestoreRpc firestoreClient;
private final FirestoreOptions firestoreOptions;
Expand Down Expand Up @@ -290,17 +279,16 @@ public Query collectionGroup(@Nonnull final String collectionId) {
@Nonnull
@Override
public <T> ApiFuture<T> runTransaction(@Nonnull final Transaction.Function<T> updateFunction) {
return runTransaction(updateFunction, TransactionOptions.create());
return runAsyncTransaction(
new TransactionAsyncAdapter<>(updateFunction), TransactionOptions.create());
}

@Nonnull
@Override
public <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions);
return resultFuture;
return runAsyncTransaction(new TransactionAsyncAdapter<>(updateFunction), transactionOptions);
}

@Nonnull
Expand All @@ -315,160 +303,16 @@ public <T> ApiFuture<T> runAsyncTransaction(
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(updateFunction, resultFuture, transactionOptions);
return resultFuture;
}

/** Transaction functions that returns its result in the provided SettableFuture. */
private <T> void runTransaction(
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options) {
// span is intentionally not ended here. It will be ended by runTransactionAttempt on success
// or error.
Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan();
try (Scope s = tracer.withSpan(span)) {
runTransactionAttempt(transactionCallback, resultFuture, options, span);
}
}

private <T> void runTransactionAttempt(
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options,
final Span span) {
final Transaction transaction = new Transaction(this, options.getPreviousTransactionId());
final Executor userCallbackExecutor =
Context.currentContextExecutor(
options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor());

final int attemptsRemaining = options.getNumberOfAttempts() - 1;
span.addAnnotation(
"Start runTransaction",
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));

ApiFutures.addCallback(
transaction.begin(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
// Don't retry failed BeginTransaction requests.
rejectTransaction(throwable);
}

@Override
public void onSuccess(Void ignored) {
ApiFutures.addCallback(
invokeUserCallback(),
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
// This was a error in the user callback, forward the throwable.
rejectTransaction(throwable);
}

@Override
public void onSuccess(final T userResult) {
// Commit the transaction
ApiFutures.addCallback(
transaction.commit(),
new ApiFutureCallback<List<WriteResult>>() {
@Override
public void onFailure(Throwable throwable) {
// Retry failed commits.
maybeRetry(throwable);
}

@Override
public void onSuccess(List<WriteResult> writeResults) {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
resultFuture.set(userResult);
}
},
MoreExecutors.directExecutor());
}
},
MoreExecutors.directExecutor());
}

private SettableApiFuture<T> invokeUserCallback() {
// Execute the user callback on the provided executor.
final SettableApiFuture<T> callbackResult = SettableApiFuture.create();
userCallbackExecutor.execute(
new Runnable() {
@Override
public void run() {
try {
ApiFuture<T> updateCallback = transactionCallback.updateCallback(transaction);
ApiFutures.addCallback(
updateCallback,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
callbackResult.setException(t);
}

@Override
public void onSuccess(T result) {
callbackResult.set(result);
}
},
MoreExecutors.directExecutor());
} catch (Throwable t) {
callbackResult.setException(t);
}
}
});
return callbackResult;
}

private void maybeRetry(Throwable throwable) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
runTransactionAttempt(
transactionCallback,
resultFuture,
new TransactionOptions(
attemptsRemaining, options.getExecutor(), transaction.getTransactionId()),
span);
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
rejectTransaction(
FirestoreException.serverRejected(
Status.ABORTED,
throwable,
"Transaction was cancelled because of too many retries."));
}
}

private void rejectTransaction(final Throwable throwable) {
if (throwable instanceof ApiException) {
span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable));
}
span.end();
if (transaction.isPending()) {
ApiFutures.addCallback(
transaction.rollback(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
resultFuture.setException(throwable);
}

@Override
public void onSuccess(Void ignored) {
resultFuture.setException(throwable);
}
},
MoreExecutors.directExecutor());
} else {
resultFuture.setException(throwable);
}
}
},
MoreExecutors.directExecutor());
transactionOptions.getExecutor() != null
? transactionOptions.getExecutor()
: firestoreClient.getExecutor());

TransactionRunner<T> transactionRunner =
new TransactionRunner<>(
this, updateFunction, userCallbackExecutor, transactionOptions.getNumberOfAttempts());
return transactionRunner.run();
}

/** Returns whether the user has opted into receiving dates as com.google.cloud.Timestamp. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,15 @@ public interface AsyncFunction<T> {
ApiFuture<T> updateCallback(Transaction transaction);
}

private final ByteString previousTransactionId;
private ByteString transactionId;
private boolean pending;
private @Nullable ByteString previousTransactionId;

Transaction(FirestoreImpl firestore, @Nullable ByteString previousTransactionId) {
Transaction(FirestoreImpl firestore, @Nullable Transaction previousTransaction) {
super(firestore);
this.previousTransactionId = previousTransactionId;
previousTransactionId = previousTransaction != null ? previousTransaction.transactionId : null;
}

@Nullable
ByteString getTransactionId() {
return transactionId;
}

boolean isPending() {
return pending;
}

/** Starts a transaction and obtains the transaction id from the server. */
/** Starts a transaction and obtains the transaction id. */
ApiFuture<Void> begin() {
BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder();
beginTransaction.setDatabase(firestore.getDatabaseName());
Expand All @@ -101,7 +91,6 @@ ApiFuture<Void> begin() {
@Override
public Void apply(BeginTransactionResponse beginTransactionResponse) {
transactionId = beginTransactionResponse.getTransaction();
pending = true;
return null;
}
},
Expand All @@ -110,14 +99,11 @@ public Void apply(BeginTransactionResponse beginTransactionResponse) {

/** Commits a transaction. */
ApiFuture<List<WriteResult>> commit() {
pending = false;
return super.commit(transactionId);
}

/** Rolls a transaction back and releases all read locks. */
ApiFuture<Void> rollback() {
pending = false;

RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder();
reqBuilder.setTransaction(transactionId);
reqBuilder.setDatabase(firestore.getDatabaseName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.firestore;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -29,12 +28,10 @@ public final class TransactionOptions {

private final int numberOfAttempts;
private final Executor executor;
private final ByteString previousTransactionId;

TransactionOptions(int maxAttempts, Executor executor, ByteString previousTransactionId) {
TransactionOptions(int maxAttempts, Executor executor) {
this.numberOfAttempts = maxAttempts;
this.executor = executor;
this.previousTransactionId = previousTransactionId;
}

public int getNumberOfAttempts() {
Expand All @@ -46,11 +43,6 @@ public Executor getExecutor() {
return executor;
}

@Nullable
ByteString getPreviousTransactionId() {
return previousTransactionId;
}

/**
* Create a default set of options suitable for most use cases. Transactions will be attempted 5
* times.
Expand All @@ -59,7 +51,7 @@ ByteString getPreviousTransactionId() {
*/
@Nonnull
public static TransactionOptions create() {
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null, null);
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null);
}

/**
Expand All @@ -71,7 +63,7 @@ public static TransactionOptions create() {
@Nonnull
public static TransactionOptions create(int numberOfAttempts) {
Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt");
return new TransactionOptions(numberOfAttempts, null, null);
return new TransactionOptions(numberOfAttempts, null);
}

/**
Expand All @@ -82,7 +74,7 @@ public static TransactionOptions create(int numberOfAttempts) {
*/
@Nonnull
public static TransactionOptions create(@Nonnull Executor executor) {
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor, null);
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor);
}

/**
Expand All @@ -95,6 +87,6 @@ public static TransactionOptions create(@Nonnull Executor executor) {
@Nonnull
public static TransactionOptions create(@Nonnull Executor executor, int numberOfAttempts) {
Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt");
return new TransactionOptions(numberOfAttempts, executor, null);
return new TransactionOptions(numberOfAttempts, executor);
}
}
Loading