Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ramping down of users #1502

Merged
merged 6 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 40 additions & 25 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,21 @@ def hatch():
logger.debug("%i users hatched" % len(self.user_greenlets))
if bucket:
gevent.sleep(sleep_time)

hatch()
if wait:
self.user_greenlets.join()
logger.info("All users stopped\n")

def stop_users(self, user_count):
def stop_users(self, user_count, stop_rate=None):
"""
Stop a stop_count of weighted users from the Group() object in self.users
Stop `user_count` weighted users at a rate of `stop_rate`
"""
if user_count == 0 or stop_rate == 0:
return

bucket = self.weight_users(user_count)
user_count = len(bucket)
logger.info("Stopping %i users" % user_count)
to_stop = []
for g in self.user_greenlets:
for l in bucket:
Expand All @@ -189,25 +191,38 @@ def stop_users(self, user_count):
to_stop.append(user)
bucket.remove(l)
break
self.stop_user_instances(to_stop)
self.environment.events.hatch_complete.fire(user_count=self.user_count)


def stop_user_instances(self, users):
if self.environment.stop_timeout:
stopping = Group()
for user in users:
if not user.stop(self.user_greenlets, force=False):

if not to_stop:
return

if stop_rate == None or stop_rate >= user_count:
sleep_time = 0
logger.info("Stopping %i users immediately" % (user_count))
else:
sleep_time = 1.0 / stop_rate
logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))

while True:
user_to_stop = to_stop.pop(random.randint(0, len(to_stop)-1))
logger.debug('Stopping %s' % user_to_stop._greenlet.name)
if self.environment.stop_timeout:
stop_group = Group()
if not user_to_stop.stop(self.user_greenlets, force=False):
# User.stop() returns False if the greenlet was not stopped, so we'll need
# to add it's greenlet to our stopping Group so we can wait for it to finish it's task
stopping.add(user._greenlet)
if not stopping.join(timeout=self.environment.stop_timeout):
logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout)
stopping.kill(block=True)
else:
for user in users:
user.stop(self.user_greenlets, force=True)

stop_group.add(user_to_stop._greenlet)
if not stop_group.join(timeout=self.environment.stop_timeout):
Copy link
Collaborator

@cyberw cyberw Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout will potentially prolong the iteration time for this while loop.

So if you had, for example, stop_rate 1 and stop_timeout 2, each iteration would take 3 seconds to complete if no user stops within the timeout (giving you a stop rate of 1/3 users per second).

I don't think that is what our users would expect.

Ideally these things should be done async (I guess you'll have to spawn "killer greenlets" or something)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I see what you mean.

What if we just do stop_group = Group() outside the loop and then stop_group.kill(block=True) after the loop is done?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! Can you add a test for it? I think it is one of those things that could trip us up :) (now or in the future)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can but what exactly is the test? Ramping down with stop_timeout set? How does the test fail? If ramping down doesn't happen? Or it happens too slowly?

If I understand correctly, if stop_timeout is set, we can't really guarantee any ramp down rate accurately, but could do best effort.

logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout)
stop_group.kill(block=True)
else:
user_to_stop.stop(self.user_greenlets, force=True)
if to_stop:
gevent.sleep(sleep_time)
else:
logger.info("%i Users have been stopped" % user_count)
break


def monitor_cpu(self):
process = psutil.Process()
while True:
Expand All @@ -234,13 +249,13 @@ def start(self, user_count, hatch_rate, wait=False):
self.worker_cpu_warning_emitted = False
self.target_user_count = user_count

# Dynamically changing the user count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
logger.debug("Updating running test with %d users, %.2f hatch rate and wait=%r" % (user_count, hatch_rate, wait))
self.state = STATE_HATCHING
if self.user_count > user_count:
# Stop some users
stop_count = self.user_count - user_count
self.stop_users(stop_count)
self.stop_users(stop_count, hatch_rate)
elif self.user_count < user_count:
# Spawn some users
spawn_count = user_count - self.user_count
Expand Down Expand Up @@ -284,10 +299,10 @@ def stop(self):
# if we are currently hatching users we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
self.stop_user_instances([g.args[0] for g in self.user_greenlets])
self.stop_users(self.user_count)
self.state = STATE_STOPPED
self.cpu_log_warning()

def quit(self):
"""
Stop any running load test and kill all greenlets for the runner
Expand Down
31 changes: 30 additions & 1 deletion locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,35 @@ def trigger(self):
finally:
timeout.cancel()

def test_stop_users_with_hatch_rate(self):
    """Check that lowering the user count of a running test ramps users down.

    A test is started with 10 users and then updated to 2 users with a
    hatch rate of 1. One second later the number of user greenlets must
    lie strictly between 2 and 10, and after a further five seconds it
    must be exactly 2.
    """
    class MyUser(User):
        wait_time = constant(1)

        @task
        def my_task(self):
            pass

    environment = Environment(user_classes=[MyUser])
    runner = LocalRunner(environment)

    # Start a new load test
    runner.start(10, 10)
    sleep(0.5)

    # Update the running test with fewer users and a slow hatch_rate
    runner.start(2, 1)

    # Wait a moment and then ensure the user count has started to drop but
    # not immediately to user_count
    sleep(1)
    user_count = len(runner.user_greenlets)
    # Comparison-specific assertions report both operands on failure,
    # unlike assertTrue(a > b) which only reports "False is not true".
    self.assertGreater(user_count, 2, "User count has decreased too quickly: %i" % user_count)
    self.assertLess(user_count, 10, "User count has not decreased at all: %i" % user_count)

    # Wait and ensure load test users eventually dropped to desired count
    sleep(5)
    user_count = len(runner.user_greenlets)
    self.assertEqual(user_count, 2, "User count has not decreased correctly to 2, it is : %i" % user_count)


class TestMasterWorkerRunners(LocustTestCase):
def test_distributed_integration_run(self):
Expand Down Expand Up @@ -784,7 +813,7 @@ def test_spawn_fewer_locusts_than_workers(self):
num_users += msg.data["num_users"]

self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2")

def test_spawn_locusts_in_stepload_mode(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
Expand Down