Skip to content

Commit

Permalink
Runtime : Detach duckdb files with tx lock (#4590)
Browse files Browse the repository at this point in the history
* detach with tx

* detach with tx

* whole rename should be with tx = true

* cast_to_enum fix

* remove file even if attach failed

* fix duckdb to duckdb transfer
  • Loading branch information
k-anshul authored Apr 12, 2024
1 parent 6dbac2c commit e008a51
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 67 deletions.
70 changes: 43 additions & 27 deletions runtime/drivers/duckdb/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (c *connection) CreateTableAsSelect(ctx context.Context, name string, view
})
}

return c.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
var cleanupFunc func()
err := c.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
// NOTE: Running mkdir while holding the connection to avoid directory getting cleaned up when concurrent calls to RenameTable cause reopenDB to be called.

// create a new db file in /<instanceid>/<name> directory
Expand All @@ -313,15 +314,15 @@ func (c *connection) CreateTableAsSelect(ctx context.Context, name string, view

// Enforce storage limits
if err := c.execWithLimits(ctx, &drivers.Statement{Query: fmt.Sprintf("CREATE OR REPLACE TABLE %s.default AS (%s\n)", safeSQLName(db), sql)}); err != nil {
c.detachAndRemoveFile(ensuredCtx, db, dbFile)
cleanupFunc = func() { c.detachAndRemoveFile(db, dbFile) }
return fmt.Errorf("create: create %q.default table failed: %w", db, err)
}

// success update version
err = c.updateVersion(name, newVersion)
if err != nil {
// extreme bad luck
c.detachAndRemoveFile(ensuredCtx, db, dbFile)
cleanupFunc = func() { c.detachAndRemoveFile(db, dbFile) }
return fmt.Errorf("create: update version %q failed: %w", newVersion, err)
}

Expand All @@ -335,17 +336,21 @@ func (c *connection) CreateTableAsSelect(ctx context.Context, name string, view
Query: fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", safeSQLName(name), qry),
})
if err != nil {
c.detachAndRemoveFile(ensuredCtx, db, dbFile)
cleanupFunc = func() { c.detachAndRemoveFile(db, dbFile) }
return fmt.Errorf("create: create view %q failed: %w", name, err)
}

if oldVersionExists {
oldDB := dbName(name, oldVersion)
// ignore these errors since source has been correctly ingested and attached
c.detachAndRemoveFile(ensuredCtx, oldDB, filepath.Join(sourceDir, fmt.Sprintf("%s.db", oldVersion)))
cleanupFunc = func() { c.detachAndRemoveFile(oldDB, filepath.Join(sourceDir, fmt.Sprintf("%s.db", oldVersion))) }
}
return nil
})
if cleanupFunc != nil {
cleanupFunc()
}
return err
}

// DropTable implements drivers.OLAPStore.
Expand Down Expand Up @@ -381,7 +386,7 @@ func (c *connection) DropTable(ctx context.Context, name string, view bool) erro
})
}

err = c.WithConnection(ctx, 100, true, false, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
err = c.WithConnection(ctx, 100, true, true, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
// drop view
err = c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("DROP VIEW IF EXISTS %s", safeSQLName(name))})
if err != nil {
Expand Down Expand Up @@ -464,13 +469,6 @@ func (c *connection) RenameTable(ctx context.Context, oldName, newName string, v
return c.dropAndReplace(ctx, oldName, newName, view)
}

// reopen duckdb connections which should delete any temporary files built up during ingestion
// making an empty call so that stop the world call with tx=true is very fast and only blocks for the duration of close and open db hanle call
err = c.WithConnection(ctx, 100, false, true, func(_, _ context.Context, _ *dbsql.Conn) error { return nil })
if err != nil {
return err
}

oldVersionInNewDir, replaceInNewTable, err := c.tableVersion(newName)
if err != nil {
return err
Expand All @@ -479,7 +477,9 @@ func (c *connection) RenameTable(ctx context.Context, oldName, newName string, v
newSrcDir := filepath.Join(c.config.DBStoragePath, newName)
oldSrcDir := filepath.Join(c.config.DBStoragePath, oldName)

return c.WithConnection(ctx, 100, true, false, func(currentCtx, ctx context.Context, conn *dbsql.Conn) error {
// reopen duckdb connections which should delete any temporary files built up during ingestion
// need to do detach using tx=true to isolate it from other queries
err = c.WithConnection(ctx, 100, true, true, func(currentCtx, ctx context.Context, conn *dbsql.Conn) error {
err = os.Mkdir(newSrcDir, fs.ModePerm)
if err != nil && !errors.Is(err, fs.ErrExist) {
return err
Expand Down Expand Up @@ -535,11 +535,17 @@ func (c *connection) RenameTable(ctx context.Context, oldName, newName string, v
return fmt.Errorf("rename: create %q view failed: %w", newName, err)
}

if replaceInNewTable { // new table had some other file previously
c.detachAndRemoveFile(ctx, dbName(newName, oldVersionInNewDir), filepath.Join(newSrcDir, fmt.Sprintf("%s.db", oldVersionInNewDir)))
if !replaceInNewTable {
return nil
}
// new table had some other file previously
if err := c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s", safeSQLName(dbName(newName, oldVersionInNewDir)))}); err != nil {
return err
}
removeDBFile(filepath.Join(newSrcDir, fmt.Sprintf("%s.db", oldVersionInNewDir)))
return nil
})
return err
}

func (c *connection) dropAndReplace(ctx context.Context, oldName, newName string, view bool) error {
Expand Down Expand Up @@ -580,12 +586,15 @@ func (c *connection) dropAndReplace(ctx context.Context, oldName, newName string
})
}

func (c *connection) detachAndRemoveFile(ctx context.Context, db, dbFile string) {
err := c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s", safeSQLName(db)), Priority: 100})
func (c *connection) detachAndRemoveFile(db, dbFile string) {
err := c.WithConnection(context.Background(), 100, false, true, func(ctx, ensuredCtx context.Context, conn *dbsql.Conn) error {
err := c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s", safeSQLName(db)), Priority: 100})
removeDBFile(dbFile)
return err
})
if err != nil {
c.logger.Error("detach failed", zap.String("db", db), zap.Error(err))
}
removeDBFile(dbFile)
}

func (c *connection) tableVersion(name string) (string, bool, error) {
Expand Down Expand Up @@ -696,7 +705,8 @@ func (c *connection) convertToEnum(ctx context.Context, table string, cols []str
newVersion := fmt.Sprint(time.Now().UnixMilli())
newDBFile := filepath.Join(sourceDir, fmt.Sprintf("%s.db", newVersion))
newDB := dbName(table, newVersion)
return c.WithConnection(ctx, 100, true, false, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
var cleanupFunc func()
err = c.WithConnection(ctx, 100, true, false, func(ctx, ensuredCtx context.Context, _ *dbsql.Conn) error {
// attach new db
err = c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("ATTACH %s AS %s", safeSQLString(newDBFile), safeSQLName(newDB))})
if err != nil {
Expand All @@ -709,15 +719,15 @@ func (c *connection) convertToEnum(ctx context.Context, table string, cols []str
// TODO: remove this when https://github.com/duckdb/duckdb/pull/9622 is released
err = c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s", safeSQLName(newDB))})
if err != nil {
c.detachAndRemoveFile(ctx, newDB, newDBFile)
cleanupFunc = func() { c.detachAndRemoveFile(newDB, newDBFile) }
return fmt.Errorf("failed switch db %q: %w", newDB, err)
}
defer func() {
// switch to original db, notice `db.schema` just doing USE db switches context to `main` schema in the current db if doing `USE main`
// we want to switch to original db and schema
err = c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("USE %s.%s", safeSQLName(mainDB), safeSQLName(mainSchema))})
if err != nil {
c.detachAndRemoveFile(ctx, newDB, newDBFile)
cleanupFunc = func() { c.detachAndRemoveFile(newDB, newDBFile) }
// This should NEVER happen
c.fatalInternalError(fmt.Errorf("failed to switch back from db %q: %w", mainDB, err))
}
Expand All @@ -727,7 +737,7 @@ func (c *connection) convertToEnum(ctx context.Context, table string, cols []str
for _, col := range cols {
enum := fmt.Sprintf("%s_enum", col)
if err = c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("CREATE TYPE %s AS ENUM (SELECT DISTINCT %s FROM %s.default WHERE %s IS NOT NULL)", safeSQLName(enum), safeSQLName(col), safeSQLName(oldDB), safeSQLName(col))}); err != nil {
c.detachAndRemoveFile(ctx, newDB, newDBFile)
cleanupFunc = func() { c.detachAndRemoveFile(newDB, newDBFile) }
return fmt.Errorf("failed to create enum %q: %w", enum, err)
}
}
Expand All @@ -740,7 +750,7 @@ func (c *connection) convertToEnum(ctx context.Context, table string, cols []str
selectQry += fmt.Sprintf("* EXCLUDE(%s)", strings.Join(cols, ","))

if err := c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("CREATE OR REPLACE TABLE \"default\" AS SELECT %s FROM %s.default", selectQry, safeSQLName(oldDB))}); err != nil {
c.detachAndRemoveFile(ctx, newDB, newDBFile)
cleanupFunc = func() { c.detachAndRemoveFile(newDB, newDBFile) }
return fmt.Errorf("failed to create table with enum values: %w", err)
}

Expand All @@ -752,19 +762,25 @@ func (c *connection) convertToEnum(ctx context.Context, table string, cols []str

// NOTE :: db name need to be appened in the view query else query fails when switching to main db
if err := c.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("CREATE OR REPLACE VIEW %s.%s.%s AS %s", safeSQLName(mainDB), safeSQLName(mainSchema), safeSQLName(table), selectQry)}); err != nil {
c.detachAndRemoveFile(ctx, newDB, newDBFile)
cleanupFunc = func() { c.detachAndRemoveFile(newDB, newDBFile) }
return fmt.Errorf("failed to create view %q: %w", table, err)
}

// update version and detach old db
if err := c.updateVersion(table, newVersion); err != nil {
c.detachAndRemoveFile(ctx, newDB, newDBFile)
cleanupFunc = func() { c.detachAndRemoveFile(newDB, newDBFile) }
return fmt.Errorf("failed to update version: %w", err)
}

c.detachAndRemoveFile(ensuredCtx, oldDB, filepath.Join(sourceDir, fmt.Sprintf("%s.db", oldVersion)))
cleanupFunc = func() {
c.detachAndRemoveFile(oldDB, filepath.Join(sourceDir, fmt.Sprintf("%s.db", oldVersion)))
}
return nil
})
if cleanupFunc != nil {
cleanupFunc()
}
return err
}

// duckDB raises Contents of view were altered: types don't match! error even when number of columns are same but sequence of column changes in underlying table.
Expand Down
17 changes: 13 additions & 4 deletions runtime/drivers/duckdb/transporter_duckDB_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func (t *duckDBToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[s
}

func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *dbSourceProperties, sinkProps *sinkProperties) error {
return t.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *sql.Conn) error {
var cleanupFunc func()
err := t.to.WithConnection(ctx, 1, true, false, func(ctx, ensuredCtx context.Context, _ *sql.Conn) error {
res, err := t.to.Execute(ctx, &drivers.Statement{Query: "SELECT current_database(),current_schema();"})
if err != nil {
return err
Expand All @@ -142,11 +143,15 @@ func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *d
return fmt.Errorf("failed to attach db %q: %w", srcProps.Database, err)
}

defer func() {
if err = t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s;", safeSQLName(dbName))}); err != nil {
// make sure that dbName is not updated in any code below
cleanupFunc = func() {
err := t.to.WithConnection(context.Background(), 100, false, true, func(wrappedCtx, ensuredCtx context.Context, conn *sql.Conn) error {
return t.to.Exec(ensuredCtx, &drivers.Statement{Query: fmt.Sprintf("DETACH %s;", safeSQLName(dbName))})
})
if err != nil {
t.logger.Error("failed to detach db", zap.Error(err))
}
}()
}

if err := t.to.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("USE %s;", safeName(dbName))}); err != nil {
return err
Expand All @@ -163,6 +168,10 @@ func (t *duckDBToDuckDB) transferFromExternalDB(ctx context.Context, srcProps *d
query := fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s.%s AS (%s\n);", safeName(localDB), safeName(localSchema), safeName(sinkProps.Table), userQuery)
return t.to.Exec(ctx, &drivers.Statement{Query: query})
})
if cleanupFunc != nil {
cleanupFunc()
}
return err
}

// rewriteLocalPaths rewrites a DuckDB SQL statement such that relative paths become absolute paths relative to the basePath,
Expand Down
3 changes: 0 additions & 3 deletions runtime/reconcilers/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
createErr := r.createModel(ctx, self, stagingTableName, !materialize)
if createErr != nil {
createErr = fmt.Errorf("failed to create model: %w", createErr)
} else if !r.C.Runtime.AllowHostAccess() {
// temporarily for debugging
logTableNameAndType(ctx, r.C, connector, stagingTableName)
}

if createErr == nil && stage {
Expand Down
3 changes: 0 additions & 3 deletions runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN
ingestErr := r.ingestSource(ctx, self, stagingTableName)
if ingestErr != nil {
ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr)
} else if !r.C.Runtime.AllowHostAccess() {
// temporarily for debugging
logTableNameAndType(ctx, r.C, connector, stagingTableName)
}

if ingestErr == nil && src.Spec.StageChanges {
Expand Down
30 changes: 0 additions & 30 deletions runtime/reconcilers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
)

// checkRefs checks that all refs exist, are idle, and have no errors.
Expand Down Expand Up @@ -171,35 +170,6 @@ func olapForceRenameTable(ctx context.Context, c *runtime.Controller, connector,
return olap.RenameTable(ctx, fromName, toName, fromIsView)
}

func logTableNameAndType(ctx context.Context, c *runtime.Controller, connector, name string) {
olap, release, err := c.AcquireOLAP(ctx, connector)
if err != nil {
c.Logger.Warn("LogTableNameAndType: failed to acquire OLAP", zap.Error(err))
return
}
defer release()

res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}})
if err != nil {
c.Logger.Warn("LogTableNameAndType: failed information_schema.columns", zap.Error(err))
return
}
defer res.Close()

colTyp := make([]string, 0)
var col, typ string
for res.Next() {
err = res.Scan(&col, &typ)
if err != nil {
c.Logger.Warn("LogTableNameAndType: failed scan", zap.Error(err))
return
}
colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ))
}

c.Logger.Info("LogTableNameAndType: ", zap.String("name", name), zap.String("schema", strings.Join(colTyp, ", ")))
}

func resolveTemplatedProps(ctx context.Context, c *runtime.Controller, self compilerv1.TemplateResource, props map[string]any) (map[string]any, error) {
inst, err := c.Runtime.Instance(ctx, c.InstanceID)
if err != nil {
Expand Down

0 comments on commit e008a51

Please sign in to comment.