Skip to content

Commit

Permalink
fix(migration): improve job progress logging (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
virajbhartiya authored Dec 3, 2024
1 parent 111b58c commit 32f613e
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
}); err != nil {
return xerrors.Errorf("error iterating actors: %w", err)
}
log.Log(rt.INFO, "Done creating %d migration jobs after %v", jobCount, time.Since(startTime))
log.Log(rt.INFO, "Done creating %d migration jobs after %v", jobCount, time.Since(startTime).Round(100*time.Millisecond))
return nil
})

// Worker threads run jobs.
var workerWg sync.WaitGroup
for i := uint(0); i < cfg.MaxWorkers; i++ {
workerWg.Add(1)
workerId := i
grp.Go(func() error {
defer workerWg.Done()
for job := range jobCh {
Expand All @@ -91,7 +90,6 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c

atomic.AddUint32(&doneCount, 1)
}
log.Log(rt.INFO, "Worker %d done", workerId)
return nil
})
}
Expand All @@ -106,13 +104,13 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
for {
select {
case <-time.After(cfg.ProgressLogPeriod):
jobsNow := jobCount // Snapshot values to avoid incorrect-looking arithmetic if they change.
doneNow := doneCount
pendingNow := jobsNow - doneNow
jobsNow := atomic.LoadUint32(&jobCount)
doneNow := atomic.LoadUint32(&doneCount)
elapsed := time.Since(startTime)
rate := float64(doneNow) / elapsed.Seconds()
log.Log(rt.INFO, "%d jobs created, %d done, %d pending after %v (%.0f/s)",
jobsNow, doneNow, pendingNow, elapsed, rate)

log.Log(rt.INFO, "Performing migration: %d of %d jobs processed (%.0f/s) [%v elapsed]",
doneNow, jobsNow, rate, elapsed.Round(time.Second))
case <-workersFinished:
return
case <-ctx.Done():
Expand All @@ -127,7 +125,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
workerWg.Wait()
close(jobResultCh)
close(workersFinished)
log.Log(rt.INFO, "All workers done after %v", time.Since(startTime))
log.Log(rt.INFO, "All workers done after %v", time.Since(startTime).Round(100*time.Millisecond))
return nil
})

Expand All @@ -148,7 +146,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c
}
resultCount++
}
log.Log(rt.INFO, "Result writer wrote %d results to state tree after %v", resultCount, time.Since(startTime))
log.Log(rt.INFO, "Result writer wrote %d results to state tree after %v", resultCount, time.Since(startTime).Round(100*time.Millisecond))
return nil
})

Expand Down Expand Up @@ -197,7 +195,7 @@ func RunMigration(ctx context.Context, cfg Config, cache MigrationCache, store c

elapsed := time.Since(startTime)
rate := float64(doneCount) / elapsed.Seconds()
log.Log(rt.INFO, "All %d done after %v (%.0f/s)", doneCount, elapsed, rate)
log.Log(rt.INFO, "All %d done after %v (%.0f/s)", doneCount, elapsed.Round(100*time.Millisecond), rate)

return actorsOut, nil
}

0 comments on commit 32f613e

Please sign in to comment.