Skip to content

Commit

Permalink
spanner: use ResourceInfo to extract error
Browse files Browse the repository at this point in the history
Detecting a 'Session not found' error should be extracted from the
ResourceInfo of the underlying statusErr instead of depending on the
error message.

Updates #1814.

Change-Id: I99e448ef4242573b8fb00e0020de839c4bd39660
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/52791
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Hengfeng Li <hengfeng@google.com>
  • Loading branch information
olavloite committed Mar 5, 2020
1 parent 8d3876f commit 863a84d
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 36 deletions.
30 changes: 15 additions & 15 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestClient_Single_SessionNotFound(t *testing.T) {
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestClient_Single_NonRetryableErrorOnPartialResultSet(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(3),
Err: status.Errorf(codes.NotFound, "Session not found"),
Err: newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"),
},
)
ctx := context.Background()
Expand Down Expand Up @@ -593,9 +593,9 @@ func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *test
// transaction, as we would need to start a new transaction on a new
// session.
err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
})
want := spannerErrorf(codes.NotFound, "Session not found")
want := toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
if err == nil {
t.Fatalf("missing expected error\nGot: nil\nWant: %v", want)
}
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing
if err := testReadOnlyTransaction(
t,
map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
},
); err != nil {
t.Fatal(err)
Expand All @@ -653,7 +653,7 @@ func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction_WithMaxOne
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodBeginTransaction,
SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
tx := client.ReadOnlyTransaction()
defer tx.Close()
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {
func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodCommitTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
MethodCommitTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 2); err != nil {
t.Fatal(err)
}
Expand All @@ -705,7 +705,7 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransaction(t *testin
// We expect only 1 attempt, as the 'Session not found' error is already
//handled in the session pool where the session is prepared.
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 1); err != nil {
t.Fatal(err)
}
Expand All @@ -720,7 +720,7 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransactionWithEmptyS
if err := testReadWriteTransactionWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.0},
}, map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 1); err != nil {
t.Fatal(err)
}
Expand All @@ -729,7 +729,7 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnBeginTransactionWithEmptyS
func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 2); err != nil {
t.Fatal(err)
}
Expand All @@ -742,7 +742,7 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteSql,
SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
var attempts int
Expand Down Expand Up @@ -772,7 +772,7 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *test
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteBatchDml,
SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
var attempts int
Expand Down Expand Up @@ -1292,15 +1292,15 @@ func TestReadWriteTransaction_WrapSessionNotFoundError(t *testing.T) {
defer teardown()
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.NotFound, "Session not found")},
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.NotFound, "Session not found")},
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.NotFound, "Session not found")},
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
msg := "query failed"
numAttempts := 0
Expand Down
23 changes: 23 additions & 0 deletions spanner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -171,3 +172,25 @@ func ErrDesc(err error) string {
}
return se.Desc
}

// extractResourceType extracts the resource type from any ResourceInfo detail
// included in the error.
func extractResourceType(err error) (string, bool) {
var s *status.Status
var se *Error
if errorAs(err, &se) {
// Unwrap statusError.
s = status.Convert(se.Unwrap())
} else {
s = status.Convert(err)
}
if s == nil {
return "", false
}
for _, detail := range s.Details() {
if resourceInfo, ok := detail.(*errdetails.ResourceInfo); ok {
return resourceInfo.ResourceType, true
}
}
return "", false
}
39 changes: 36 additions & 3 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ func initIntegrationTests() (cleanup func()) {
func TestIntegration_InitSessionPool(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
// Set up an empty testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
defer cleanup()
sp := client.idleSessions
sp.mu.Lock()
want := sp.MinOpened
sp.mu.Unlock()
var numOpened int
loop:
for {
select {
case <-ctx.Done():
Expand All @@ -248,10 +249,42 @@ func TestIntegration_InitSessionPool(t *testing.T) {
numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
sp.mu.Unlock()
if uint64(numOpened) == want {
return
break loop
}
}
}
// Delete all sessions in the pool on the backend and then try to execute a
// simple query. The 'Session not found' error should cause an automatic
// retry of the read-only transaction.
sp.mu.Lock()
s := sp.idleList.Front()
for {
if s == nil {
break
}
// This will delete the session on the backend without removing it
// from the pool.
s.Value.(*session).delete(context.Background())
s = s.Next()
}
sp.mu.Unlock()
sql := "SELECT 1, 'FOO', 'BAR'"
tx := client.ReadOnlyTransaction()
defer tx.Close()
iter := tx.Query(context.Background(), NewStatement(sql))
rows, err := readAll(iter)
if err != nil {
t.Fatalf("Unexpected error for query %q: %v", sql, err)
}
if got, want := len(rows), 1; got != want {
t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
}
if got, want := len(rows[0]), 3; got != want {
t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
}
if got, want := rows[0][0].(int64), int64(1); got != want {
t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
}
}

// Test SingleUse transaction.
Expand Down
11 changes: 10 additions & 1 deletion spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,20 @@ func (s *inMemSpannerServer) findSession(name string) (*spannerpb.Session, error
defer s.mu.Unlock()
session := s.sessions[name]
if session == nil {
return nil, gstatus.Error(codes.NotFound, fmt.Sprintf("Session not found: %s", name))
return nil, newSessionNotFoundError(name)
}
return session, nil
}

// sessionResourceType is the type name of Spanner sessions.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"

func newSessionNotFoundError(name string) error {
s := gstatus.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
return s.Err()
}

func (s *inMemSpannerServer) updateSessionLastUseTime(session string) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions spanner/internal/testutil/mockclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (m *MockCloudSpannerClient) GetSession(ctx context.Context, r *sppb.GetSess
defer m.mu.Unlock()
m.pings = append(m.pings, r.Name)
if _, ok := m.sessions[r.Name]; !ok {
return nil, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
return nil, newSessionNotFoundError(r.Name)
}
return &sppb.Session{Name: r.Name}, nil
}
Expand All @@ -128,7 +128,7 @@ func (m *MockCloudSpannerClient) DeleteSession(ctx context.Context, r *sppb.Dele
defer m.mu.Unlock()
if _, ok := m.sessions[r.Name]; !ok {
// Session not found.
return &empty.Empty{}, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
return &empty.Empty{}, newSessionNotFoundError(r.Name)
}
// Delete session from in-memory table.
delete(m.sessions, r.Name)
Expand Down
16 changes: 9 additions & 7 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"math"
"math/rand"
"runtime/debug"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -1522,16 +1521,19 @@ func minUint64(a, b uint64) uint64 {
return a
}

// sessionResourceType is the type name of Spanner sessions.
const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"

// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
if err == nil {
return false
}
// We are checking specifically for the error message `Session not found`,
// as the error could also be a `Database not found`. The latter should
// cause the session pool to stop preparing sessions for read/write
// transactions, while the former should not.
// TODO: once gRPC can return auxiliary error information, stop parsing the error message.
return ErrCode(err) == codes.NotFound && strings.Contains(err.Error(), "Session not found")
if ErrCode(err) == codes.NotFound {
if rt, ok := extractResourceType(err); ok {
return rt == sessionResourceType
}
}
return false
}
15 changes: 11 additions & 4 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ import (

. "cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/genproto/googleapis/rpc/errdetails"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func newSessionNotFoundError(name string) error {
s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
return s.Err()
}

// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -366,7 +373,7 @@ func TestTakeFromIdleListChecked(t *testing.T) {
// will create a new session.
server.TestSpanner.PutExecutionTime(MethodGetSession,
SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})

// Force ping by setting check time in the past.
Expand Down Expand Up @@ -446,7 +453,7 @@ func TestTakeFromIdleWriteListChecked(t *testing.T) {
// will create a new session.
server.TestSpanner.PutExecutionTime(MethodGetSession,
SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})

// Force ping by setting check time in the past.
Expand Down Expand Up @@ -1131,7 +1138,7 @@ func TestSessionNotFoundOnPrepareSession(t *testing.T) {

// The server will return 'Session not found' for the first 8
// BeginTransaction calls.
sessionNotFoundErr := status.Errorf(codes.NotFound, `Session not found: projects/<project>/instances/<instance>/databases/<database>/sessions/<session> resource_type: "Session" resource_name: "projects/<project>/instances/<instance>/databases/<database>/sessions/<session>" description: "Session does not exist."`)
sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
serverErrors := make([]error, 8)
for i := range serverErrors {
serverErrors[i] = sessionNotFoundErr
Expand Down Expand Up @@ -1239,7 +1246,7 @@ func TestSessionHealthCheck(t *testing.T) {
server.TestSpanner.Freeze()
server.TestSpanner.PutExecutionTime(MethodGetSession,
SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
KeepError: true,
})
server.TestSpanner.Unfreeze()
Expand Down
20 changes: 16 additions & 4 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"

. "cloud.google.com/go/spanner/internal/testutil"
"github.com/golang/protobuf/ptypes"
"github.com/google/go-cmp/cmp"
"google.golang.org/genproto/googleapis/rpc/errdetails"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -203,7 +205,7 @@ func TestTransaction_SessionNotFound(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

serverErr := gstatus.Errorf(codes.NotFound, "Session not found")
serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
Errors: []error{serverErr, serverErr, serverErr},
Expand Down Expand Up @@ -235,14 +237,24 @@ func TestTransaction_SessionNotFound(t *testing.T) {
t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
}

wantErr = toSpannerError(serverErr)
wantErr = toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
_, got := client.Apply(ctx, ms, ApplyAtLeastOnce())
if !testEqual(wantErr, got) {
t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
if !cmp.Equal(wantErr, got,
cmp.AllowUnexported(Error{}), cmp.FilterPath(func(path cmp.Path) bool {
// Ignore statusError Details and Error.trailers.
if strings.Contains(path.GoString(), "{*spanner.Error}.err.(*status.statusError).Details") {
return true
}
if strings.Contains(path.GoString(), "{*spanner.Error}.trailers") {
return true
}
return false
}, cmp.Ignore())) {
t.Fatalf("Expect Apply to fail\nGot: %v\nWant: %v\n", got, wantErr)
}
}

Expand Down

0 comments on commit 863a84d

Please sign in to comment.