Skip to content

Commit

Permalink
Merge pull request #2126 from josephschorr/pg-serialization-error
Browse files Browse the repository at this point in the history
Improve PG serialization error on writes
  • Loading branch information
josephschorr authored Nov 8, 2024
2 parents 5220908 + 6bbacb3 commit 417c9cf
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 12 deletions.
39 changes: 39 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
MigrationPhase(config.migrationPhase),
))

t.Run("TestSerializationError", createDatastoreTest(
b,
SerializationErrorTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))

t.Run("TestStrictReadMode", createReplicaDatastoreTest(
b,
StrictReadModeTest,
Expand Down Expand Up @@ -270,6 +279,36 @@ func createReplicaDatastoreTest(b testdatastore.RunningEngineForTest, tf datasto
}
}

func SerializationErrorTest(t *testing.T, ds datastore.Datastore) {
require := require.New(t)

ctx := context.Background()
r, err := ds.ReadyState(ctx)
require.NoError(err)
require.True(r.IsReady)

_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
updates := []tuple.RelationshipUpdate{
tuple.Create(tuple.MustParse("resource:resource#reader@user:user#...")),
}
rwt.(*pgReadWriteTXN).tx = txWithSerializationError{rwt.(*pgReadWriteTXN).tx}
return rwt.WriteRelationships(ctx, updates)
})

require.Contains(err.Error(), "unable to write relationships due to a serialization error")
}

type txWithSerializationError struct {
pgx.Tx
}

func (txwse txWithSerializationError) Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error) {
return pgconn.CommandTag{}, &pgconn.PgError{
Code: pgSerializationFailure,
Message: "fake serialization error",
}
}

func GarbageCollectionTest(t *testing.T, ds datastore.Datastore) {
require := require.New(t)

Expand Down
33 changes: 21 additions & 12 deletions internal/datastore/postgres/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jzelinskie/stringz"

"github.com/authzed/spicedb/internal/datastore/common"
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
Expand Down Expand Up @@ -123,7 +124,7 @@ func (rwt *pgReadWriteTXN) collectSimplifiedTouchTypes(ctx context.Context, muta

namespaces, err := rwt.LookupNamespacesWithNames(ctx, touchedResourceNamespaces.AsSlice())
if err != nil {
return nil, fmt.Errorf(errUnableToWriteRelationships, err)
return nil, handleWriteError(err)
}

if len(namespaces) == 0 {
Expand All @@ -148,7 +149,7 @@ func (rwt *pgReadWriteTXN) collectSimplifiedTouchTypes(ctx context.Context, muta

vts, err := typesystem.NewNamespaceTypeSystem(nsDef, typesystem.ResolverForDatastoreReader(rwt))
if err != nil {
return nil, fmt.Errorf(errUnableToWriteRelationships, err)
return nil, handleWriteError(err)
}

notAllowed, err := vts.RelationDoesNotAllowCaveatsForSubject(rel.Resource.Relation, rel.Subject.ObjectType)
Expand Down Expand Up @@ -211,12 +212,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t
if hasCreateInserts {
sql, args, err := createInserts.ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

_, err = rwt.tx.Exec(ctx, sql, args...)
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}
}

Expand All @@ -234,12 +235,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t

sql, args, err := touchInserts.ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

rows, err := rwt.tx.Query(ctx, sql, args...)
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}
defer rows.Close()

Expand All @@ -263,7 +264,7 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t
&subjectRelation,
)
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

rel := tuple.Relationship{
Expand Down Expand Up @@ -329,12 +330,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t
Set(colDeletedXid, rwt.newXID).
ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

rows, err := rwt.tx.Query(ctx, sql, args...)
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}
defer rows.Close()

Expand All @@ -359,7 +360,7 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t
&subjectRelation,
)
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

deletedTpl := tuple.Relationship{
Expand Down Expand Up @@ -397,17 +398,25 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t
// Otherwise execute the INSERTs for the caveated-changes TOUCHed relationships.
sql, args, err = touchWrite.ToSql()
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

_, err = rwt.tx.Exec(ctx, sql, args...)
if err != nil {
return fmt.Errorf(errUnableToWriteRelationships, err)
return handleWriteError(err)
}

return nil
}

func handleWriteError(err error) error {
if pgxcommon.IsSerializationError(err) {
return common.NewSerializationError(fmt.Errorf("unable to write relationships due to a serialization error: [%w]; this typically indicates that a number of write transactions are contending over the same relationships; either reduce the contention or scale this Postgres instance", err))
}

return fmt.Errorf(errUnableToWriteRelationships, err)
}

func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
Expand Down

0 comments on commit 417c9cf

Please sign in to comment.