diff --git a/core/sling/task_run.go b/core/sling/task_run.go index 8b7209d8..03e9b7b9 100644 --- a/core/sling/task_run.go +++ b/core/sling/task_run.go @@ -4,9 +4,11 @@ import ( "context" "net/http" "os" + "os/signal" "runtime" "runtime/debug" "strings" + "syscall" "time" _ "net/http/pprof" @@ -609,10 +611,29 @@ func (t *TaskExecution) createIntermediateConfig() *Config { } func (t *TaskExecution) runProtonToProton(srcConn, tgtConn database.Connection) (err error) { + if t.Config.Target.Type == dbio.TypeDbProton && t.Config.Mode == IncrementalMode { + existed, err := database.TableExists(tgtConn, t.Config.Target.Object) + if err != nil { + return g.Error(err, "could not check if final table exists in incremental mode") + } + if !existed { + return g.Error(err, "final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object) + } + } + start := time.Now() maxRetries := 3 retryDelay := time.Second * 5 + // Set up interrupt handling + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + defer signal.Stop(interruptChan) + + // Create a context that can be cancelled + ctx, cancel := context.WithCancel(t.Context.Ctx) + defer cancel() + // Step 1: Create intermediate config t.SetProgress("Creating intermediate configuration") intermediateConfig := t.createIntermediateConfig() @@ -620,14 +641,55 @@ func (t *TaskExecution) runProtonToProton(srcConn, tgtConn database.Connection) err = g.Error("Failed to create intermediate config") return } - defer os.Remove(intermediateConfig.Target.Object) // Clean up the temp file + + cleanup := func() { + if intermediateConfig != nil && intermediateConfig.Target.Object != "" { + os.Remove(intermediateConfig.Target.Object) + } + } + defer cleanup() + + // Function to handle interrupts + go func() { + select { + case <-interruptChan: + t.SetProgress("Interrupt received, cleaning up and exiting") + cancel() + cleanup() + os.Exit(1) + case <-ctx.Done(): + return + } + }() + + // Retry function with cleanup + retryWithCleanup := func(operation func() error) error { + for attempt := 1; attempt <= maxRetries; attempt++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + err := operation() + if err == nil { + return nil + } + if attempt == maxRetries { + return err + } + t.SetProgress("Attempt %d failed, retrying in %v", attempt, retryDelay) + cleanup() + time.Sleep(retryDelay) + } + } + return g.Error("Max retries reached") + } // Step 2: Run DbToFile with retries t.SetProgress("Exporting data from source Proton database") // t.PBar.Start() originalConfig := t.Config t.Config = intermediateConfig - err = retry(maxRetries, retryDelay, func() error { + err = retryWithCleanup(func() error { return t.runDbToFile() }) t.Config = originalConfig // Restore original config @@ -691,18 +753,14 @@ func (t *TaskExecution) runProtonToProton(srcConn, tgtConn database.Connection) } func retry(attempts int, sleep time.Duration, f func() error) (err error) { - for i := 0; ; i++ { - err = f() - if err == nil { - return + for i := 0; i < attempts; i++ { + if err = f(); err == nil { + return nil } - - if i >= (attempts - 1) { - break + if i < attempts-1 { + time.Sleep(sleep) + g.Debug("Retrying after error: %v", err) } - - time.Sleep(sleep) - g.Debug("Retrying after error: %v", err) } return err }