Skip to content

Commit

Permalink
Split varriable arrival rate Run method ... not very well
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Jun 1, 2020
1 parent 5705785 commit 8c25045
Showing 1 changed file with 131 additions and 102 deletions.
233 changes: 131 additions & 102 deletions lib/executor/variable_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,95 +278,110 @@ func (varc VariableArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time
}

// Run executes a variable number of iterations per second.
func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { //nolint:funlen
func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
segment := varr.executionState.ExecutionTuple.Segment
gracefulStop := varr.config.GetGracefulStop()
duration := sumStagesDuration(varr.config.Stages)
preAllocatedVUs := varr.config.GetPreAllocatedVUs(varr.executionState.ExecutionTuple)
maxVUs := varr.config.GetMaxVUs(varr.executionState.ExecutionTuple)

vm := &vuManager{
executionState: varr.executionState,
logger: varr.logger,
maxVUs: maxVUs,
activeVUs: make(chan lib.ActiveVU, maxVUs),
activeVUsWg: &sync.WaitGroup{},
returnedVUs: make(chan struct{}),
makeUnplannedVUCh: make(chan struct{}),
runIterationBasic: getIterationRunner(varr.executionState, varr.logger),
remainingUnplannedVUs: maxVUs - preAllocatedVUs,
}

// TODO: refactor and simplify
timeUnit := time.Duration(varr.config.TimeUnit.Duration)
startArrivalRate := getScaledArrivalRate(segment, varr.config.StartRate.Int64, timeUnit)
maxUnscaledRate := getStagesUnscaledMaxTarget(varr.config.StartRate.Int64, varr.config.Stages)
maxArrivalRatePerSec, _ := getArrivalRatePerSec(getScaledArrivalRate(segment, maxUnscaledRate, timeUnit)).Float64()
startTickerPeriod := getTickerPeriod(startArrivalRate)

// Make sure the log and the progress bar have accurate information
varr.logger.WithFields(logrus.Fields{
"maxVUs": maxVUs, "preAllocatedVUs": preAllocatedVUs, "duration": duration, "numStages": len(varr.config.Stages),
"maxVUs": vm.maxVUs, "preAllocatedVUs": preAllocatedVUs, "duration": duration, "numStages": len(varr.config.Stages),
"startTickerPeriod": startTickerPeriod.Duration, "type": varr.config.GetType(),
}).Debug("Starting executor run...")

activeVUsWg := &sync.WaitGroup{}

returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
vm.ctx = maxDurationCtx
// Pre-allocate the VUs local shared buffer

defer func() {
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// above to deactivate them.
<-returnedVUs
close(vm.makeUnplannedVUCh)
<-vm.returnedVUs
cancel()
activeVUsWg.Wait()
vm.activeVUsWg.Wait()
}()
activeVUs := make(chan lib.ActiveVU, maxVUs)
activeVUsCount := uint64(0)

activationParams := getVUActivationParams(maxDurationCtx, varr.config.BaseConfig,
func(u lib.InitializedVU) {
varr.executionState.ReturnVU(u, true)
activeVUsWg.Done()
})
activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
activeVUsWg.Add(1)
activeVU := initVU.Activate(activationParams)
varr.executionState.ModCurrentlyActiveVUsCount(+1)
atomic.AddUint64(&activeVUsCount, 1)
return activeVU

go vm.unplannedVUMaking()
// Get the pre-allocated VUs in the local buffer
if err := vm.getPreallocatedVUs(preAllocatedVUs); err != nil {
return err
}

remainingUnplannedVUs := maxVUs - preAllocatedVUs
makeUnplannedVUCh := make(chan struct{})
defer close(makeUnplannedVUCh)
go func() {
defer close(returnedVUs)
defer func() {
// this is done here as to not have an unplannedVU in the middle of initialization when
// starting to return activeVUs
for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
<-activeVUs
}
}()
for range makeUnplannedVUCh {
initVU, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
if err != nil {
// TODO figure out how to return it to the Run goroutine
varr.logger.WithError(err).Error("Error while allocating unplanned VU")
tickerPeriod := int64(startTickerPeriod.Duration)
progressFn := varr.progressFn(vm, timeUnit, duration, startTime, &tickerPeriod)
varr.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progressFn)

timer := time.NewTimer(time.Hour)
start := time.Now()
ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
var prevTime time.Duration
go varr.config.cal(varr.executionState.ExecutionTuple, ch)
for nextTime := range ch {
atomic.StoreInt64(&tickerPeriod, int64(nextTime-prevTime))
prevTime = nextTime
b := time.Until(start.Add(nextTime))
if b > 0 { // TODO: have a minimal ?
timer.Reset(b)
select {
case <-timer.C:
case <-regDurationCtx.Done():
return nil
}
activeVUs <- activateVU(initVU)
}
}()

// Get the pre-allocated VUs in the local buffer
for i := int64(0); i < preAllocatedVUs; i++ {
initVU, err := varr.executionState.GetPlannedVU(varr.logger, false)
if err != nil {
return err
}
activeVUs <- activateVU(initVU)
vm.startIteration()
}
return nil
}

tickerPeriod := int64(startTickerPeriod.Duration)
type vuManager struct { // TODO: add a factory
executionState *lib.ExecutionState
ctx context.Context
activeVUsCount uint64
activeVUs chan lib.ActiveVU
activeVUsWg *sync.WaitGroup
makeUnplannedVUCh chan struct{}
returnedVUs chan struct{}
remainingUnplannedVUs int64
maxVUs int64
logger *logrus.Entry
runIterationBasic func(context.Context, lib.ActiveVU)
}

vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
itersFmt := pb.GetFixedLengthFloatFormat(maxArrivalRatePerSec, 0) + " iters/s"
func (varr VariableArrivalRate) progressFn(
vm *vuManager, timeUnit, duration time.Duration, startTime time.Time, tickerPeriod *int64,
) func() (float64, []string) {
segment := varr.executionState.ExecutionTuple.Segment

progresFn := func() (float64, []string) {
currActiveVUs := atomic.LoadUint64(&activeVUsCount)
currentTickerPeriod := atomic.LoadInt64(&tickerPeriod)
vusInBuffer := uint64(len(activeVUs))
maxUnscaledRate := getStagesUnscaledMaxTarget(varr.config.StartRate.Int64, varr.config.Stages)
maxArrivalRatePerSec, _ := getArrivalRatePerSec(getScaledArrivalRate(segment, maxUnscaledRate, timeUnit)).Float64()
vusFmt := pb.GetFixedLengthIntFormat(vm.maxVUs)
itersFmt := pb.GetFixedLengthFloatFormat(maxArrivalRatePerSec, 0) + " iters/s"
return func() (float64, []string) {
currActiveVUs := atomic.LoadUint64(&vm.activeVUsCount)
currentTickerPeriod := atomic.LoadInt64(tickerPeriod)
vusInBuffer := uint64(len(vm.activeVUs))
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs",
currActiveVUs-vusInBuffer, currActiveVUs)

Expand All @@ -389,61 +404,75 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample

return math.Min(1, float64(spent)/float64(duration)), right
}
}

varr.progress.Modify(pb.WithProgress(progresFn))
go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progresFn)

regDurationDone := regDurationCtx.Done()
runIterationBasic := getIterationRunner(varr.executionState, varr.logger)
runIteration := func(vu lib.ActiveVU) {
runIterationBasic(maxDurationCtx, vu)
activeVUs <- vu
}

timer := time.NewTimer(time.Hour)
start := time.Now()
ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
var prevTime time.Duration
go varr.config.cal(varr.executionState.ExecutionTuple, ch)
for nextTime := range ch {
select {
case <-regDurationDone:
return nil
default:
func (vm *vuManager) unplannedVUMaking() {
defer close(vm.returnedVUs)
defer func() {
// this is done here as to not have an unplannedVU in the middle of initialization when
// starting to return activeVUs
for i := uint64(0); i < atomic.LoadUint64(&vm.activeVUsCount); i++ {
<-vm.activeVUs
}
atomic.StoreInt64(&tickerPeriod, int64(nextTime-prevTime))
prevTime = nextTime
b := time.Until(start.Add(nextTime))
if b > 0 { // TODO: have a minimal ?
timer.Reset(b)
select {
case <-timer.C:
case <-regDurationDone:
return nil
}
}()
for range vm.makeUnplannedVUCh {
initVU, err := vm.executionState.GetUnplannedVU(vm.ctx, vm.logger)
if err != nil {
// TODO figure out how to return it to the Run goroutine
vm.logger.WithError(err).Error("Error while allocating unplanned VU")
}
vm.activeVUs <- vm.activateVU(initVU)
}
}

var vu lib.ActiveVU
select {
case vu = <-activeVUs:
// ideally, we get the VU from the buffer without any issues
default:
if remainingUnplannedVUs == 0 {
// TODO: emit an error metric?
varr.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs)
continue
}

func (vm *vuManager) startIteration() {
var vu lib.ActiveVU
select {
case vu = <-vm.activeVUs:
// ideally, we get the VU from the buffer without any issues
default:
if vm.remainingUnplannedVUs == 0 {
//TODO: emit an error metric?
vm.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", vm.maxVUs)
vu = <-vm.activeVUs // we just block while waiting to get a free vu
} else {
select {
case makeUnplannedVUCh <- struct{}{}:
case vm.makeUnplannedVUCh <- struct{}{}:
// this is the only goroutine that touches remainingUnplannedVUs and if we didn't
// send on the channel no new unplannedVU will be stared so no need to decrease it
remainingUnplannedVUs--
vu = <-activeVUs // just get any VU that gets activated, whether it is the unplanned or not doesn't matter
case vu = <-activeVUs: // a VU got freed while were waiting to start a new unplanned one
vm.remainingUnplannedVUs--
vu = <-vm.activeVUs // just get any VU that gets activated, whether it is the unplanned or not doesn't matter
case vu = <-vm.activeVUs: // a VU got freed while were waiting to start a new unplanned one
}
}
go runIteration(vu)
}
go func(vu lib.ActiveVU) {
vm.runIterationBasic(vm.ctx, vu)
vm.activeVUs <- vu
}(vu)
}

func (vm *vuManager) getPreallocatedVUs(preAllocatedVUs int64) error {
for i := int64(0); i < preAllocatedVUs; i++ {
initVU, err := vm.executionState.GetPlannedVU(vm.logger, false)
if err != nil {
return err
}
vm.activeVUs <- vm.activateVU(initVU)
}
return nil
}

func (vm *vuManager) activateVU(initVU lib.InitializedVU) lib.ActiveVU {
vm.activeVUsWg.Add(1)
activeVU := initVU.Activate(&lib.VUActivationParams{
RunContext: vm.ctx,
DeactivateCallback: func(initVU lib.InitializedVU) {
vm.executionState.ReturnVU(initVU, true)
vm.activeVUsWg.Done()
},
})
vm.executionState.ModCurrentlyActiveVUsCount(+1)
atomic.AddUint64(&vm.activeVUsCount, 1)
return activeVU
}

0 comments on commit 8c25045

Please sign in to comment.