From cbac7b94139091a46e689c5b5072cd0b0fdaf9f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Tue, 5 Nov 2024 19:37:02 +0000 Subject: [PATCH] introduces `WatchOptions.EmissionStrategy` to handle limitations of CockroachDB changefeeds. In CRDB, checkpointing for a span only occurs once its catch-up scan completes. As a result, resolved timestamps for the changefeed won't be emitted until all spans have been checkpointed and moved past the initial high-water mark. Unfortunately, no workarounds exist to make the changefeed emit resolved timestamps during a catch-up scan. This is a known issue on CRL's radar. This is important because, without checkpoints, it means changes are accumulated in memory, and depending on the use case can lead to increased memory pressure and out-of-memory errors This commit introduces a new change emission strategy option into WatchOptions: - `EmitWhenCheckpointedStrategy` is the original strategy, which accumulates until a checkpoint is emitted. - `EmitImmediatelyStrategy` is the new strategy that streams changes right away. It's important to note that makes the client responsible of: - deduplicating changes - ordering revisions - accumulate changes until a checkpoint is emitted Essentially the client is responsible for doing all the work that used to be done with the `Change` struct helper. For every other datastore, calling Watch API with `EmitImmediatelyStrategy` will return an error. We don't consider there is an evident need at the moment for other data stores. Once we've proven this strategy is viable in production workloads, we will study implementing a Watch API strategy that buffers to disk to address out-of-memory issues, and potentially deprecate the emit immediately strategy in favor of one that does all the heavy lifting for the client. --- internal/datastore/crdb/crdb.go | 9 ++ internal/datastore/crdb/watch.go | 165 +++++++++++++++++++++--- internal/datastore/memdb/memdb.go | 3 + internal/datastore/memdb/watch.go | 6 + internal/datastore/mysql/datastore.go | 3 + internal/datastore/mysql/watch.go | 7 + internal/datastore/postgres/postgres.go | 3 + internal/datastore/postgres/watch.go | 7 + internal/datastore/spanner/spanner.go | 3 + internal/datastore/spanner/watch.go | 6 + pkg/datastore/datastore.go | 24 ++++ pkg/datastore/test/datastore.go | 1 + pkg/datastore/test/watch.go | 94 ++++++++++++++ 13 files changed, 311 insertions(+), 20 deletions(-) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 80e4f83706..f10f248af8 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -499,6 +499,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureSupported, + }, }, nil } @@ -509,6 +512,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureSupported, + }, }, nil } @@ -532,6 +538,9 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureSupported, + }, } if cds.supportsIntegrity { features.IntegrityData.Status = datastore.FeatureSupported diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 883d14978a..e000038e93 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -10,7 +10,9 @@ import ( "strings" "time" + "github.com/jackc/pgx/v5" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/authzed/spicedb/internal/datastore/common" @@ -72,15 +74,23 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev features, err := cds.Features(ctx) if err != nil { + close(updates) errs <- err return updates, errs } if features.Watch.Status != datastore.FeatureSupported { + close(updates) errs <- datastore.NewWatchDisabledErr(fmt.Sprintf("%s. See https://spicedb.dev/d/enable-watch-api-crdb", features.Watch.Reason)) return updates, errs } + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy && !(options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints) { + close(updates) + errs <- errors.New("EmitImmediatelyStrategy requires WatchCheckpoints to be set") + return updates, errs + } + go cds.watch(ctx, afterRevision, options, updates, errs) return updates, errs @@ -161,10 +171,10 @@ func (cds *crdbDatastore) watch( watchBufferWriteTimeout = cds.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) bool { + sendChange := func(change *datastore.RevisionChanges) error { select { case updates <- change: - return true + return nil default: // If we cannot immediately write, setup the timer and try again. @@ -175,11 +185,10 @@ func (cds *crdbDatastore) watch( select { case updates <- change: - return true + return nil case <-timer.C: - errs <- datastore.NewWatchDisconnectedErr() - return false + return datastore.NewWatchDisconnectedErr() } } @@ -193,7 +202,130 @@ func (cds *crdbDatastore) watch( // no return value so we're not really losing anything. defer func() { go changes.Close() }() - tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize) + cds.processChanges(ctx, changes, sendError, sendChange, opts, opts.EmissionStrategy == datastore.EmitImmediatelyStrategy) +} + +// changeTracker takes care of accumulating received from CockroachDB until a checkpoint is emitted +type changeTracker[R datastore.Revision, K comparable] interface { + FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) + AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error + AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error + AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error + AddDeletedCaveat(ctx context.Context, rev R, caveatName string) error + SetRevisionMetadata(ctx context.Context, rev R, metadata map[string]any) error +} + +// streamingChangeProvider is a changeTracker that streams changes as they are processed. Instead of accumulating +// changes in memory before a checkpoint is reached, it leaves the responsibility of accumulating, deduplicating, +// normalizing changes, and waiting for a checkpoints to the caller. +// +// It's used when WatchOptions.EmissionStrategy is set to EmitImmediatelyStrategy. +type streamingChangeProvider struct { + content datastore.WatchContent + sendChange sendChangeFunc + sendError sendErrorFunc +} + +func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) { + // we do not accumulate in this implementation, but stream right away + return nil, nil +} + +func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error { + if s.content&datastore.WatchRelationships != datastore.WatchRelationships { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + } + switch op { + case tuple.UpdateOperationCreate: + changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Create(rel)) + case tuple.UpdateOperationTouch: + changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Touch(rel)) + case tuple.UpdateOperationDelete: + changes.RelationshipChanges = append(changes.RelationshipChanges, tuple.Delete(rel)) + default: + return spiceerrors.MustBugf("unknown change operation") + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error { + if s.content&datastore.WatchSchema != datastore.WatchSchema { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + ChangedDefinitions: []datastore.SchemaDefinition{def}, + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error { + if s.content&datastore.WatchSchema != datastore.WatchSchema { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + DeletedNamespaces: []string{namespaceName}, + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error { + if s.content&datastore.WatchSchema != datastore.WatchSchema { + return nil + } + + changes := datastore.RevisionChanges{ + Revision: rev, + DeletedCaveats: []string{caveatName}, + } + + return s.sendChange(&changes) +} + +func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error { + if len(metadata) > 0 { + parsedMetadata, err := structpb.NewStruct(metadata) + if err != nil { + return spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err) + } + + changes := datastore.RevisionChanges{ + Revision: rev, + Metadata: parsedMetadata, + } + + return s.sendChange(&changes) + } + + return nil +} + +type ( + sendChangeFunc func(change *datastore.RevisionChanges) error + sendErrorFunc func(err error) +) + +func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, sendError sendErrorFunc, sendChange sendChangeFunc, opts datastore.WatchOptions, streaming bool) { + var tracked changeTracker[revisions.HLCRevision, revisions.HLCRevision] + if streaming { + tracked = &streamingChangeProvider{ + sendChange: sendChange, + sendError: sendError, + content: opts.Content, + } + } else { + tracked = common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize) + } for changes.Next() { var tableNameBytes []byte @@ -229,19 +361,22 @@ func (cds *crdbDatastore) watch( for _, revChange := range filtered { revChange := revChange - if !sendChange(&revChange) { + if err := sendChange(&revChange); err != nil { + sendError(err) return } } if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { - if !sendChange(&datastore.RevisionChanges{ + if err := sendChange(&datastore.RevisionChanges{ Revision: rev, IsCheckpoint: true, - }) { + }); err != nil { + sendError(err) return } } + continue } @@ -427,17 +562,7 @@ func (cds *crdbDatastore) watch( } if changes.Err() != nil { - if errors.Is(ctx.Err(), context.Canceled) { - closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer closeCancel() - if err := conn.Close(closeCtx); err != nil { - errs <- err - return - } - errs <- datastore.NewWatchCanceledErr() - } else { - errs <- changes.Err() - } + sendError(changes.Err()) return } } diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 9327c4e435..b8f416ea2c 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -334,6 +334,9 @@ func (mdb *memdbDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureUnsupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 7933e005b0..049a04ed54 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -23,6 +23,12 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in MemDB") + return updates, errs + } + watchBufferWriteTimeout := options.WatchBufferWriteTimeout if watchBufferWriteTimeout == 0 { watchBufferWriteTimeout = mdb.watchBufferWriteTimeout diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 9d6d33827f..d8f76863f5 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -607,6 +607,9 @@ func (mds *Datastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureUnsupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index a8f8b19ce6..4e70ed3531 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -30,10 +30,17 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi errs := make(chan error, 1) if options.Content&datastore.WatchSchema == datastore.WatchSchema { + close(updates) errs <- errors.New("schema watch unsupported in MySQL") return updates, errs } + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in MySQL") + return updates, errs + } + afterRevision, ok := afterRevisionRaw.(revisions.TransactionIDRevision) if !ok { errs <- datastore.NewInvalidRevisionErr(afterRevisionRaw, datastore.CouldNotDetermineRevision) diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 69f018b299..75ed433754 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -697,6 +697,9 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureUnsupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index aac9873d4f..fc568ad30c 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -74,10 +74,17 @@ func (pgd *pgDatastore) Watch( errs := make(chan error, 1) if !pgd.watchEnabled { + close(updates) errs <- datastore.NewWatchDisabledErr("postgres must be run with track_commit_timestamp=on for watch to be enabled. See https://spicedb.dev/d/enable-watch-api-postgres") return updates, errs } + if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in Postgres") + return updates, errs + } + afterRevision := afterRevisionRaw.(postgresRevision) watchSleep := options.CheckpointInterval if watchSleep < minimumWatchSleep { diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index 1c60df2b4c..55975adb22 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -356,6 +356,9 @@ func (sd *spannerDatastore) OfflineFeatures() (*datastore.Features, error) { ContinuousCheckpointing: datastore.Feature{ Status: datastore.FeatureSupported, }, + WatchEmitsImmediately: datastore.Feature{ + Status: datastore.FeatureUnsupported, + }, }, nil } diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index af8d68f78f..0cee6df022 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -61,6 +61,12 @@ func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.R updates := make(chan *datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) + if opts.EmissionStrategy == datastore.EmitImmediatelyStrategy { + close(updates) + errs <- errors.New("emit immediately strategy is unsupported in Spanner") + return updates, errs + } + go sd.watch(ctx, afterRevision, opts, updates, errs) return updates, errs diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 76fff5cd48..b85bb9ce0b 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -528,8 +528,27 @@ type WatchOptions struct { // If unspecified, no maximum will be enforced. If the maximum is reached before // the changes can be sent, the watch will be closed with an error. MaximumBufferedChangesByteSize uint64 + + // EmissionStrategy defines when are changes streamed to the client. If unspecified, changes will be buffered until + // they can be checkpointed, which is the default behavior. + EmissionStrategy EmissionStrategy } +// EmissionStrategy describes when changes are emitted to the client. +type EmissionStrategy int + +const ( + // EmitWhenCheckpointedStrategy will buffer changes until a checkpoint is reached. This also means that + // changes will be deduplicated and revisions will be sorted before emission as soon as they can be checkpointed. + EmitWhenCheckpointedStrategy = iota + + // EmitImmediatelyStrategy emits changes as soon as they are available. This means changes will not be buffered, + // and thus will be emitted as soon as they are available, but clients are responsible for buffering, deduplication, + // and sorting revisions. In practical terms that can only happens if Checkpoints have been requested, so enabling + // EmitImmediatelyStrategy without Checkpoints will return an error. + EmitImmediatelyStrategy +) + // WatchJustRelationships returns watch options for just relationships. func WatchJustRelationships() WatchOptions { return WatchOptions{ @@ -723,6 +742,11 @@ type Features struct { // new transactions are committed. ContinuousCheckpointing Feature + // WatchEmitsImmediately indicates if the datastore supports the EmitImmediatelyStrategy EmissionStrategy. + // If not supported, clients of the Watch API will receive an error when calling Watch API with + // EmitImmediatelyStrategy option. + WatchEmitsImmediately Feature + // IntegrityData is enabled if the underlying datastore supports retrieving and storing // integrity information. IntegrityData Feature diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 6c3f0c2ea0..83e3c5bb1d 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -180,6 +180,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories, t.Run("TestWatchWithTouch", runner(tester, WatchWithTouchTest)) t.Run("TestWatchWithDelete", runner(tester, WatchWithDeleteTest)) t.Run("TestWatchWithMetadata", runner(tester, WatchWithMetadataTest)) + t.Run("TestWatchEmissionStrategy", runner(tester, WatchEmissionStrategyTest)) } if !except.Watch() && !except.WatchSchema() { diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index 961260d72c..4d0fa1ddd9 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -10,6 +10,7 @@ import ( v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/google/go-cmp/cmp" + "github.com/google/uuid" "github.com/scylladb/go-set/strset" "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" @@ -746,6 +747,99 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { verifyCheckpointUpdate(require, afterTouchRevision, changes) } +func WatchEmissionStrategyTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) + require.NoError(err) + + setupDatastore(ds, require) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + features, err := ds.Features(ctx) + require.NoError(err) + + expectsWatchError := false + if !(features.WatchEmitsImmediately.Status == datastore.FeatureSupported) { + expectsWatchError = true + } + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ + Content: datastore.WatchCheckpoints | datastore.WatchRelationships, + CheckpointInterval: 100 * time.Millisecond, + EmissionStrategy: datastore.EmitImmediatelyStrategy, + }) + if expectsWatchError { + require.NotZero(len(errchan)) + err := <-errchan + require.ErrorContains(err, "emit immediately strategy is unsupported") + return + } + require.Zero(len(errchan)) + + // since changes are streamed immediately, we expect changes to be streamed as independent change events, + // whereas with the default emission strategy it would be accumulated and normalized. For examples, the default + // strategy would accumulate everything into a single Change sent over the channel, like an added relationship + // and its associated transaction metadata. The emit immediately strategy will emit both + // the rel touch and tx metadata as independent changes as it won't accumulate and normalize it. + testTuple := tuple.MustParse("document:firstdoc#viewer@user:" + uuid.NewString()) + testMetadata, err := structpb.NewStruct(map[string]any{"foo": "bar"}) + require.NoError(err) + targetRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.WriteRelationships(ctx, []tuple.RelationshipUpdate{ + tuple.Touch(testTuple), + }) + }, options.WithMetadata(testMetadata)) + require.NoError(err) + + var relTouchEmitted, metadataEmitted, checkpointEmitted bool + changeWait := time.NewTimer(waitForChangesTimeout) + var changeCount int + for { + select { + case change, ok := <-changes: + require.True(ok) + for _, relChange := range change.RelationshipChanges { + if relChange.Relationship == testTuple && relChange.Operation == tuple.UpdateOperationTouch { + relTouchEmitted = true + changeCount++ + require.True(targetRev.Equal(change.Revision)) + } + + continue // we expect each change to come in individual change event + } + + if change.Metadata != nil { + require.Contains(change.Metadata.AsMap(), "foo") + metadataEmitted = true + changeCount++ + require.True(targetRev.Equal(change.Revision)) + continue + } + + if change.IsCheckpoint { + if change.Revision.Equal(targetRev) || change.Revision.GreaterThan(targetRev) { + require.True(metadataEmitted, "expected tx metadata before checkpoint") + require.True(relTouchEmitted, "expected relationship touch before checkpoint") + require.GreaterOrEqual(changeCount, 2, "expected at least 2 changes over the channel") + return + } + } + case <-changeWait.C: + require.Fail("Timed out", "waited for checkpoint") + } + + if relTouchEmitted && metadataEmitted && checkpointEmitted { + return + } + } +} + func verifyCheckpointUpdate( require *require.Assertions, expectedRevision datastore.Revision,