Arrival rate fixes #2038

Merged · 3 commits · May 25, 2021
31 changes: 9 additions & 22 deletions lib/executor/constant_arrival_rate.go
@@ -231,25 +231,29 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)

vusPool := newActiveVUPool()
defer func() {
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// below to deactivate them.
<-returnedVUs
// first close the vusPool so we wait for the gracefulShutdown
vusPool.Close()
cancel()
activeVUsWg.Wait()
}()
activeVUs := make(chan lib.ActiveVU, maxVUs)
activeVUsCount := uint64(0)

returnVU := func(u lib.InitializedVU) {
car.executionState.ReturnVU(u, true)
activeVUsWg.Done()
}
runIterationBasic := getIterationRunner(car.executionState, car.logger)
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(getVUActivationParams(maxDurationCtx, car.config.BaseConfig, returnVU))
car.executionState.ModCurrentlyActiveVUsCount(+1)
atomic.AddUint64(&activeVUsCount, 1)
vusPool.AddVU(maxDurationCtx, activeVU, runIterationBasic)
return activeVU
}

@@ -258,13 +262,6 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
defer close(makeUnplannedVUCh)
go func() {
defer close(returnedVUs)
defer func() {
// this is done here as to not have an unplannedVU in the middle of initialization when
// starting to return activeVUs
for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
<-activeVUs
}
}()
for range makeUnplannedVUCh {
car.logger.Debug("Starting initialization of an unplanned VU...")
initVU, err := car.executionState.GetUnplannedVU(maxDurationCtx, car.logger)
@@ -273,7 +270,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
car.logger.WithError(err).Error("Error while allocating unplanned VU")
} else {
car.logger.Debug("The unplanned VU finished initializing successfully!")
activeVUs <- activateVU(initVU)
activateVU(initVU)
}
}
}()
@@ -284,7 +281,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
if err != nil {
return err
}
activeVUs <- activateVU(initVU)
activateVU(initVU)
}

vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
@@ -293,9 +290,8 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
vusInBuffer := uint64(len(activeVUs))
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
currActiveVUs-vusInBuffer, currActiveVUs)
vusPool.Running(), currActiveVUs)

right := []string{progVUs, duration.String(), progIters}

@@ -312,12 +308,6 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
car.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, &car, progressFn)

runIterationBasic := getIterationRunner(car.executionState, car.logger)
runIteration := func(vu lib.ActiveVU) {
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
}

start, offsets, _ := car.et.GetStripedOffsets()
timer := time.NewTimer(time.Hour * 24)
// here the we need the not scaled one
@@ -335,11 +325,8 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
timer.Reset(t)
select {
case <-timer.C:
select {
case vu := <-activeVUs: // ideally, we get the VU from the buffer without any issues
go runIteration(vu) //TODO: refactor so we dont spin up a goroutine for each iteration
if vusPool.TryRunIteration() {
continue
default: // no free VUs currently available
}

// Since there aren't any free VUs available, consider this iteration
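The hunk above replaces the buffered activeVUs channel with a call to vusPool.TryRunIteration(), whose body is not part of the visible diff. As a reading aid only, here is a plausible minimal sketch of such a method, assuming the pool dispatches work to the AddVU workers (shown further down) through its internal iterations channel; the actual k6 implementation may differ:

```go
package executor

import "sync"

// Assumed sketch of the pool type; the field names mirror what the diff
// uses (p.iterations, p.running, p.wg), but this is not the exact k6 code.
type activeVUPool struct {
	iterations chan struct{}
	running    uint64
	wg         sync.WaitGroup
}

// TryRunIteration hands one iteration to an idle worker, if any is waiting.
// A non-blocking send means a fully busy pool reports false immediately
// instead of stalling the arrival-rate scheduling loop.
func (p *activeVUPool) TryRunIteration() bool {
	select {
	case p.iterations <- struct{}{}: // an idle worker is blocked in `for range p.iterations`
		return true
	default: // every worker is currently running an iteration
		return false
	}
}
```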
6 changes: 5 additions & 1 deletion lib/executor/ramping_arrival_rate.go
@@ -336,10 +336,11 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- stats.S
vusPool := newActiveVUPool()

defer func() {
vusPool.Close()
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// below to deactivate them.
<-returnedVUs
// first close the vusPool so we wait for the gracefulShutdown
vusPool.Close()
cancel()
activeVUsWg.Wait()
}()
@@ -517,15 +518,18 @@ func (p *activeVUPool) Running() uint64 {
// When a new request is accepted the runfn function is executed.
func (p *activeVUPool) AddVU(ctx context.Context, avu lib.ActiveVU, runfn func(context.Context, lib.ActiveVU) bool) {
p.wg.Add(1)
ch := make(chan struct{})
go func() {
defer p.wg.Done()

close(ch)
for range p.iterations {
atomic.AddUint64(&p.running, uint64(1))
runfn(ctx, avu)
atomic.AddUint64(&p.running, ^uint64(0))
}
}()
<-ch
Member:
I am confused by this 😕 I read your explanation in 5df0e6b, but if this extra bit of waiting helps all that much, then we probably have a bigger conceptual problem, since this is not very different from sleep(time.Millisecond). Can you point me to what exactly failed when this wasn't present?

Collaborator (author):
This is basically waiting for the goroutine to have started, and only then does AddVU report that it has added a VU.

What fails are the usual TestConstantArrivalRateRunCorrectTiming tests, as shown in https://github.com/k6io/k6/runs/2640090549?check_suite_focus=true, which at least in my testing happens because we:

  1. add all the VUs,
  2. try to start an iteration,
  3. and none of the VUs has reached the for range p.iterations line yet, as its goroutine probably hasn't even been scheduled.

This is particularly easy to reproduce with db89d213 and go test -count 1 -cpu 1 -race -run TestConstantArrivalRateRunCorrectTiming, specifically because of the -cpu 1; with -cpu 8 I can't reproduce it in practice, probably because there is always a free slot for that goroutine to start immediately, or at least fast enough.

This happens because we no longer have a "semaphore" (the activeVUs channel) but workers instead. The semaphore was initialized synchronously, while the workers are asynchronous for the most part: we just say "start this goroutine", and with this change we at least wait for the "start the goroutine" part instead of hoping the Go scheduler will run it fast enough. Which is totally different from what time.Sleep(time....) would do.
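As an illustration of the difference described above (a standalone sketch, not k6 code; all names here are made up), the old buffered-channel "semaphore" is ready the moment it is filled, while a worker started with a go statement may not yet be parked on its channel when the first non-blocking dispatch is attempted:

```go
package main

import "fmt"

func main() {
	// Old style: a buffered channel of ready VUs, filled synchronously
	// before the run loop. A non-blocking receive on the first tick is
	// guaranteed to find one.
	type vu struct{}
	activeVUs := make(chan vu, 2)
	activeVUs <- vu{}
	activeVUs <- vu{}
	select {
	case <-activeVUs:
		fmt.Println("semaphore: got a VU on the first try")
	default:
		fmt.Println("semaphore: unreachable here")
	}

	// New style: a worker goroutine pulling work from a channel. The go
	// statement only schedules it; nothing guarantees it has reached the
	// receive before the first non-blocking send below.
	iterations := make(chan struct{})
	go func() {
		for range iterations {
		}
	}()
	select {
	case iterations <- struct{}{}:
		fmt.Println("worker pool: iteration dispatched")
	default:
		fmt.Println("worker pool: no worker ready yet (the race being discussed)")
	}
}
```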

Member:
My point was that if the CPU contention was so heavy, it's possible that a goroutine wouldn't be allowed to run for a long while, even after it was started, so it might not get to run the for loop after it closes the channel 🤷‍♂️

I get how this is different from a Sleep, but as far as I understand, it's not actually locking anything that raced before... So if it failed before, this doesn't guarantee it will work afterwards? I guess it would just greatly reduce the chances of a test failing, which might be good enough, though a better solution would be to improve the test, with #1986 or something else...

So yeah, I don't mind this code, I just didn't understand what it solved.

Collaborator (author):
The go statement explicitly doesn't wait for the goroutine to start. So with -cpu 1 (which is effectively what the Windows tests do) there is one CPU that can execute one goroutine, and it is already busy executing the code that initializes the VUs and calls AddVU (which starts a goroutine with a go statement). It then goes on to "begin an iteration", and nothing guarantees that the executing goroutine has yielded between calling AddVU and that moment, so none of AddVU's goroutines may have had any CPU to run on at all, at which point the select with a default just takes the default case. This is also not guaranteed if we sleep for an hour or something like that (although I would expect it to happen ;) ).

In the same way, nothing currently guarantees that after calling close(ch) the goroutine will actually get to range p.iterations without yielding; it just becomes a lot more likely. (To be honest, as far as I know close(ch) should not yield, so we should always get to range p.iterations, although I have no proof, so 🤷; but IMO it makes more sense to execute until you need to block instead of yielding whenever.) So much more likely that I haven't had a failure of this kind at all after these changes, and even parallelizing the tests has been more stable (having run them 5 times).

On the other hand, the change to not start a goroutine on each iteration makes it less likely that any one of those goroutine startups will be a bit slow (because of GC or just CPU contention) and fail to execute an iteration within 15ms of the time it needs to, which was the original issue in #1715, as opposed to the one currently being fixed, where we just miss the first iteration.

While I think #1986 will make this even less likely to fail in the test, it will not fix the underlying problem that this patch tries to fix for a production run, as there will still be nothing to guarantee the goroutine is waiting on the channel to receive the "start an iteration" event. But it will mask it ;). Also, to be honest, given the contention on time.* calls that I see in every profile I take, I expect that adding an interface call in the middle will have .... detrimental outcomes for performance, but that will need to be checked whenever someone finds the time to rewrite the code to use it.
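To make the scheduling argument concrete, here is a tiny standalone example (again, not k6 code) of the difference between merely issuing a go statement and adding the close(ch)/<-ch handshake from this PR; with GOMAXPROCS(1) the first dispatch usually misses, while after the handshake the worker has at least started (though, as noted above, that still isn't a hard guarantee it has reached the receive):

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1) // roughly mimic `go test -cpu 1`

	// No handshake: the go statement only makes the worker runnable.
	// On a single P it usually hasn't run yet, so the default case wins.
	work := make(chan struct{})
	go func() {
		for range work {
		}
	}()
	select {
	case work <- struct{}{}:
		fmt.Println("no handshake: worker was already waiting")
	default:
		fmt.Println("no handshake: worker not ready yet")
	}

	// Handshake: block until the worker goroutine has started executing,
	// which is what AddVU's close(ch) / <-ch pair does. On a single P the
	// worker then typically keeps running until it blocks on the receive.
	started := make(chan struct{})
	work2 := make(chan struct{})
	go func() {
		close(started)
		for range work2 {
		}
	}()
	<-started
	select {
	case work2 <- struct{}{}:
		fmt.Println("handshake: worker picked up the iteration")
	default:
		fmt.Println("handshake: worker started but not yet at the receive")
	}
}
```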

}

// Close stops the pool from accepting requests