Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support of multiplexed session support in writeAtleastOnce mutations #10646

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3555,15 +3555,21 @@ func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) {
if err != nil {
t.Fatal(err)
}
expectedIdleSesions := sp.incStep
if isMultiplexEnabled {
expectedIdleSesions = 0
}
sp.mu.Lock()
if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w {
if g, w := uint64(sp.idleList.Len())+sp.createReqs, expectedIdleSesions; g != w {
sp.mu.Unlock()
t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w)
}
expectedSessions := sp.incStep
expectedSessions := expectedIdleSesions
if isMultiplexEnabled {
expectedSessions++
}
if g, w := uint64(len(server.TestSpanner.DumpSessions())), expectedSessions; g != w {
sp.mu.Unlock()
t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w)
}
sp.mu.Unlock()
Expand Down Expand Up @@ -3602,14 +3608,20 @@ func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) {
t.Fatal(err)
}
sp.mu.Lock()
if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w {
expectedIdleSesions := sp.incStep
if isMultiplexEnabled {
expectedIdleSesions = 0
}
if g, w := uint64(sp.idleList.Len())+sp.createReqs, expectedIdleSesions; g != w {
sp.mu.Unlock()
t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w)
}
var countMuxSess uint64
if isMultiplexEnabled {
countMuxSess = 1
}
if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep+countMuxSess; g != w {
if g, w := uint64(len(server.TestSpanner.DumpSessions())), expectedIdleSesions+countMuxSess; g != w {
sp.mu.Unlock()
t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w)
}
sp.mu.Unlock()
Expand Down Expand Up @@ -6277,18 +6289,30 @@ func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{

expectedReqs := []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
&sppb.CommitRequest{},
}
muxCreateBuffer := 0
if isMultiplexEnabled {
muxCreateBuffer = 1
expectedReqs = []interface{}{
&sppb.CreateSessionRequest{},
&sppb.CommitRequest{},
}
}
if !requests[1+muxCreateBuffer].(*sppb.CommitRequest).Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests(expectedReqs, requests); err != nil {
t.Fatal(err)
}
for _, req := range requests {
if request, ok := req.(*sppb.CommitRequest); ok {
if !request.Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
if !testEqual(isMultiplexEnabled, strings.Contains(request.GetSession(), "multiplexed")) {
t.Errorf("TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams expected multiplexed session to be used, got: %v", request.GetSession())
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
for {
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// No usable session for doing the commit, take one from pool.
sh, err = t.sp.take(ctx)
sh, err = t.sp.takeMultiplexed(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that it falls back to regular sessions in case there is an issue with multiplexed sessions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it will fallback to regular session, that logic is handled in takeMultiplexed because that behaviour is common

if err != nil {
// sessionPool.Take already retries for session
// creations/retrivals.
Expand All @@ -1912,6 +1912,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
})
if err != nil && !isAbortedErr(err) {
// should not be the case with multiplexed sessions
if isSessionNotFoundError(err) {
// Discard the bad session.
sh.destroy()
Expand Down
23 changes: 20 additions & 3 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,30 @@ func TestApply_Single(t *testing.T) {
if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil {
t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e)
}

if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
requests := drainRequestsFromServer(server.TestSpanner)
expectedReqs := []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{},
}); err != nil {
}
if isMultiplexEnabled {
expectedReqs = []interface{}{
&sppb.CreateSessionRequest{},
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
&sppb.CommitRequest{},
}
}
if err := compareRequests(expectedReqs, requests); err != nil {
t.Fatal(err)
}
for _, s := range requests {
switch s.(type) {
case *sppb.CommitRequest:
req, _ := s.(*sppb.CommitRequest)
// Validate the session is multiplexed
if !testEqual(isMultiplexEnabled, strings.Contains(req.Session, "multiplexed")) {
t.Errorf("TestApply_Single expected multiplexed session to be used, got: %v", req.Session)
}
}
}
}

// Transaction retries on abort.
Expand Down
Loading