Skip to content

Commit

Permalink
migrate: Actually stop waiting when job complete
Browse files Browse the repository at this point in the history
Co-authored-by: Trong Nguyen <trong.huu.nguyen@nav.no>
  • Loading branch information
mortenlj and tronghn committed Oct 8, 2024
1 parent bbc80fb commit 8e648e5
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions pkg/postgres/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (m *Migrator) getJobLogs(ctx context.Context, command Command, jobName stri
for scanner.Scan() {
logChannel <- scanner.Text()
}
close(logChannel)
}
}

Expand All @@ -179,46 +180,51 @@ func (m *Migrator) waitForJobCompletion(ctx context.Context, jobName string, com
logChannel := make(chan string)
go m.getJobLogs(ctx, command, jobName, logChannel)

l, err := m.waitForStartingMessage(logChannel)
startingMessage, err := m.waitForStartingMessage(logChannel)
if err != nil {
return err
}

multi := pterm.DefaultMultiPrinter
logOutput := pterm.DefaultLogger.WithMaxWidth(120).WithWriter(multi.NewWriter())
logOutput.Info(l.Msg)
progress, _ := pterm.DefaultProgressbar.WithTotal(l.MigrationStepsTotal).WithMaxWidth(120).WithWriter(multi.NewWriter()).Start()
logOutput.Info(startingMessage.Msg)
progress, _ := pterm.DefaultProgressbar.WithTotal(startingMessage.MigrationStepsTotal).WithMaxWidth(120).WithWriter(multi.NewWriter()).Start()
multi.Start()
defer multi.Stop()

if m.dryRun {
logOutput.Info(fmt.Sprintf("Dry run: Artificial waiting for job %s/%s to complete, 5 seconds\n", m.cfg.Namespace, jobName))
time.Sleep(5 * time.Second)
progress.Add(l.MigrationStepsTotal)
progress.Add(startingMessage.MigrationStepsTotal)
return nil
}

ctx, cancel := context.WithCancel(ctx)
eg := errgroup.Group{}
eg.Go(func() error {
return m.pollJobCompletion(ctx, jobName, command)
err = m.pollJobCompletion(ctx, jobName, command)
cancel()
return err
})
eg.Go(func() error {
defer cancel()
for line := range logChannel {
err = json.Unmarshal([]byte(line), &l)
le := logEntry{}
err = json.Unmarshal([]byte(line), &le)
if err != nil {
return err
}
if l.MigrationStep > 0 {
progress.Current = l.MigrationStep
progress.UpdateTitle(l.Msg)
if le.MigrationStep > 0 {
progress.Current = le.MigrationStep
progress.UpdateTitle(le.Msg)
} else {
switch strings.ToLower(l.Level) {
switch strings.ToLower(le.Level) {
case "error":
logOutput.Error(l.Msg)
logOutput.Error(le.Msg)
case "warn":
logOutput.Warn(l.Msg)
logOutput.Warn(le.Msg)
default:
logOutput.Info(l.Msg)
logOutput.Info(le.Msg)
}
}
}
Expand Down

0 comments on commit 8e648e5

Please sign in to comment.