diff --git a/das/checkpoint.go b/das/checkpoint.go
index a38eca828c..bb023a19da 100644
--- a/das/checkpoint.go
+++ b/das/checkpoint.go
@@ -23,15 +23,15 @@ type workerCheckpoint struct {
 func newCheckpoint(stats SamplingStats) checkpoint {
 	workers := make([]workerCheckpoint, 0, len(stats.Workers))
 	for _, w := range stats.Workers {
-		// no need to store retry jobs, since they will resume from failed heights map
-		if w.JobType == retryJob {
-			continue
+		// no need to resume recent jobs after restart. On the other hand, retry jobs will resume from
+		// failed heights map. It leaves only catchup jobs to be stored and resumed
+		if w.JobType == catchupJob {
+			workers = append(workers, workerCheckpoint{
+				From:    w.Curr,
+				To:      w.To,
+				JobType: w.JobType,
+			})
 		}
-		workers = append(workers, workerCheckpoint{
-			From:    w.Curr,
-			To:      w.To,
-			JobType: w.JobType,
-		})
 	}
 	return checkpoint{
 		SampleFrom: stats.CatchupHead + 1,
diff --git a/das/coordinator.go b/das/coordinator.go
index 2184dce2c8..852a40d24d 100644
--- a/das/coordinator.go
+++ b/das/coordinator.go
@@ -81,7 +81,9 @@ func (sc *samplingCoordinator) run(ctx context.Context, cp checkpoint) {
 		select {
 		case head := <-sc.updHeadCh:
 			if sc.state.isNewHead(head.Height()) {
-				sc.runWorker(ctx, sc.state.recentJob(head))
+				if !sc.recentJobsLimitReached() {
+					sc.runWorker(ctx, sc.state.recentJob(head))
+				}
 				sc.state.updateHead(head.Height())
 				// run worker without concurrency limit restrictions to reduced delay
 				sc.metrics.observeNewHead(ctx)
@@ -146,3 +148,8 @@ func (sc *samplingCoordinator) getCheckpoint(ctx context.Context) (checkpoint, e
 func (sc *samplingCoordinator) concurrencyLimitReached() bool {
 	return len(sc.state.inProgress) >= sc.concurrencyLimit
 }
+
+// recentJobsLimitReached indicates whether concurrency limit for recent jobs has been reached
+func (sc *samplingCoordinator) recentJobsLimitReached() bool {
+	return len(sc.state.inProgress) >= 2*sc.concurrencyLimit
+}