diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 80e4f83706..f10f248af8 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -499,6 +499,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureSupported, + }, }, nil } @@ -509,6 +512,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureSupported, + }, }, nil } @@ -532,6 +538,9 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureSupported, + }, } if cds.supportsIntegrity { features.IntegrityData.Status = datastore.FeatureSupported diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 883d14978a..e000038e93 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -10,7 +10,9 @@ import ( "strings" "time" + "github.com/jackc/pgx/v5" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/authzed/spicedb/internal/datastore/common" @@ -72,15 +74,23 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev features, err := cds.Features(ctx) if err != nil { + close(updates) errs <- err return updates, errs } if features.Watch.Status != datastore.FeatureSupported { + close(updates) errs <- datastore.NewWatchDisabledErr(fmt.Sprintf("%s. See https://spicedb.dev/d/enable-watch-api-crdb", features.Watch.Reason)) return updates, errs } + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy && !(options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints) { + close(updates) + errs <- errors.New("EmitImmediatelyStrategy requires WatchCheckpoints to be set") + return updates, errs + } + go cds.watch(ctx, afterRevision, options, updates, errs) return updates, errs @@ -161,10 +171,10 @@ func (cds *crdbDatastore) watch( watchBufferWriteTimeout = cds.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) bool { + sendChange := func(change *datastore.RevisionChanges) error { select { case updates <- change: - return true + return nil default: // If we cannot immediately write, setup the timer and try again. @@ -175,11 +185,10 @@ func (cds *crdbDatastore) watch( select { case updates <- change: - return true + return nil case <-timer.C: - errs <- datastore.NewWatchDisconnectedErr() - return false + return datastore.NewWatchDisconnectedErr() } } @@ -193,7 +202,130 @@ func (cds *crdbDatastore) watch( // no return value so we're not really losing anything. defer func() { go changes.Close() }() - tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize) + cds.processChanges(ctx, changes, sendError, sendChange, opts, opts.EmissionStrategy == datastore.EmitImmediatelyStrategy) +} + +// changeTracker takes care of accumulating received from CockroachDB until a checkpoint is emitted +type changeTracker[R datastore.Revision, K comparable] interface { + FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) + AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error + AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error + AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error + AddDeletedCaveat(ctx context.Context, rev R, caveatName string) error + SetRevisionMetadata(ctx context.Context, rev R, metadata map[string]any) error +} + +// streamingChangeProvider is a changeTracker that streams changes as they are processed. Instead of accumulating +// changes in memory before a checkpoint is reached, it leaves the responsibility of accumulating, deduplicating, +// normalizing changes, and waiting for a checkpoints to the caller. +// +// It's used when WatchOptions.EmissionStrategy is set to EmitImmediatelyStrategy. +type streamingChangeProvider struct { + content datastore.WatchContent + sendChange sendChangeFunc + sendError sendErrorFunc +} + +func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) { + // we do not accumulate in this implementation, but stream right away + return nil, nil +} + +func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error { + if s.content&datastore.WatchRelationships != datastore.WatchRelationships { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + } + switch op { + case tuple.UpdateOperationCreate: + changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Create(rel)) + case tuple.UpdateOperationTouch: + changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Touch(rel)) + case tuple.UpdateOperationDelete: + changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Delete(rel)) + default: + return spiceerrors.MustBugf("unknown change operation") + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error { + if s.content&datastore.WatchSchema != datastore.WatchSchema { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + ChangedDefinitions: []datastore.SchemaDefinition{def}, + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error { + if s.content&datastore.WatchSchema != datastore.WatchSchema { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + DeletedNamespaces: []string{namespaceName}, + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error { + if s.content&datastore.WatchSchema != datastore.WatchSchema { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + DeletedCaveats: []string{caveatName}, + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error { + if len(metadata) > 0 { + parsedMetadata, err := structpb.NewStruct(metadata) + if err != nil { + return spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err) + } + + changes := datastore.RevisionChanges{ + Revision: rev, + Metadata: parsedMetadata, + } + + return s.sendChange(&changes) + } + + return nil +} + +type ( + sendChangeFunc func(change *datastore.RevisionChanges) error + sendErrorFunc func(err error) +) + +func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, sendError sendErrorFunc, sendChange sendChangeFunc, opts datastore.WatchOptions, streaming bool) { + var tracked changeTracker[revisions.HLCRevision, revisions.HLCRevision] + if streaming { + tracked = &streamingChangeProvider{ + sendChange: sendChange, + sendError: sendError, + content: opts.Content, + } + } else { + tracked = common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize) + } for changes.Next() { var tableNameBytes []byte @@ -229,19 +361,22 @@ func (cds *crdbDatastore) watch( for _, revChange := range filtered { revChange := revChange - if !sendChange(&revChange) { + if err := sendChange(&revChange); err != nil { + sendError(err) return } } if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { - if !sendChange(&datastore.RevisionChanges{ + if err := sendChange(&datastore.RevisionChanges{ Revision: rev, IsCheckpoint: true, - }) { + }); err != nil { + sendError(err) return } } + continue } @@ -427,17 +562,7 @@ func (cds *crdbDatastore) watch( } if changes.Err() != nil { - if errors.Is(ctx.Err(), context.Canceled) { - closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer closeCancel() - if err := conn.Close(closeCtx); err != nil { - errs <- err - return - } - errs <- datastore.NewWatchCanceledErr() - } else { - errs <- changes.Err() - } + sendError(changes.Err()) return } } diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 9327c4e435..b8f416ea2c 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -334,6 +334,9 @@ func (mdb *memdbDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureUnsupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 7933e005b0..049a04ed54 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -23,6 +23,12 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in MemDB") + return updates, errs + } + watchBufferWriteTimeout := options.WatchBufferWriteTimeout if watchBufferWriteTimeout == 0 { watchBufferWriteTimeout = mdb.watchBufferWriteTimeout diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 9d6d33827f..d8f76863f5 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -607,6 +607,9 @@ func (mds *Datastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureUnsupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index a8f8b19ce6..4e70ed3531 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -30,10 +30,17 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi errs := make(chan error, 1) if options.Content&datastore.WatchSchema == datastore.WatchSchema { + close(updates) errs <- errors.New("schema watch unsupported in MySQL") return updates, errs } + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in MySQL") + return updates, errs + } + afterRevision, ok := afterRevisionRaw.(revisions.TransactionIDRevision) if !ok { errs <- datastore.NewInvalidRevisionErr(afterRevisionRaw, datastore.CouldNotDetermineRevision) diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 69f018b299..75ed433754 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -697,6 +697,9 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureUnsupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index aac9873d4f..fc568ad30c 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -74,10 +74,17 @@ func (pgd *pgDatastore) Watch( errs := make(chan error, 1) if !pgd.watchEnabled { + close(updates) errs <- datastore.NewWatchDisabledErr("postgres must be run with track_commit_timestamp=on for watch to be enabled. See https://spicedb.dev/d/enable-watch-api-postgres") return updates, errs } + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in Postgres") + return updates, errs + } + afterRevision := afterRevisionRaw.(postgresRevision) watchSleep := options.CheckpointInterval if watchSleep < minimumWatchSleep { diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index 1c60df2b4c..55975adb22 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -356,6 +356,9 @@ func (sd *spannerDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index af8d68f78f..0cee6df022 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -61,6 +61,12 @@ func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.R updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) + if opts.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in Spanner") + return updates, errs + } + go sd.watch(ctx, afterRevision, opts, updates, errs) return updates, errs diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 76fff5cd48..b85bb9ce0b 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -528,8 +528,27 @@ type WatchOptions struct { // If unspecified, no maximum will be enforced. If the maximum is reached before // the changes can be sent, the watch will be closed with an error. MaximumBufferedChangesByteSize uint64 + + // EmissionStrategy defines when are changes streamed to the client. If unspecified, changes will be buffered until + // they can be checkpointed, which is the default behavior. + EmissionStrategy EmissionStrategy } +// EmissionStrategy describes when changes are emitted to the client. +type EmissionStrategy int + +const ( + // EmitWhenCheckpointedStrategy will buffer changes until a checkpoint is reached. This also means that + // changes will be deduplicated and revisions will be sorted before emission as soon as they can be checkpointed. + EmitWhenCheckpointedStrategy = iota + + // EmitImmediatelyStrategy emits changes as soon as they are available. This means changes will not be buffered, + // and thus will be emitted as soon as they are available, but clients are responsible for buffering, deduplication, + // and sorting revisions. In practical terms that can only happens if Checkpoints have been requested, so enabling + // EmitImmediatelyStrategy without Checkpoints will return an error. + EmitImmediatelyStrategy +) + // WatchJustRelationships returns watch options for just relationships. func WatchJustRelationships() WatchOptions { return WatchOptions{ @@ -723,6 +742,11 @@ type Features struct { // new transactions are committed. ContinuousCheckpointing Feature + // WatchEmitsImmediately indicates if the datastore supports the EmitImmediatelyStrategy EmissionStrategy. + // If not supported, clients of the Watch API will receive an error when calling Watch API with + // EmitImmediatelyStrategy option. + WatchEmitsImmediately Feature + // IntegrityData is enabled if the underlying datastore supports retrieving and storing // integrity information. IntegrityData Feature diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 6c3f0c2ea0..83e3c5bb1d 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -180,6 +180,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories, t.Run("TestWatchWithTouch", runner(tester, WatchWithTouchTest)) t.Run("TestWatchWithDelete", runner(tester, WatchWithDeleteTest)) t.Run("TestWatchWithMetadata", runner(tester, WatchWithMetadataTest)) + t.Run("TestWatchEmissionStrategy", runner(tester, WatchEmissionStrategyTest)) } if !except.Watch() && !except.WatchSchema() { diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index 961260d72c..4d0fa1ddd9 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -10,6 +10,7 @@ import ( v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/google/go-cmp/cmp" + "github.com/google/uuid" "github.com/scylladb/go-set/strset" "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" @@ -746,6 +747,99 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { verifyCheckpointUpdate(require, afterTouchRevision, changes) } +func WatchEmissionStrategyTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) + require.NoError(err) + + setupDatastore(ds, require) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + features, err := ds.Features(ctx) + require.NoError(err) + + expectsWatchError := false + if !(features.WatchEmitsImmediately.Status == datastore.FeatureSupported) { + expectsWatchError = true + } + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ + Content: datastore.WatchCheckpoints | datastore.WatchRelationships, + CheckpointInterval: 100 * time.Millisecond, + EmissionStrategy: datastore.EmitImmediatelyStrategy, + }) + if expectsWatchError { + require.NotZero(len(errchan)) + err := <-errchan + require.ErrorContains(err, "emit immediately strategy is unsupported") + return + } + require.Zero(len(errchan)) + + // since changes are streamed immediately, we expect changes to be streamed as independent change events, + // whereas with the default emission strategy it would be accumulated and normalized. For examples, the default + // strategy would accumulate everything into a single Change sent over the channel, like an added relationship + // and its associated transaction metadata. The emit immediately strategy will emit both + // the rel touch and tx metadata as independent changes as it won't accumulate and normalize it. + testTuple := tuple.MustParse("document:firstdoc#viewer@user:" + uuid.NewString()) + testMetadata, err := structpb.NewStruct(map[string]any{"foo": "bar"}) + require.NoError(err) + targetRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{ + tuple.Touch(testTuple), + }) + }, options.WithMetadata(testMetadata)) + require.NoError(err) + + var relTouchEmitted, metadataEmitted, checkpointEmitted bool + changeWait := time.NewTimer(waitForChangesTimeout) + var changeCount int + for { + select { + case change, ok := <-changes: + require.True(ok) + for _, relChange := range change.RelationshipChanges { + if relChange.Relationship == testTuple && relChange.Operation == tuple.UpdateOperationTouch { + relTouchEmitted = true + changeCount++ + require.True(targetRev.Equal(change.Revision)) + } + + continue // we expect each change to come in individual change event + } + + if change.Metadata != nil { + require.Contains(change.Metadata.AsMap(), "foo") + metadataEmitted = true + changeCount++ + require.True(targetRev.Equal(change.Revision)) + continue + } + + if change.IsCheckpoint { + if change.Revision.Equal(targetRev) || change.Revision.GreaterThan(targetRev) { + require.True(metadataEmitted, "expected tx metadata before checkpoint") + require.True(relTouchEmitted, "expected relationship touch before checkpoint") + require.GreaterOrEqual(changeCount, 2, "expected at least 2 changes over the channel") + return + } + } + case <-changeWait.C: + require.Fail("Timed out", "waited for checkpoint") + } + + if relTouchEmitted && metadataEmitted && checkpointEmitted { + return + } + } +} + func verifyCheckpointUpdate( require *require.Assertions, expectedRevision datastore.Revision,