Skip to content

Commit

Permalink
feat: transaction callable as functional interface (#1066)
Browse files Browse the repository at this point in the history
* feat: transaction callable as functional interface

Marks the transaction callable as a functional interface.

* samples: uses lambdas in samples
  • Loading branch information
thiagotnunes authored Apr 15, 2021
1 parent 1d4eed4 commit b036a77
Show file tree
Hide file tree
Showing 27 changed files with 952 additions and 1,591 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -60,16 +59,13 @@ public void run() {

private <R> R runTransaction(final AsyncWork<R> work) {
return delegate.run(
new TransactionCallable<R>() {
@Override
public R run(TransactionContext transaction) throws Exception {
try {
return work.doWorkAsync(transaction).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
transaction -> {
try {
return work.doWorkAsync(transaction).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,9 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
? (Collection<Mutation>) mutations
: Lists.newArrayList(mutations);
runner.run(
new TransactionRunner.TransactionCallable<Void>() {
@Override
public Void run(TransactionContext ctx) {
ctx.buffer(finalMutations);
return null;
}
ctx -> {
ctx.buffer(finalMutations);
return null;
});
return runner.getCommitResponse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
*/
public interface TransactionRunner {
/** A unit of work to be performed in the context of a transaction. */
@FunctionalInterface
interface TransactionCallable<T> {
/**
* Invoked by the library framework to perform a single attempt of a transaction. This method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementParser.StatementType;
import com.google.common.base.Function;
Expand Down Expand Up @@ -357,12 +355,7 @@ public Long call() throws Exception {
writeTransaction = createWriteTransaction();
Long res =
writeTransaction.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(update.getStatement());
}
});
transaction -> transaction.executeUpdate(update.getStatement()));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
Expand Down Expand Up @@ -404,31 +397,28 @@ private ApiFuture<long[]> executeTransactionalBatchUpdateAsync(
public long[] call() throws Exception {
writeTransaction = createWriteTransaction();
return writeTransaction.run(
new TransactionCallable<long[]>() {
@Override
public long[] run(TransactionContext transaction) throws Exception {
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(
updates,
new Function<ParsedStatement, Statement>() {
@Override
public Statement apply(ParsedStatement input) {
return input.getStatement();
}
}));
transaction -> {
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(
updates,
new Function<ParsedStatement, Statement>() {
@Override
public Statement apply(ParsedStatement input) {
return input.getStatement();
}
}));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
if (t instanceof SpannerBatchUpdateException) {
// Batch update exceptions does not cause a rollback.
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
if (t instanceof SpannerBatchUpdateException) {
// Batch update exceptions does not cause a rollback.
state = UnitOfWorkState.COMMITTED;
} else {
state = UnitOfWorkState.COMMIT_FAILED;
}
throw t;
} else {
state = UnitOfWorkState.COMMIT_FAILED;
}
throw t;
}
});
}
Expand All @@ -455,12 +445,9 @@ public Void call() throws Exception {
writeTransaction = createWriteTransaction();
Void res =
writeTransaction.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
transaction.buffer(mutations);
return null;
}
transaction -> {
transaction.buffer(mutations);
return null;
});
state = UnitOfWorkState.COMMITTED;
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand Down Expand Up @@ -205,13 +204,7 @@ private final class WriteRunnable implements Runnable {
@Override
public void run() {
TransactionRunner runner = client.readWriteTransaction();
runner.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) {
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT));
}
}
}
Loading

0 comments on commit b036a77

Please sign in to comment.