Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch revision checkpoints using memdb datastore #1706

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,14 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
var stagedUpdates []*datastore.RevisionChanges
var watchChan <-chan struct{}
var err error
stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn)
stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn, options)
if err != nil {
errs <- err
return
}

// Write the staged updates to the channel
for _, changeToWrite := range stagedUpdates {
if len(changeToWrite.RelationshipChanges) == 0 {
continue
}

select {
case updates <- changeToWrite:
default:
Expand Down Expand Up @@ -72,7 +68,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
return updates, errs
}

func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) {
func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) {
mdb.RLock()
defer mdb.RUnlock()

Expand All @@ -88,7 +84,23 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64) ([]*
lastRevision := currentTxn
for changeRaw := it.Next(); changeRaw != nil; changeRaw = it.Next() {
change := changeRaw.(*changelog)
changes = append(changes, &change.changes)

if options.Content&datastore.WatchRelationships == datastore.WatchRelationships && len(change.changes.RelationshipChanges) > 0 {
changes = append(changes, &change.changes)
}

if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision {
changes = append(changes, &datastore.RevisionChanges{
Revision: revisions.NewForTimestamp(change.revisionNanos),
IsCheckpoint: true,
})
}

if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
changes = append(changes, &change.changes)
}

lastRevision = change.revisionNanos
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
func TestMySQL8Datastore(t *testing.T) {
b := testdatastore.RunMySQLForTestingWithOptions(t, testdatastore.MySQLTesterOptions{MigrateForNewDatastore: true}, "")
dst := datastoreTester{b: b, t: t}
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory))
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory, test.WatchCheckpointsCategory))
additionalMySQLTests(t, b)
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ func (c Categories) WatchSchema() bool {
return ok
}

func (c Categories) WatchCheckpoints() bool {
_, ok := c[WatchCheckpointsCategory]
return ok
}

var noException = Categories{}

const (
GCCategory = "GC"
WatchCategory = "Watch"
WatchSchemaCategory = "WatchSchema"
GCCategory = "GC"
WatchCategory = "Watch"
WatchSchemaCategory = "WatchSchema"
WatchCheckpointsCategory = "WatchCheckpoints"
)

func WithCategories(cats ...string) Categories {
Expand Down Expand Up @@ -138,6 +144,10 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories)
t.Run("TestWatchSchema", func(t *testing.T) { WatchSchemaTest(t, tester) })
t.Run("TestWatchAll", func(t *testing.T) { WatchAllTest(t, tester) })
}

if !except.Watch() && !except.WatchCheckpoints() {
t.Run("TestWatchCheckpoints", func(t *testing.T) { WatchCheckpointsTest(t, tester) })
}
}

// All runs all generic datastore tests on a DatastoreTester.
Expand Down
47 changes: 47 additions & 0 deletions pkg/datastore/test/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,3 +607,50 @@ func verifyMixedUpdates(

require.False(expectDisconnect, "all changes verified without expected disconnect")
}

func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) {
require := require.New(t)

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16)
require.NoError(err)

setupDatastore(ds, require)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
Content: datastore.WatchCheckpoints | datastore.WatchRelationships,
CheckpointInterval: 100 * time.Millisecond,
})
require.Zero(len(errchan))

afterTouchRevision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH,
tuple.Parse("document:firstdoc#viewer@user:tom"),
)
require.NoError(err)
verifyCheckpointUpdate(require, afterTouchRevision, changes)
}

func verifyCheckpointUpdate(
require *require.Assertions,
expectedRevision datastore.Revision,
changes <-chan *datastore.RevisionChanges,
) {
changeWait := time.NewTimer(waitForChangesTimeout)
for {
select {
case change, ok := <-changes:
require.True(ok)
if change.IsCheckpoint {
require.True(change.Revision.Equal(change.Revision) || change.Revision.GreaterThan(expectedRevision))
return
}
case <-changeWait.C:
require.Fail("Timed out", "waited for checkpoint")
}
}
}
Loading