Skip to content

Commit

Permalink
Merge pull request #2038 from k6io/arrivalRateFixes
Browse files Browse the repository at this point in the history
Arrival rate fixes
  • Loading branch information
mstoykov committed May 25, 2021
2 parents 5602c42 + 5df0e6b commit 632774a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 23 deletions.
31 changes: 9 additions & 22 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,25 +231,29 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)

vusPool := newActiveVUPool()
defer func() {
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// below to deactivate them.
<-returnedVUs
// first close the vusPool so we wait for the gracefulShutdown
vusPool.Close()
cancel()
activeVUsWg.Wait()
}()
activeVUs := make(chan lib.ActiveVU, maxVUs)
activeVUsCount := uint64(0)

returnVU := func(u lib.InitializedVU) {
car.executionState.ReturnVU(u, true)
activeVUsWg.Done()
}
runIterationBasic := getIterationRunner(car.executionState, car.logger)
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(getVUActivationParams(maxDurationCtx, car.config.BaseConfig, returnVU))
car.executionState.ModCurrentlyActiveVUsCount(+1)
atomic.AddUint64(&activeVUsCount, 1)
vusPool.AddVU(maxDurationCtx, activeVU, runIterationBasic)
return activeVU
}

Expand All @@ -258,13 +262,6 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
defer close(makeUnplannedVUCh)
go func() {
defer close(returnedVUs)
defer func() {
// this is done here as to not have an unplannedVU in the middle of initialization when
// starting to return activeVUs
for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
<-activeVUs
}
}()
for range makeUnplannedVUCh {
car.logger.Debug("Starting initialization of an unplanned VU...")
initVU, err := car.executionState.GetUnplannedVU(maxDurationCtx, car.logger)
Expand All @@ -273,7 +270,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
car.logger.WithError(err).Error("Error while allocating unplanned VU")
} else {
car.logger.Debug("The unplanned VU finished initializing successfully!")
activeVUs <- activateVU(initVU)
activateVU(initVU)
}
}
}()
Expand All @@ -284,7 +281,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
if err != nil {
return err
}
activeVUs <- activateVU(initVU)
activateVU(initVU)
}

vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
Expand All @@ -293,9 +290,8 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
vusInBuffer := uint64(len(activeVUs))
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
currActiveVUs-vusInBuffer, currActiveVUs)
vusPool.Running(), currActiveVUs)

right := []string{progVUs, duration.String(), progIters}

Expand All @@ -312,12 +308,6 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
car.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progressFn)

runIterationBasic := getIterationRunner(car.executionState, car.logger)
runIteration := func(vu lib.ActiveVU) {
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
}

start, offsets, _ := car.et.GetStripedOffsets()
timer := time.NewTimer(time.Hour * 24)
// here the we need the not scaled one
Expand All @@ -335,11 +325,8 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
timer.Reset(t)
select {
case <-timer.C:
select {
case vu := <-activeVUs: // ideally, we get the VU from the buffer without any issues
go runIteration(vu) //TODO: refactor so we dont spin up a goroutine for each iteration
if vusPool.TryRunIteration() {
continue
default: // no free VUs currently available
}

// Since there aren't any free VUs available, consider this iteration
Expand Down
6 changes: 5 additions & 1 deletion lib/executor/ramping_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,11 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
vusPool := newActiveVUPool()

defer func() {
vusPool.Close()
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// below to deactivate them.
<-returnedVUs
// first close the vusPool so we wait for the gracefulShutdown
vusPool.Close()
cancel()
activeVUsWg.Wait()
}()
Expand Down Expand Up @@ -517,15 +518,18 @@ func (p *activeVUPool) Running() uint64 {
// When a new request is accepted the runfn function is executed.
func (p *activeVUPool) AddVU(ctx context.Context, avu lib.ActiveVU, runfn func(context.Context, lib.ActiveVU) bool) {
p.wg.Add(1)
ch := make(chan struct{})
go func() {
defer p.wg.Done()

close(ch)
for range p.iterations {
atomic.AddUint64(&p.running, uint64(1))
runfn(ctx, avu)
atomic.AddUint64(&p.running, ^uint64(0))
}
}()
<-ch
}

// Close stops the pool from accepting requests
Expand Down

0 comments on commit 632774a

Please sign in to comment.