Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(migration): improve job progress logging in MigrateStateTree #329

Merged
merged 7 commits into from
Dec 3, 2024
20 changes: 9 additions & 11 deletions migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
}); err != nil {
return xerrors.Errorf("error iterating actors: %w", err)
}
log.Log(rt.INFO, "Done creating %d migration jobs after %v", jobCount, time.Since(startTime))
log.Log(rt.INFO, "Done creating %d migration jobs after %v", jobCount, time.Since(startTime).Round(100*time.Millisecond))
return nil
})

// Worker threads run jobs.
var workerWg sync.WaitGroup
for i := uint(0); i < cfg.MaxWorkers; i++ {
workerWg.Add(1)
workerId := i
grp.Go(func() error {
defer workerWg.Done()
for job := range jobCh {
Expand All @@ -91,7 +90,6 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c

atomic.AddUint32(&doneCount, 1)
}
log.Log(rt.INFO, "Worker %d done", workerId)
return nil
})
}
Expand All @@ -106,13 +104,13 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
for {
select {
case <-time.After(cfg.ProgressLogPeriod):
jobsNow := jobCount // Snapshot values to avoid incorrect-looking arithmetic if they change.
doneNow := doneCount
pendingNow := jobsNow - doneNow
jobsNow := atomic.LoadUint32(&jobCount)
doneNow := atomic.LoadUint32(&doneCount)
elapsed := time.Since(startTime)
rate := float64(doneNow) / elapsed.Seconds()
log.Log(rt.INFO, "%d jobs created, %d done, %d pending after %v (%.0f/s)",
jobsNow, doneNow, pendingNow, elapsed, rate)

log.Log(rt.INFO, "Performing migration: %d of %d jobs processed (%.0f/s) [%v elapsed]",
doneNow, jobsNow, rate, elapsed.Round(time.Second))
case <-workersFinished:
return
case <-ctx.Done():
Expand All @@ -127,7 +125,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
workerWg.Wait()
close(jobResultCh)
close(workersFinished)
log.Log(rt.INFO, "All workers done after %v", time.Since(startTime))
log.Log(rt.INFO, "All workers done after %v", time.Since(startTime).Round(100*time.Millisecond))
return nil
})

Expand All @@ -148,7 +146,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
}
resultCount++
}
log.Log(rt.INFO, "Result writer wrote %d results to state tree after %v", resultCount, time.Since(startTime))
log.Log(rt.INFO, "Result writer wrote %d results to state tree after %v", resultCount, time.Since(startTime).Round(100*time.Millisecond))
return nil
})

Expand Down Expand Up @@ -197,7 +195,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c

elapsed := time.Since(startTime)
rate := float64(doneCount) / elapsed.Seconds()
log.Log(rt.INFO, "All %d done after %v (%.0f/s)", doneCount, elapsed, rate)
log.Log(rt.INFO, "All %d done after %v (%.0f/s)", doneCount, elapsed.Round(100*time.Millisecond), rate)

return actorsOut, nil
}
Loading