Skip to content

Commit

Permalink
Make certain no unitilized VU will be left unreturned
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Jun 1, 2020
1 parent dc6bb35 commit 5705785
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 34 deletions.
6 changes: 3 additions & 3 deletions lib/executor/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func setupExecutor(t *testing.T, config lib.ExecutorConfig, es *lib.ExecutionSta
et, err := lib.NewExecutionTuple(es.Options.ExecutionSegment, es.Options.ExecutionSegmentSequence)
require.NoError(t, err)

maxVUs := lib.GetMaxPossibleVUs(config.GetExecutionRequirements(et))
initializeVUs(ctx, t, logEntry, es, maxVUs)
maxPlannedVUs := lib.GetMaxPlannedVUs(config.GetExecutionRequirements(et))
initializeVUs(ctx, t, logEntry, es, maxPlannedVUs)

executor, err := config.NewExecutor(es, logEntry)
require.NoError(t, err)
Expand All @@ -79,6 +79,6 @@ func initializeVUs(
for i := uint64(0); i < number; i++ {
vu, err := es.InitializeNewVU(ctx, logEntry)
require.NoError(t, err)
es.AddInitializedVU(vu)
es.ReturnVU(vu, false)
}
}
56 changes: 31 additions & 25 deletions lib/executor/variable_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,21 +299,20 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
}).Debug("Starting executor run...")

activeVUsWg := &sync.WaitGroup{}
defer activeVUsWg.Wait()

returnedVUs := make(chan struct{})
startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
defer cancel()

// Pre-allocate the VUs local shared buffer
activeVUs := make(chan lib.ActiveVU, maxVUs)
activeVUsCount := uint64(0)
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// above to deactivate them.

defer func() {
for i := uint64(0); i < activeVUsCount; i++ {
<-activeVUs
}
// Make sure all VUs aren't executing iterations anymore, for the cancel()
// above to deactivate them.
<-returnedVUs
cancel()
activeVUsWg.Wait()
}()
activeVUs := make(chan lib.ActiveVU, maxVUs)
activeVUsCount := uint64(0)

activationParams := getVUActivationParams(maxDurationCtx, varr.config.BaseConfig,
func(u lib.InitializedVU) {
Expand All @@ -328,6 +327,28 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
return activeVU
}

remainingUnplannedVUs := maxVUs - preAllocatedVUs
makeUnplannedVUCh := make(chan struct{})
defer close(makeUnplannedVUCh)
go func() {
defer close(returnedVUs)
defer func() {
// this is done here as to not have an unplannedVU in the middle of initialization when
// starting to return activeVUs
for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
<-activeVUs
}
}()
for range makeUnplannedVUCh {
initVU, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
if err != nil {
// TODO figure out how to return it to the Run goroutine
varr.logger.WithError(err).Error("Error while allocating unplanned VU")
}
activeVUs <- activateVU(initVU)
}
}()

// Get the pre-allocated VUs in the local buffer
for i := int64(0); i < preAllocatedVUs; i++ {
initVU, err := varr.executionState.GetPlannedVU(varr.logger, false)
Expand Down Expand Up @@ -379,21 +400,6 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
activeVUs <- vu
}

remainingUnplannedVUs := maxVUs - preAllocatedVUs
makeUnplannedVUCh := make(chan struct{})
defer close(makeUnplannedVUCh)

go func() {
for range makeUnplannedVUCh {
initVU, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
if err != nil {
// TODO figure out how to return it to the Run goroutine
varr.logger.WithError(err).Error("Error while allocating unplanned VU")
}
activeVUs <- activateVU(initVU)
}
}()

timer := time.NewTimer(time.Hour)
start := time.Now()
ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
Expand Down
63 changes: 57 additions & 6 deletions lib/executor/variable_arrival_rate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func TestVariableArrivalRateRunUnplannedVUs(t *testing.T) {
TimeUnit: types.NullDurationFrom(time.Second),
Stages: []Stage{
{
Duration: types.NullDurationFrom(time.Second * 2),
// the minus one makes it so only 9 iterations will be started instead of 10
// as the 10th happens to be just at the end and sometimes doesn't get executed :(
Duration: types.NullDurationFrom(time.Second*2 - 1),
Target: null.IntFrom(10),
},
},
Expand Down Expand Up @@ -187,6 +189,55 @@ func TestVariableArrivalRateRunUnplannedVUs(t *testing.T) {
assert.Equal(t, count, int64(9))
}

func TestVariableArrivalRateUnplannedVUsDontGetLeftBehind(t *testing.T) {
t.Parallel()
et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 1, 2)
var count int64
var ch = make(chan struct{}) // closed when new unplannedVU is started and signal to get to next iterations
runner := simpleRunner(func(ctx context.Context) error {
cur := atomic.AddInt64(&count, 1)
if cur == 1 {
<-ch // wait to start again
}

return nil
})
var ctx, cancel, executor, logHook = setupExecutor(
t, VariableArrivalRateConfig{
TimeUnit: types.NullDurationFrom(time.Second),
Stages: []Stage{
{
Duration: types.NullDurationFrom(time.Second * 2),
Target: null.IntFrom(10),
},
},
PreAllocatedVUs: null.IntFrom(1),
MaxVUs: null.IntFrom(3),
},
es, runner)
defer cancel()
var engineOut = make(chan stats.SampleContainer, 1000)
es.SetInitVUFunc(func(_ context.Context, logger *logrus.Entry) (lib.InitializedVU, error) {
t.Log("init")
cur := atomic.LoadInt64(&count)
require.Equal(t, cur, int64(1))
time.Sleep(time.Millisecond * 200)
close(ch)
time.Sleep(time.Millisecond * 20)
cur = atomic.LoadInt64(&count)
require.NotEqual(t, cur, int64(1))

return runner.NewVU(int64(es.GetUniqueVUIdentifier()), engineOut)
})
err = executor.Run(ctx, engineOut)
assert.NoError(t, err)
assert.Empty(t, logHook.Drain())
assert.Equal(t, int64(0), es.GetCurrentlyActiveVUsCount())
assert.Equal(t, int64(2), es.GetInitializedVUsCount())
}

func TestVariableArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
t.Parallel()
var count int64
Expand Down Expand Up @@ -214,8 +265,8 @@ func TestVariableArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
Target: null.IntFrom(0),
},
},
PreAllocatedVUs: null.IntFrom(10),
MaxVUs: null.IntFrom(20),
PreAllocatedVUs: null.IntFrom(1),
MaxVUs: null.IntFrom(2),
},
es,
simpleRunner(func(ctx context.Context) error {
Expand All @@ -236,9 +287,9 @@ func TestVariableArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
defer cancel()
engineOut := make(chan stats.SampleContainer, 1000)
err = executor.Run(ctx, engineOut)
require.NoError(t, err)
require.Equal(t, int64(len(expectedTimes)), count)
require.Empty(t, logHook.Drain())
assert.NoError(t, err)
assert.Equal(t, int64(len(expectedTimes)), count)
assert.Empty(t, logHook.Drain())
}

func mustNewExecutionTuple(seg *lib.ExecutionSegment, seq *lib.ExecutionSegmentSequence) *lib.ExecutionTuple {
Expand Down

0 comments on commit 5705785

Please sign in to comment.