Skip to content

Commit

Permalink
introduces WatchOptions.EmissionStrategy to handle limitations of C…
Browse files Browse the repository at this point in the history
…ockroachDB changefeeds.

In CRDB, checkpointing for a span only occurs once its catch-up scan completes. As a result,
resolved timestamps for the changefeed won't be emitted until all spans have been checkpointed
and moved past the initial high-water mark.
Unfortunately, no workarounds exist to make the changefeed emit resolved timestamps during
a catch-up scan. This is a known issue on CRL's radar.

This is important because, without checkpoints, changes are accumulated in memory, which,
depending on the use case, can lead to increased memory pressure and out-of-memory errors.

This commit introduces a new change emission strategy option into WatchOptions:
- `EmitWhenCheckpointedStrategy` is the original strategy, which accumulates until a checkpoint is emitted.
- `EmitImmediatelyStrategy` is the new strategy that streams changes right away.
  It's important to note that this makes the client responsible for:
  - deduplicating changes
  - ordering revisions
  - accumulating changes until a checkpoint is emitted

Essentially the client is responsible for doing all the work that used to be done with the `Change` struct helper.

For every other datastore, calling the Watch API with `EmitImmediatelyStrategy` will return an error.
We don't see an evident need for it in other datastores at the moment. Once we've proven
this strategy is viable in production workloads, we will study implementing a Watch API strategy that
buffers to disk to address out-of-memory issues, and potentially deprecate the emit-immediately
strategy in favor of one that does all the heavy lifting for the client.
  • Loading branch information
vroldanbet committed Nov 7, 2024
1 parent 554777d commit 6c4ceb2
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 20 deletions.
9 changes: 9 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureSupported,
},
}, nil
}

Expand All @@ -509,6 +512,9 @@ func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureSupported,
},
}, nil
}

Expand All @@ -532,6 +538,9 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureSupported,
},
}
if cds.supportsIntegrity {
features.IntegrityData.Status = datastore.FeatureSupported
Expand Down
165 changes: 145 additions & 20 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/authzed/spicedb/internal/datastore/common"
Expand Down Expand Up @@ -72,15 +74,23 @@ func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Rev

features, err := cds.Features(ctx)
if err != nil {
close(updates)
errs <- err
return updates, errs
}

if features.Watch.Status != datastore.FeatureSupported {
close(updates)
errs <- datastore.NewWatchDisabledErr(fmt.Sprintf("%s. See https://spicedb.dev/d/enable-watch-api-crdb", features.Watch.Reason))
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy && !(options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints) {
close(updates)
errs <- errors.New("EmitImmediatelyStrategy requires WatchCheckpoints to be set")
return updates, errs
}

go cds.watch(ctx, afterRevision, options, updates, errs)

return updates, errs
Expand Down Expand Up @@ -161,10 +171,10 @@ func (cds *crdbDatastore) watch(
watchBufferWriteTimeout = cds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change *datastore.RevisionChanges) error {
select {
case updates <- change:
return true
return nil

default:
// If we cannot immediately write, setup the timer and try again.
Expand All @@ -175,11 +185,10 @@ func (cds *crdbDatastore) watch(

select {
case updates <- change:
return true
return nil

case <-timer.C:
errs <- datastore.NewWatchDisconnectedErr()
return false
return datastore.NewWatchDisconnectedErr()
}
}

Expand All @@ -193,7 +202,130 @@ func (cds *crdbDatastore) watch(
// no return value so we're not really losing anything.
defer func() { go changes.Close() }()

tracked := common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)
cds.processChanges(ctx, changes, sendError, sendChange, opts, opts.EmissionStrategy == datastore.EmitImmediatelyStrategy)
}

// changeTracker takes care of accumulating changes received from CockroachDB until a checkpoint is emitted.
//
// R is the revision type attached to every change, and K is the comparable key
// type that the lessThanFunc passed to FilterAndRemoveRevisionChanges operates on.
type changeTracker[R datastore.Revision, K comparable] interface {
// FilterAndRemoveRevisionChanges returns the changes selected by lessThanFunc and boundRev.
// NOTE(review): presumably these are the changes accumulated for revisions before
// boundRev, which are also dropped from the tracker — confirm against the implementations.
FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error)
// AddRelationshipChange registers the operation op applied to relationship rel at revision rev.
AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error
// AddChangedDefinition registers a schema definition that changed at revision rev.
AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error
// AddDeletedNamespace registers the deletion of the named namespace at revision rev.
AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error
// AddDeletedCaveat registers the deletion of the named caveat at revision rev.
AddDeletedCaveat(ctx context.Context, rev R, caveatName string) error
// SetRevisionMetadata associates the given metadata with revision rev.
SetRevisionMetadata(ctx context.Context, rev R, metadata map[string]any) error
}

// streamingChangeProvider is a changeTracker that streams changes as they are processed. Instead of accumulating
// changes in memory before a checkpoint is reached, it leaves the responsibility of accumulating, deduplicating,
// normalizing changes, and waiting for checkpoints to the caller.
//
// It's used when WatchOptions.EmissionStrategy is set to EmitImmediatelyStrategy.
type streamingChangeProvider struct {
// content is the bitmask of requested watch content; the Add* methods consult it
// to decide whether a given kind of change is forwarded or silently dropped.
content datastore.WatchContent
// sendChange is invoked once per forwarded change, and its error is returned to the caller.
sendChange sendChangeFunc
// sendError is stored alongside sendChange.
// NOTE(review): none of the methods visible here call it — confirm whether it is still needed.
sendError sendErrorFunc
}

// FilterAndRemoveRevisionChanges is a no-op for the streaming provider: every
// change is sent out the moment it is added, so nothing is ever buffered here
// that could be filtered or handed back. It always returns a nil slice and a
// nil error, regardless of its arguments.
func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) {
	return nil, nil
}

// AddRelationshipChange immediately emits a single-entry RevisionChanges for
// the given relationship operation at rev.
//
// Nothing is emitted (and nil is returned) when the watch content does not
// include WatchRelationships. An operation other than create, touch or delete
// is reported as a bug error. Otherwise the result of sendChange is returned.
func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error {
	// The watcher did not ask for relationship changes.
	if s.content&datastore.WatchRelationships != datastore.WatchRelationships {
		return nil
	}

	// Translate the operation into the matching update before building the payload.
	var update tuple.RelationshipUpdate
	switch op {
	case tuple.UpdateOperationCreate:
		update = tuple.Create(rel)
	case tuple.UpdateOperationTouch:
		update = tuple.Touch(rel)
	case tuple.UpdateOperationDelete:
		update = tuple.Delete(rel)
	default:
		return spiceerrors.MustBugf("unknown change operation")
	}

	return s.sendChange(&datastore.RevisionChanges{
		Revision:            rev,
		RelationshipChanges: []tuple.RelationshipUpdate{update},
	})
}

// AddChangedDefinition immediately emits a RevisionChanges carrying the single
// changed schema definition def at rev. It emits nothing and returns nil when
// the watch content does not include WatchSchema; otherwise it returns the
// result of sendChange.
func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error {
	schemaRequested := s.content&datastore.WatchSchema == datastore.WatchSchema
	if !schemaRequested {
		return nil
	}

	return s.sendChange(&datastore.RevisionChanges{
		Revision:           rev,
		ChangedDefinitions: []datastore.SchemaDefinition{def},
	})
}

// AddDeletedNamespace immediately emits a RevisionChanges recording that the
// namespace namespaceName was deleted at rev. It emits nothing and returns nil
// when the watch content does not include WatchSchema; otherwise it returns
// the result of sendChange.
func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error {
	schemaRequested := s.content&datastore.WatchSchema == datastore.WatchSchema
	if !schemaRequested {
		return nil
	}

	return s.sendChange(&datastore.RevisionChanges{
		Revision:          rev,
		DeletedNamespaces: []string{namespaceName},
	})
}

// AddDeletedCaveat immediately emits a RevisionChanges recording that the
// caveat caveatName was deleted at rev. It emits nothing and returns nil when
// the watch content does not include WatchSchema; otherwise it returns the
// result of sendChange.
func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error {
	schemaRequested := s.content&datastore.WatchSchema == datastore.WatchSchema
	if !schemaRequested {
		return nil
	}

	return s.sendChange(&datastore.RevisionChanges{
		Revision:       rev,
		DeletedCaveats: []string{caveatName},
	})
}

// SetRevisionMetadata immediately emits a RevisionChanges carrying the
// transaction metadata for rev, converted to a structpb.Struct.
//
// Empty (or nil) metadata is ignored and nil is returned. A failure to convert
// the metadata is reported as a bug error; otherwise the result of sendChange
// is returned.
func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error {
	// Nothing to report for revisions without metadata.
	if len(metadata) == 0 {
		return nil
	}

	converted, err := structpb.NewStruct(metadata)
	if err != nil {
		return spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)
	}

	return s.sendChange(&datastore.RevisionChanges{
		Revision: rev,
		Metadata: converted,
	})
}

type (
// sendChangeFunc hands a single RevisionChanges over to the watch consumer.
// A non-nil error means the change was not delivered.
sendChangeFunc func(change *datastore.RevisionChanges) error
// sendErrorFunc reports an error raised while processing changes.
// NOTE(review): presumably it forwards err to the watch error channel — confirm against the caller.
sendErrorFunc func(err error)
)

func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, sendError sendErrorFunc, sendChange sendChangeFunc, opts datastore.WatchOptions, streaming bool) {
var tracked changeTracker[revisions.HLCRevision, revisions.HLCRevision]
if streaming {
tracked = &streamingChangeProvider{
sendChange: sendChange,
sendError: sendError,
content: opts.Content,
}
} else {
tracked = common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)
}

for changes.Next() {
var tableNameBytes []byte
Expand Down Expand Up @@ -229,19 +361,22 @@ func (cds *crdbDatastore) watch(

for _, revChange := range filtered {
revChange := revChange
if !sendChange(&revChange) {
if err := sendChange(&revChange); err != nil {
sendError(err)
return
}
}

if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if !sendChange(&datastore.RevisionChanges{
if err := sendChange(&datastore.RevisionChanges{
Revision: rev,
IsCheckpoint: true,
}) {
}); err != nil {
sendError(err)
return
}
}

continue
}

Expand Down Expand Up @@ -427,17 +562,7 @@ func (cds *crdbDatastore) watch(
}

if changes.Err() != nil {
if errors.Is(ctx.Err(), context.Canceled) {
closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer closeCancel()
if err := conn.Close(closeCtx); err != nil {
errs <- err
return
}
errs <- datastore.NewWatchCanceledErr()
} else {
errs <- changes.Err()
}
sendError(changes.Err())
return
}
}
3 changes: 3 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ func (mdb *memdbDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MemDB")
return updates, errs
}

watchBufferWriteTimeout := options.WatchBufferWriteTimeout
if watchBufferWriteTimeout == 0 {
watchBufferWriteTimeout = mdb.watchBufferWriteTimeout
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ func (mds *Datastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
close(updates)
errs <- errors.New("schema watch unsupported in MySQL")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MySQL")
return updates, errs
}

afterRevision, ok := afterRevisionRaw.(revisions.TransactionIDRevision)
if !ok {
errs <- datastore.NewInvalidRevisionErr(afterRevisionRaw, datastore.CouldNotDetermineRevision)
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,9 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ func (pgd *pgDatastore) Watch(
errs := make(chan error, 1)

if !pgd.watchEnabled {
close(updates)
errs <- datastore.NewWatchDisabledErr("postgres must be run with track_commit_timestamp=on for watch to be enabled. See https://spicedb.dev/d/enable-watch-api-postgres")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in Postgres")
return updates, errs
}

afterRevision := afterRevisionRaw.(postgresRevision)
watchSleep := options.CheckpointInterval
if watchSleep < minimumWatchSleep {
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ func (sd *spannerDatastore) OfflineFeatures() (*datastore.Features, error) {
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/spanner/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.R
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if opts.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in Spanner")
return updates, errs
}

go sd.watch(ctx, afterRevision, opts, updates, errs)

return updates, errs
Expand Down
Loading

0 comments on commit 6c4ceb2

Please sign in to comment.