Skip to content

Commit

Permalink
feat(spanner): single-RPC, batched commit of mutation groups (#12930)
Browse files Browse the repository at this point in the history
`spanner::Client::CommitAtLeastOnce(std::vector<Mutations>)`

Add support for committing mutation groups, batched efficiently into
transactions with at-least-once semantics, using a single RPC.

All mutations within a group are committed atomically, but there is
no atomicity or ordering between groups, so they must be independent
of each other.

Partial failure is possible. That is, some batches can commit while
others fail. The results of individual batches are returned via the
response stream as their transactions complete.

Mutation groups are not replay-protected. That is, any mutation group
may be applied more than once. They should be idempotent to avoid
replay complications.
  • Loading branch information
devbww authored Oct 20, 2023
1 parent 34db6a5 commit 11e9d0f
Show file tree
Hide file tree
Showing 13 changed files with 541 additions and 0 deletions.
1 change: 1 addition & 0 deletions google/cloud/spanner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ cc_library(
":google_cloud_cpp_spanner_mocks",
":spanner_client_testing",
"//:common",
"//google/cloud:google_cloud_cpp_mocks",
"//google/cloud/testing_util:google_cloud_cpp_testing_grpc_private",
"//google/cloud/testing_util:google_cloud_cpp_testing_private",
"@com_google_absl//absl/numeric:int128",
Expand Down
9 changes: 9 additions & 0 deletions google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,22 @@ StatusOr<CommitResult> Client::Commit(Transaction transaction,
StatusOr<CommitResult> Client::CommitAtLeastOnce(
    Transaction::ReadWriteOptions transaction_options, Mutations mutations,
    Options opts) {
  // Note: For historical reasons this is implemented differently from the
  // batched form, `CommitAtLeastOnce({mutations})`.
  //
  // The span is created before `internal::CurrentOptions()` is read below.
  internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));

  // Commit the mutations within a single-use transaction built from the
  // caller's read-write options.
  auto single_use = spanner_internal::MakeSingleUseCommitTransaction(
      std::move(transaction_options));
  return conn_->Commit({std::move(single_use), std::move(mutations),
                        CommitOptions(internal::CurrentOptions())});
}

BatchedCommitResultStream Client::CommitAtLeastOnce(
    std::vector<Mutations> mutation_groups, Options opts) {
  // The span is created before `internal::CurrentOptions()` is read, and the
  // options read here are handed to the connection together with the groups.
  internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
  auto const& call_options = internal::CurrentOptions();
  return conn_->BatchWrite({std::move(mutation_groups), call_options});
}

Status Client::Rollback(Transaction transaction, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->Rollback({std::move(transaction)});
Expand Down
37 changes: 37 additions & 0 deletions google/cloud/spanner/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,43 @@ class Client {
Transaction::ReadWriteOptions transaction_options, Mutations mutations,
Options opts = {});

/**
* Commits the mutation groups, batched efficiently into transactions with
* at-least-once semantics, using a single RPC.
*
* All mutations within a group are committed atomically. There is no
* atomicity or ordering between groups however, so all groups must be
* independent of each other.
*
* Partial failure is possible. That is, some groups can commit successfully
* while others fail. The results of individual batches are returned via the
* response stream as their transactions complete.
*
* Mutation groups are not replay protected. That is, each mutation group
* may be applied more than once. If the mutations are not idempotent, this
* may lead to a failure. For example, replays of an insert mutation might
* produce an "already exists" error, or, if you use generated or
* commit-timestamp-based keys, might result in additional rows being added
* to the mutation's table. We recommend structuring your mutation groups to
* be idempotent to avoid this issue.
*
* @note Prefer the `Commit` overloads if you want exactly-once semantics
* or want to automatically reapply mutations after a `kAborted` error.
*
* @par Example
* @snippet samples.cc commit-at-least-once-batched
*
* @param mutation_groups The mutation groups to be batched into temporary
* transactions and committed.
* @param opts (optional) The options to use for this call. Expected options
* include any of the following types:
* - `google::cloud::spanner::RequestPriorityOption`
* - `google::cloud::spanner::RequestTagOption`
* - `google::cloud::spanner::TransactionTagOption`
*
* @return A stream of `BatchedCommitResult` objects, one per batch. Each
* result identifies the mutation groups in the batch (as indexes into
* @p mutation_groups) and holds either the commit timestamp or the reason
* why the commit of that batch failed.
*/
BatchedCommitResultStream CommitAtLeastOnce(
std::vector<Mutations> mutation_groups, Options opts = {});

/**
* Rolls back a read-write transaction, releasing any locks it holds.
*
Expand Down
81 changes: 81 additions & 0 deletions google/cloud/spanner/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/spanner/client.h"
#include "google/cloud/mocks/mock_stream_range.h"
#include "google/cloud/spanner/connection.h"
#include "google/cloud/spanner/internal/defaults.h"
#include "google/cloud/spanner/mocks/mock_spanner_connection.h"
Expand All @@ -33,6 +34,7 @@
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

namespace google {
namespace cloud {
Expand All @@ -47,14 +49,17 @@ using ::google::cloud::testing_util::IsOkAndHolds;
using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::StatusIs;
using ::google::protobuf::TextFormat;
using ::testing::AllOf;
using ::testing::ByMove;
using ::testing::DoAll;
using ::testing::ElementsAre;
using ::testing::Eq;
using ::testing::Field;
using ::testing::HasSubstr;
using ::testing::Pair;
using ::testing::Return;
using ::testing::SaveArg;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;

TEST(ClientTest, CopyAndMove) {
Expand Down Expand Up @@ -978,6 +983,82 @@ TEST(ClientTest, CommitAtLeastOnce) {
EXPECT_EQ(*timestamp, result->commit_timestamp);
}

// Verifies that the batched `Client::CommitAtLeastOnce()` hands the mutation
// groups and the per-call options to `Connection::BatchWrite()`, and that
// the stream produced by the connection is what the caller iterates over.
TEST(ClientTest, CommitAtLeastOnceBatched) {
std::string const request_tag = "action=upsert";
std::string const transaction_tag = "app=cart,env=dev";
auto const timestamp =
spanner_internal::TimestampFromRFC3339("2023-09-27T06:11:34.335Z");
ASSERT_STATUS_OK(timestamp);
// Two independent groups, each holding a single insert-or-update mutation.
std::vector<Mutations> const mutation_groups = {
{MakeInsertOrUpdateMutation("table", {"col1", "col2"}, 10, 20)},
{MakeInsertOrUpdateMutation("table", {"col1", "col2"}, 11, 21)},
};

auto conn = std::make_shared<MockConnection>();
// The mock connection reports no options of its own.
// NOTE(review): presumably these are read by the `Client` constructor as
// its defaults -- confirm against `Client`.
EXPECT_CALL(*conn, options()).WillRepeatedly(Return(Options{}));
// The four `WillOnce()` actions are consumed in call order, so they line up
// with the three loop iterations below followed by the final failing call.
EXPECT_CALL(*conn, BatchWrite)
// Call 1: `Options{}` -- none of the three options may be present.
.WillOnce([&](Connection::BatchWriteParams const& params) {
EXPECT_THAT(params.mutation_groups, SizeIs(2));
EXPECT_FALSE(params.options.has<RequestPriorityOption>());
EXPECT_FALSE(params.options.has<RequestTagOption>());
EXPECT_FALSE(params.options.has<TransactionTagOption>());
return mocks::MakeStreamRange<BatchedCommitResult>(
{{{0, 1}, timestamp}});
})
// Call 2: request priority and request tag set, no transaction tag.
.WillOnce([&](Connection::BatchWriteParams const& params) {
EXPECT_THAT(params.mutation_groups, SizeIs(2));
EXPECT_TRUE(params.options.has<RequestPriorityOption>());
EXPECT_EQ(params.options.get<RequestPriorityOption>(),
RequestPriority::kHigh);
EXPECT_TRUE(params.options.has<RequestTagOption>());
EXPECT_EQ(params.options.get<RequestTagOption>(), request_tag);
EXPECT_FALSE(params.options.has<TransactionTagOption>());
return mocks::MakeStreamRange<BatchedCommitResult>(
{{{0, 1}, timestamp}});
})
// Call 3: only the transaction tag set.
.WillOnce([&](Connection::BatchWriteParams const& params) {
EXPECT_THAT(params.mutation_groups, SizeIs(2));
EXPECT_FALSE(params.options.has<RequestPriorityOption>());
EXPECT_FALSE(params.options.has<RequestTagOption>());
EXPECT_TRUE(params.options.has<TransactionTagOption>());
EXPECT_EQ(params.options.get<TransactionTagOption>(), transaction_tag);
return mocks::MakeStreamRange<BatchedCommitResult>(
{{{0, 1}, timestamp}});
})
// Call 4: no options; the stream carries no results, only a
// `kInvalidArgument` status.
.WillOnce([](Connection::BatchWriteParams const& params) {
EXPECT_THAT(params.mutation_groups, SizeIs(2));
EXPECT_FALSE(params.options.has<RequestPriorityOption>());
EXPECT_FALSE(params.options.has<RequestTagOption>());
EXPECT_FALSE(params.options.has<TransactionTagOption>());
return mocks::MakeStreamRange<BatchedCommitResult>(
{}, Status(StatusCode::kInvalidArgument, "oops"));
});

Client client(conn);
// Successful calls: each option set must be matched by the corresponding
// expectation above (same order).
for (auto const& opts : {
Options{},
Options{}
.set<RequestPriorityOption>(RequestPriority::kHigh)
.set<RequestTagOption>(request_tag),
Options{}.set<TransactionTagOption>(transaction_tag),
}) {
auto commit_results = client.CommitAtLeastOnce(mutation_groups, opts);
auto it = commit_results.begin();
ASSERT_NE(it, commit_results.end());
// Exactly one batch result, covering both groups, with the mocked
// commit timestamp.
EXPECT_THAT(
*it,
IsOkAndHolds(AllOf(
Field(&BatchedCommitResult::indexes, ElementsAre(0, 1)),
Field(&BatchedCommitResult::commit_timestamp, Eq(timestamp)))));
EXPECT_EQ(++it, commit_results.end());
}
// Failing call: the error status is the single element seen by the caller,
// after which the stream ends.
auto commit_results = client.CommitAtLeastOnce(mutation_groups, Options{});
auto it = commit_results.begin();
ASSERT_NE(it, commit_results.end());
EXPECT_THAT(*it, StatusIs(StatusCode::kInvalidArgument, "oops"));
EXPECT_EQ(++it, commit_results.end());
}

TEST(ClientTest, ProfileQuerySuccess) {
auto conn = std::make_shared<MockConnection>();
Client client(conn);
Expand Down
24 changes: 24 additions & 0 deletions google/cloud/spanner/commit_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

#include "google/cloud/spanner/timestamp.h"
#include "google/cloud/spanner/version.h"
#include "google/cloud/status_or.h"
#include "google/cloud/stream_range.h"
#include "absl/types/optional.h"
#include <cstddef>
#include <cstdint>
#include <vector>

namespace google {
namespace cloud {
Expand All @@ -43,6 +47,26 @@ struct CommitResult {
absl::optional<CommitStats> commit_stats;
};

/**
* The result of committing a Transaction containing a batch of mutation
* groups. See the batched form of `Client::CommitAtLeastOnce()`.
*
* A single batch may cover more than one mutation group, and the commit
* outcome recorded here applies to the batch as a whole.
*/
struct BatchedCommitResult {
/// The mutation groups applied in this batch. Each value is an index into
/// the `std::vector<Mutations>` passed to `Client::CommitAtLeastOnce()`.
std::vector<std::size_t> indexes;

/// If OK, the Cloud Spanner timestamp at which the transaction committed,
/// and otherwise the reason why the commit failed.
StatusOr<Timestamp> commit_timestamp;
};

/**
* Represents the stream of `BatchedCommitResult` objects returned from the
* batched `Client::CommitAtLeastOnce()`.
*
* Each element of the stream holds either a `BatchedCommitResult` or an
* error `Status`, so check an element before using its value.
*/
using BatchedCommitResultStream = StreamRange<BatchedCommitResult>;

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace spanner
} // namespace cloud
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ Status Connection::Rollback(RollbackParams) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

// Default implementation: ignores its argument and returns a stream whose
// reader only ever produces a `kUnimplemented` status.
// NOLINTNEXTLINE(performance-unnecessary-value-param)
BatchedCommitResultStream Connection::BatchWrite(BatchWriteParams) {
  auto unimplemented = [] {
    return Status(StatusCode::kUnimplemented, "not implemented");
  };
  return internal::MakeStreamRange<BatchedCommitResult>(unimplemented);
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace spanner
} // namespace cloud
Expand Down
9 changes: 9 additions & 0 deletions google/cloud/spanner/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ class Connection {
struct RollbackParams {
Transaction transaction;
};

/// Wrap the arguments to `BatchWrite()`.
struct BatchWriteParams {
/// The mutation groups to commit. Results refer back to these groups by
/// their index in this vector.
std::vector<Mutations> mutation_groups;
/// The options in effect for the call.
Options options;
};
///@}

/// Returns the options used by the Connection.
Expand Down Expand Up @@ -171,6 +177,9 @@ class Connection {

/// Defines the interface for `Client::Rollback()`
virtual Status Rollback(RollbackParams);

/// Defines the interface for batched `Client::CommitAtLeastOnce()`
///
/// The implementation in this base class does not perform a commit; the
/// stream it returns reports `StatusCode::kUnimplemented`.
virtual BatchedCommitResultStream BatchWrite(BatchWriteParams);
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Loading

0 comments on commit 11e9d0f

Please sign in to comment.