Skip to content

Commit

Permalink
Migrations: use a txn + commit for each migration, deprecate MigrateTx (
Browse files Browse the repository at this point in the history
riverqueue#600)

This is partly extracted from riverqueue#590. As detailed in that PR, there are certain
combinations of schema changes which are not allowed to be run within the same
transaction. The example we encountered with riverqueue#590 is adding a new enum value,
then using it in an immutable function during a subsequent migration. In
Postgres, these must be separated by a commit.

The migration in that PR exposes an issue with our migration framework, which is
that there are certain schema changes which must be committed before they can be
referenced by subsequent schema changes. The example of this I found is that if
you add a new value to an enum, you cannot later create a function that relies
on that new enum value unless the new value has first been committed. Committing
it within a DDL transaction does _not_ count—it must be a full commit.

IMO this might mean that the entire idea of a `MigrateTx` API is not workable
with certain schema change combinations. At worst, it can result in
unpredictable failures depending on the exact sequence of changes and how many
migrations are being run at once.

As such, in this PR I've deprecated `MigrateTx` and adjusted the migrator so
that it opens a new transaction for each individual migration with a commit
between them. Migrator tests were changed to move away from `MigrateTx` and to a
setup where they get a new clean schema for each test that's disposed at the
end. This makes it possible to test the full sequence of database migrations
with a clean slate each time.

I believe this is the right long-term direction because it's the approach that
other migration libraries use (Rails/ActiveRecord, [Sequel][1], Goose, etc). It
also enables the potential for us to add the ability for individual migrations
to opt _out_ of a having a wrapping transaction, which is essential if we ever
want our default to be `CREATE INDEX CONCURRENTLY` rather than synchronous
indexing (as it really should be for any at-scale system).

[1]: (https://sequel.jeremyevans.net/rdoc/files/doc/migration_rdoc.html#label-Transactions)
  • Loading branch information
bgentry authored and tigrato committed Dec 18, 2024
1 parent 7792ad5 commit 4bbd3a5
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 155 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
}
```

- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version.

- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).

## Fixed
Expand Down
23 changes: 12 additions & 11 deletions rivermigrate/example_migrate_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@ import (
func Example_migrateDatabaseSQL() {
ctx := context.Background()

dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example_dbsql"
url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName
dbPool, err := sql.Open("pgx", url)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.BeginTx(ctx, nil)
driver := riverdatabasesql.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback()

migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand Down
32 changes: 17 additions & 15 deletions rivermigrate/example_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand All @@ -17,26 +18,29 @@ import (
func Example_migrate() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example"
poolConfig := riverinternaltest.DatabaseConfig("river_test_example")
poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName

dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.Begin(ctx)
driver := riverpgxv5.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback(ctx)

migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -46,7 +50,7 @@ func Example_migrate() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -56,7 +60,7 @@ func Example_migrate() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand All @@ -73,10 +77,8 @@ func Example_migrate() {
// Migrated [DOWN] version 1
}

func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) {
_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
TargetVersion: -1,
})
func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) {
_, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;")
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/001_first.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TYPE foobar;
2 changes: 2 additions & 0 deletions rivermigrate/migration/commit_required/001_first.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- create a foobar enum with values foo, bar:
CREATE TYPE foobar AS ENUM ('foo', 'bar');
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/002_second.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- not truly reversible, can't remove enum values.
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/002_second.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE foobar ADD VALUE 'baz' AFTER 'bar';
1 change: 1 addition & 0 deletions rivermigrate/migration/commit_required/003_third.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP FUNCTION foobar_in_bitmask;
15 changes: 15 additions & 0 deletions rivermigrate/migration/commit_required/003_third.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE OR REPLACE FUNCTION foobar_in_bitmask(bitmask BIT(8), val foobar)
RETURNS boolean
LANGUAGE SQL
IMMUTABLE
AS $$
SELECT CASE val
WHEN 'foo' THEN get_bit(bitmask, 7)
WHEN 'bar' THEN get_bit(bitmask, 6)
-- Because the enum value 'baz' was added in migration 2 and not part
-- of the original enum, we can't use it in an immutable SQL function
-- unless the new enum value migration has been committed.
WHEN 'baz' THEN get_bit(bitmask, 5)
ELSE 0
END = 1;
$$;
37 changes: 25 additions & 12 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,15 @@ func (m *Migrator[TTx]) GetVersion(version int) (Migration, error) {
// // handle error
// }
func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, exec riverdriver.ExecutorTx) (*MigrateResult, error) {
switch direction {
case DirectionDown:
return m.migrateDown(ctx, exec, direction, opts)
case DirectionUp:
return m.migrateUp(ctx, exec, direction, opts)
}
exec := m.driver.GetExecutor()
switch direction {
case DirectionDown:
return m.migrateDown(ctx, exec, direction, opts)
case DirectionUp:
return m.migrateUp(ctx, exec, direction, opts)
}

panic("invalid direction: " + direction)
})
panic("invalid direction: " + direction)
}

// Migrate migrates the database in the given direction (up or down). The opts
Expand All @@ -327,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *
// This variant lets a caller run migrations within a transaction. Postgres DDL
// is transactional, so migration changes aren't visible until the transaction
// commits, and are rolled back if the transaction rolls back.
//
// Deprecated: Use Migrate instead. Certain migrations cannot be batched together
// in a single transaction, so this method is not recommended.
func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
switch direction {
case DirectionDown:
Expand Down Expand Up @@ -560,10 +562,21 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex

if !opts.DryRun {
start := time.Now()
_, err := exec.Exec(ctx, sql)

// Similar to ActiveRecord migrations, we wrap each individual migration
// in its own transaction. Without this, certain migrations that require
// a commit on a preexisting operation (such as adding an enum value to be
// used in an immutable function) cannot succeed.
err := dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error {
_, err := exec.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("error applying version %03d [%s]: %w",
versionBundle.Version, strings.ToUpper(string(direction)), err)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("error applying version %03d [%s]: %w",
versionBundle.Version, strings.ToUpper(string(direction)), err)
return nil, err
}
duration = time.Since(start)
}
Expand Down
Loading

0 comments on commit 4bbd3a5

Please sign in to comment.