Skip to content

Commit

Permalink
add a flag to allow spicedb to run against non-head migrations
Browse files Browse the repository at this point in the history
fixes #2135
  • Loading branch information
ecordell committed Nov 21, 2024
1 parent d1a7873 commit 621ffac
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 73 deletions.
42 changes: 42 additions & 0 deletions internal/datastore/common/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package common

import (
"fmt"
"slices"
"strings"

"github.com/authzed/spicedb/pkg/datastore"
)

type MigrationValidator struct {
additionalAllowedMigrations []string
headMigration string
}

func NewMigrationValidator(headMigration string, additionalAllowedMigrations []string) *MigrationValidator {
return &MigrationValidator{
additionalAllowedMigrations: additionalAllowedMigrations,
headMigration: headMigration,
}
}

// MigrationReadyState returns the readiness of the datastore for the given version.
func (mv *MigrationValidator) MigrationReadyState(version string) datastore.ReadyState {
if version == mv.headMigration {
return datastore.ReadyState{IsReady: true}
}
if slices.Contains(mv.additionalAllowedMigrations, version) {
return datastore.ReadyState{IsReady: true}
}
var msgBuilder strings.Builder
msgBuilder.WriteString(fmt.Sprintf("datastore is not migrated: currently at revision %q, but requires %q", version, mv.headMigration))

if len(mv.additionalAllowedMigrations) > 0 {
msgBuilder.WriteString(fmt.Sprintf(" (additional allowed migrations: %v)", mv.additionalAllowedMigrations))
}
msgBuilder.WriteString(". Please run \"spicedb datastore migrate\".")
return datastore.ReadyState{
Message: msgBuilder.String(),
IsReady: false,
}
}
69 changes: 69 additions & 0 deletions internal/datastore/common/migrations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package common

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/authzed/spicedb/pkg/datastore"
)

func TestMigrationReadyState(t *testing.T) {
tests := []struct {
name string
additionalAllowedMigrations []string
headMigration string
version string
want datastore.ReadyState
}{
{
name: "matches head migration",
headMigration: "initial",
version: "initial",
want: datastore.ReadyState{IsReady: true},
},
{
name: "matches additional migration",
headMigration: "initial",
additionalAllowedMigrations: []string{"additional"},
version: "additional",
want: datastore.ReadyState{IsReady: true},
},
{
name: "matches one of several additional migrations",
headMigration: "initial",
additionalAllowedMigrations: []string{"additional", "additional2", "additional3"},
version: "additional2",
want: datastore.ReadyState{IsReady: true},
},
{
name: "matches head migration when additional migrations are allowed",
headMigration: "initial",
additionalAllowedMigrations: []string{"additional"},
version: "initial",
want: datastore.ReadyState{IsReady: true},
},
{
name: "doesn't match head migration",
headMigration: "initial",
version: "additional",
want: datastore.ReadyState{IsReady: false, Message: `datastore is not migrated: currently at revision "additional", but requires "initial". Please run "spicedb datastore migrate".`},
},
{
name: "doesn't match head migration or additional migrations",
headMigration: "initial",
additionalAllowedMigrations: []string{"additional", "additional2"},
version: "plustwo",
want: datastore.ReadyState{IsReady: false, Message: `datastore is not migrated: currently at revision "plustwo", but requires "initial" (additional allowed migrations: [additional additional2]). Please run "spicedb datastore migrate".`},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mv := &MigrationValidator{
additionalAllowedMigrations: tt.additionalAllowedMigrations,
headMigration: tt.headMigration,
}
require.Equal(t, tt.want, mv.MigrationReadyState(tt.version))
})
}
}
25 changes: 9 additions & 16 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

headMigration, err := migrations.CRDBMigrations.HeadRevision()
if err != nil {
return nil, fmt.Errorf("invalid head migration found for cockroach: %w", err)
}

ds := &crdbDatastore{
RemoteClockRevisions: revisions.NewRemoteClockRevisions(
config.gcWindow,
Expand All @@ -199,6 +204,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
config.revisionQuantization,
),
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
dburl: url,
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
Expand Down Expand Up @@ -284,6 +290,7 @@ func NewCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
type crdbDatastore struct {
*revisions.RemoteClockRevisions
revisions.CommonDecoder
*common.MigrationValidator

dburl string
readPool, writePool *pool.RetryPool
Expand Down Expand Up @@ -407,11 +414,6 @@ func wrapError(err error) error {
}

func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
headMigration, err := migrations.CRDBMigrations.HeadRevision()
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for cockroach: %w", err)
}

currentRevision, err := migrations.NewCRDBDriver(cds.dburl)
if err != nil {
return datastore.ReadyState{}, err
Expand All @@ -423,17 +425,8 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
return datastore.ReadyState{}, err
}

// TODO(jschorr): Remove the check for the older migration once we are confident
// that all users have migrated past it.
if version != headMigration && version != "add-caveats" {
return datastore.ReadyState{
Message: fmt.Sprintf(
"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.",
version,
headMigration,
),
IsReady: false,
}, nil
if state := cds.MigrationValidator.MigrationReadyState(version); !state.IsReady {
return state, nil
}

readMin := cds.readPool.MinConns()
Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type crdbOptions struct {
filterMaximumIDCount uint16
enablePrometheusStats bool
withIntegrity bool
allowedMigrations []string
}

const (
Expand Down Expand Up @@ -338,3 +339,9 @@ func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
func WithIntegrity(withIntegrity bool) Option {
return func(po *crdbOptions) { po.withIntegrity = withIntegrity }
}

// AllowedMigrations configures a set of additional migrations that will pass
// the health check (head migration is always allowed).
func AllowedMigrations(allowedMigrations []string) Option {
return func(po *crdbOptions) { po.allowedMigrations = allowedMigrations }
}
25 changes: 11 additions & 14 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option
// ID value makes this idempotent (i.e. safe to execute concurrently).
createBaseTxn := fmt.Sprintf("INSERT IGNORE INTO %s (id, timestamp) VALUES (1, FROM_UNIXTIME(1))", driver.RelationTupleTransaction())

headMigration, err := migrations.Manager.HeadRevision()
if err != nil {
return nil, fmt.Errorf("invalid head migration found for mysql: %w", err)
}

gcCtx, cancelGc := context.WithCancel(context.Background())

maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
Expand Down Expand Up @@ -241,6 +246,7 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option
)

store := &Datastore{
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
db: db,
driver: driver,
url: uri,
Expand Down Expand Up @@ -504,6 +510,7 @@ func newMySQLExecutor(tx querier) common.ExecuteQueryFunc {

// Datastore is a MySQL-based implementation of the datastore.Datastore interface
type Datastore struct {
*common.MigrationValidator
db *sql.DB
driver *migrations.MySQLDriver
readTxOptions *sql.TxOptions
Expand Down Expand Up @@ -564,15 +571,9 @@ func (mds *Datastore) ReadyState(ctx context.Context) (datastore.ReadyState, err
return datastore.ReadyState{}, err
}

compatible, err := migrations.Manager.IsHeadCompatible(currentMigrationRevision)
if err != nil {
return datastore.ReadyState{}, err
}
if !compatible {
return datastore.ReadyState{
Message: "datastore is not at a currently compatible revision",
IsReady: false,
}, nil
state := mds.MigrationReadyState(currentMigrationRevision)
if !state.IsReady {
return state, nil
}

isSeeded, err := mds.isSeeded(ctx)
Expand All @@ -585,11 +586,7 @@ func (mds *Datastore) ReadyState(ctx context.Context) (datastore.ReadyState, err
IsReady: false,
}, nil
}

return datastore.ReadyState{
Message: "",
IsReady: true,
}, nil
return state, nil
}

func (mds *Datastore) Features(_ context.Context) (*datastore.Features, error) {
Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/mysql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type mysqlOptions struct {
gcEnabled bool
credentialsProviderName string
filterMaximumIDCount uint16
allowedMigrations []string
}

// Option provides the facility to configure how clients within the
Expand Down Expand Up @@ -262,3 +263,9 @@ func CredentialsProviderName(credentialsProviderName string) Option {
func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
return func(mo *mysqlOptions) { mo.filterMaximumIDCount = filterMaximumIDCount }
}

// AllowedMigrations configures a set of additional migrations that will pass
// the health check (head migration is always allowed).
func AllowedMigrations(allowedMigrations []string) Option {
return func(mo *mysqlOptions) { mo.allowedMigrations = allowedMigrations }
}
9 changes: 8 additions & 1 deletion internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type postgresOptions struct {
gcEnabled bool
readStrictMode bool

migrationPhase string
migrationPhase string
allowedMigrations []string

logger *tracingLogger

Expand Down Expand Up @@ -358,6 +359,12 @@ func MigrationPhase(phase string) Option {
return func(po *postgresOptions) { po.migrationPhase = phase }
}

// AllowedMigrations configures a set of additional migrations that will pass
// the health check (head migration is always allowed).
func AllowedMigrations(allowedMigrations []string) Option {
return func(po *postgresOptions) { po.allowedMigrations = allowedMigrations }
}

// CredentialsProviderName is the name of the CredentialsProvider implementation to use
// for dynamically retrieving the datastore credentials at runtime
//
Expand Down
40 changes: 17 additions & 23 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ func newPostgresDatastore(
}
}

headMigration, err := migrations.DatabaseMigrations.HeadRevision()
if err != nil {
return nil, fmt.Errorf("invalid head migration found for postgres: %w", err)
}

gcCtx, cancelGc := context.WithCancel(context.Background())

quantizationPeriodNanos := config.revisionQuantization.Nanoseconds()
Expand Down Expand Up @@ -313,6 +318,7 @@ func newPostgresDatastore(
CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions(
maxRevisionStaleness,
),
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
dburl: pgURL,
readPool: pgxcommon.MustNewInterceptorPooler(readPool, config.queryInterceptor),
writePool: nil, /* disabled by default */
Expand Down Expand Up @@ -368,6 +374,7 @@ func newPostgresDatastore(

type pgDatastore struct {
*revisions.CachedOptimizedRevisions
*common.MigrationValidator

dburl string
readPool, writePool pgxcommon.ConnPooler
Expand Down Expand Up @@ -644,11 +651,6 @@ func errorRetryable(err error) bool {
}

func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
headMigration, err := migrations.DatabaseMigrations.HeadRevision()
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for postgres: %w", err)
}

pgDriver, err := migrations.NewAlembicPostgresDriver(ctx, pgd.dburl, pgd.credentialsProvider)
if err != nil {
return datastore.ReadyState{}, err
Expand All @@ -660,25 +662,17 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e
return datastore.ReadyState{}, err
}

if version == headMigration {
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
uniqueID, err := pgd.datastoreUniqueID(ctx)
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
}

log.Trace().Str("unique_id", uniqueID).Msg("postgres datastore unique ID")
return datastore.ReadyState{IsReady: true}, nil
state := pgd.MigrationReadyState(version)
if !state.IsReady {
return state, nil
}

return datastore.ReadyState{
Message: fmt.Sprintf(
"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`. If you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported",
version,
headMigration,
),
IsReady: false,
}, nil
// Ensure a datastore ID is present. This ensures the tables have not been truncated.
uniqueID, err := pgd.datastoreUniqueID(ctx)
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
}
log.Trace().Str("unique_id", uniqueID).Msg("postgres datastore unique ID")
return state, nil
}

func (pgd *pgDatastore) Features(ctx context.Context) (*datastore.Features, error) {
Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/spanner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type spannerOptions struct {
minSessions uint64
maxSessions uint64
migrationPhase string
allowedMigrations []string
filterMaximumIDCount uint16
}

Expand Down Expand Up @@ -213,6 +214,12 @@ func MigrationPhase(phase string) Option {
return func(po *spannerOptions) { po.migrationPhase = phase }
}

// AllowedMigrations configures a set of additional migrations that will pass
// the health check (head migration is always allowed).
func AllowedMigrations(allowedMigrations []string) Option {
return func(po *spannerOptions) { po.allowedMigrations = allowedMigrations }
}

// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
return func(po *spannerOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
Expand Down
Loading

0 comments on commit 621ffac

Please sign in to comment.