Skip to content

Commit

Permalink
feat: add new Firestore.runAsyncTransaction (#103)
Browse files Browse the repository at this point in the history
Add the new methods runAsyncTransaction to com.google.cloud.firestore.Firestore
following the same pattern as runTransaction however allowing an AsyncFunction
to be provided instead of Function.
  • Loading branch information
kvakos authored Mar 20, 2020
1 parent 41b2a9a commit b28b660
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,43 @@ <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions);

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
* transaction. If any document read within the transaction has changed, the updateFunction will
* be retried. If it fails to commit after 5 attempts, the transaction will fail. <br>
* <br>
* Running a transaction places locks all consumed documents. To unblock other clients, the
* Firestore backend automatically releases all locks after 60 seconds of inactivity and fails all
* transactions that last longer than 270 seconds (see <a
* href="https://firebase.google.com/docs/firestore/quotas#writes_and_transactions">Firestore
* Quotas</a>).
*
* @param updateFunction The function to execute within the transaction context.
* @return An ApiFuture that will be resolved with the result from updateFunction.
*/
@Nonnull
<T> ApiFuture<T> runAsyncTransaction(@Nonnull final Transaction.AsyncFunction<T> updateFunction);

/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
* transaction. If any document read within the transaction has changed, the updateFunction will
* be retried. If it fails to commit after the maxmimum number of attemps specified in
* transactionOptions, the transaction will fail. <br>
* <br>
* Running a transaction places locks all consumed documents. To unblock other clients, the
* Firestore backend automatically releases all locks after 60 seconds of inactivity and fails all
* transactions that last longer than 270 seconds (see <a
* href="https://firebase.google.com/docs/firestore/quotas#writes_and_transactions">Firestore
* Quotas</a>).
*
* @param updateFunction The function to execute within the transaction context.
* @return An ApiFuture that will be resolved with the result from updateFunction.
*/
@Nonnull
<T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions);

/**
* Retrieves multiple documents from Firestore.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,30 @@ public <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions);
return resultFuture;
}

@Nonnull
@Override
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction) {
return runAsyncTransaction(updateFunction, TransactionOptions.create());
}

@Nonnull
@Override
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(updateFunction, resultFuture, transactionOptions);
return resultFuture;
}

/** Transaction functions that returns its result in the provided SettableFuture. */
private <T> void runTransaction(
final Transaction.Function<T> transactionCallback,
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options) {
// span is intentionally not ended here. It will be ended by runTransactionAttempt on success
Expand All @@ -317,7 +334,7 @@ private <T> void runTransaction(
}

private <T> void runTransactionAttempt(
final Transaction.Function<T> transactionCallback,
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options,
final Span span) {
Expand Down Expand Up @@ -384,7 +401,21 @@ private SettableApiFuture<T> invokeUserCallback() {
@Override
public void run() {
try {
callbackResult.set(transactionCallback.updateCallback(transaction));
ApiFuture<T> updateCallback = transactionCallback.updateCallback(transaction);
ApiFutures.addCallback(
updateCallback,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
callbackResult.setException(t);
}

@Override
public void onSuccess(T result) {
callbackResult.set(result);
}
},
MoreExecutors.directExecutor());
} catch (Throwable t) {
callbackResult.setException(t);
}
Expand Down Expand Up @@ -494,4 +525,23 @@ public void close() throws Exception {
firestoreClient.close();
closed = true;
}

private static class TransactionAsyncAdapter<T> implements Transaction.AsyncFunction<T> {
private final Transaction.Function<T> syncFunction;

public TransactionAsyncAdapter(Transaction.Function<T> syncFunction) {
this.syncFunction = syncFunction;
}

@Override
public ApiFuture<T> updateCallback(Transaction transaction) {
SettableApiFuture<T> callbackResult = SettableApiFuture.create();
try {
callbackResult.set(syncFunction.updateCallback(transaction));
} catch (Throwable e) {
callbackResult.setException(e);
}
return callbackResult;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public final class Transaction extends UpdateBuilder<Transaction> {
"Firestore transactions require all reads to be executed before all writes";

/**
* User callback that takes a Firestore Transaction
* User callback that takes a Firestore Transaction.
*
* @param <T> The result type of the user callback.
*/
Expand All @@ -51,6 +51,16 @@ public interface Function<T> {
T updateCallback(Transaction transaction) throws Exception;
}

/**
* User callback that takes a Firestore Async Transaction.
*
* @param <T> The result type of the user async callback.
*/
public interface AsyncFunction<T> {

ApiFuture<T> updateCallback(Transaction transaction);
}

private final ByteString previousTransactionId;
private ByteString transactionId;
private boolean pending;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static com.google.cloud.firestore.LocalFirestoreHelper.set;
import static com.google.cloud.firestore.LocalFirestoreHelper.update;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -132,6 +133,33 @@ public String updateCallback(Transaction transaction) {
assertEquals(commit(TRANSACTION_ID), requests.get(1));
}

@Test
public void returnsValueAsync() throws Exception {
doReturn(beginResponse())
.doReturn(commitResponse(0, 0))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
Assert.assertEquals("user_provided", Thread.currentThread().getName());
return ApiFutures.immediateFuture("foo");
}
},
options);

assertEquals("foo", transaction.get());

List<Message> requests = requestCapture.getAllValues();
assertEquals(2, requests.size());

assertEquals(begin(), requests.get(0));
assertEquals(commit(TRANSACTION_ID), requests.get(1));
}

@Test
public void canReturnNull() throws Exception {
doReturn(beginResponse())
Expand All @@ -154,6 +182,28 @@ public String updateCallback(Transaction transaction) {
assertEquals(null, transaction.get());
}

@Test
public void canReturnNullAsync() throws Exception {
doReturn(beginResponse())
.doReturn(ApiFutures.immediateFailedFuture(new Exception()))
.doReturn(beginResponse(ByteString.copyFromUtf8("foo2")))
.doReturn(commitResponse(0, 0))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
return ApiFutures.immediateFuture(null);
}
},
options);

assertNull(transaction.get());
}

@Test
public void rollbackOnCallbackError() throws Exception {
doReturn(beginResponse())
Expand Down Expand Up @@ -185,6 +235,37 @@ public String updateCallback(Transaction transaction) throws Exception {
assertEquals(rollback(), requests.get(1));
}

@Test
public void rollbackOnCallbackErrorAsync() throws Exception {
doReturn(beginResponse())
.doReturn(rollbackResponse())
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
return ApiFutures.immediateFailedFuture(new Exception("Expected exception"));
}
},
options);

try {
transaction.get();
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("Expected exception"));
}

List<Message> requests = requestCapture.getAllValues();
assertEquals(2, requests.size());

assertEquals(begin(), requests.get(0));
assertEquals(rollback(), requests.get(1));
}

@Test
public void noRollbackOnBeginFailure() throws Exception {
doReturn(ApiFutures.immediateFailedFuture(new Exception("Expected exception")))
Expand Down Expand Up @@ -213,6 +294,34 @@ public String updateCallback(Transaction transaction) {
assertEquals(1, requests.size());
}

@Test
public void noRollbackOnBeginFailureAsync() throws Exception {
doReturn(ApiFutures.immediateFailedFuture(new Exception("Expected exception")))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

ApiFuture<String> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<String>() {
@Override
public ApiFuture<String> updateCallback(Transaction transaction) {
fail();
return null;
}
},
options);

try {
transaction.get();
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("Expected exception"));
}

List<Message> requests = requestCapture.getAllValues();
assertEquals(1, requests.size());
}

@Test
public void limitsRetriesWithFailure() throws Exception {
doReturn(beginResponse(ByteString.copyFromUtf8("foo1")))
Expand Down Expand Up @@ -343,6 +452,40 @@ public DocumentSnapshot updateCallback(Transaction transaction)
assertEquals(commit(TRANSACTION_ID), requests.get(2));
}

@Test
public void getDocumentAsync() throws Exception {
doReturn(beginResponse())
.doReturn(commitResponse(0, 0))
.when(firestoreMock)
.sendRequest(requestCapture.capture(), Matchers.<UnaryCallable<Message, Message>>any());

doAnswer(getAllResponse(SINGLE_FIELD_PROTO))
.when(firestoreMock)
.streamRequest(
requestCapture.capture(),
streamObserverCapture.capture(),
Matchers.<ServerStreamingCallable<Message, Message>>any());

ApiFuture<DocumentSnapshot> transaction =
firestoreMock.runAsyncTransaction(
new Transaction.AsyncFunction<DocumentSnapshot>() {
@Override
public ApiFuture<DocumentSnapshot> updateCallback(Transaction transaction) {
return transaction.get(documentReference);
}
},
options);

assertEquals("doc", transaction.get().getId());

List<Message> requests = requestCapture.getAllValues();
assertEquals(3, requests.size());

assertEquals(begin(), requests.get(0));
assertEquals(get(TRANSACTION_ID), requests.get(1));
assertEquals(commit(TRANSACTION_ID), requests.get(2));
}

@Test
public void getMultipleDocuments() throws Exception {
final DocumentReference doc1 = firestoreMock.document("coll/doc1");
Expand Down

0 comments on commit b28b660

Please sign in to comment.