Enable statistically meaningful constant RPS load generation distributions #1281

Closed
wants to merge 9 commits
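The diff below adds two new wait_time helpers, constant_uniform and poisson, and exports them from locust/__init__.py. A minimal locustfile sketch of how they would be used (the class names, host and endpoint are made up for illustration, following the usage shown in the new docstrings):

from locust import HttpLocust, TaskSet, task, constant_uniform, poisson

class ConstantRateUser(HttpLocust):
    host = "http://localhost:8080"        # placeholder host
    wait_time = constant_uniform(1)       # aim for one task per second per user, evenly spaced
    class task_set(TaskSet):
        @task
        def index(self):
            self.client.get("/")

class PoissonRateUser(HttpLocust):
    host = "http://localhost:8080"        # placeholder host
    wait_time = poisson(1)                # exponentially distributed inter-arrival times
    class task_set(TaskSet):
        @task
        def index(self):
            self.client.get("/")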
2 changes: 1 addition & 1 deletion locust/__init__.py
@@ -1,6 +1,6 @@
from .core import HttpLocust, Locust, TaskSet, TaskSequence, task, seq_task
from .exception import InterruptTaskSet, ResponseError, RescheduleTaskImmediately
from .wait_time import between, constant, constant_pacing
from .wait_time import between, constant, constant_pacing, poisson, constant_uniform
from .event import Events
events = Events()

3 changes: 3 additions & 0 deletions locust/core.py
@@ -505,6 +505,9 @@ class ForumPage(TaskSet):
tasks = {ThreadPage:15, write_post:1}
"""

id = 0
"""ID of this locust object used for spacing out tasks in time."""

weight = 10
"""Probability of locust being chosen. The higher the weight, the greater is the chance of it being chosen."""

31 changes: 31 additions & 0 deletions locust/runners.py
@@ -128,19 +128,24 @@ def spawn_locusts(self, spawn_count, hatch_rate, wait=False):

def hatch():
sleep_time = 1.0 / hatch_rate
hatch_count = 0
while True:
if not bucket:
logger.info("All locusts hatched: %s (%i already running)" % (
", ".join(["%s: %d" % (name, count) for name, count in occurrence_count.items()]),
existing_count,
))
if not sorted([g.args[0].id for g in self.locusts]) == list(range(len(self.locusts))):
logger.warning("Locust IDs are not consecutive.")
self.environment.events.hatch_complete.fire(user_count=len(self.locusts))
return

locust_class = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust_class.__name__] += 1
new_locust = locust_class(self.environment)
new_locust.id = existing_count + hatch_count
new_locust.start(self.locusts)
hatch_count += 1
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
if bucket:
@@ -166,7 +171,17 @@ def kill_locusts(self, kill_count):
to_kill.append(user)
bucket.remove(l)
break

to_kill_ids = sorted([user.id for user in to_kill])
remaining_count = len(self.locusts) - kill_count
for g in self.locusts:
if g.args[0].id >= remaining_count:
g.args[0].id = to_kill_ids.pop()

self.kill_locust_instances(to_kill)
if not sorted([g.args[0].id for g in self.locusts]) == list(range(len(self.locusts))):
logger.warning("Locust IDs are not consecutive.")

self.environment.events.hatch_complete.fire(user_count=self.user_count)


@@ -432,6 +447,14 @@ def heartbeat_worker(self):
else:
client.heartbeat -= 1

    def broadcast_timeslots(self):
        # assign each connected worker a fractional position in the shared schedule
        num_clients = self.worker_count
        for index, client in enumerate(self.clients.all):
            timeslot_ratio = index / num_clients
            self.server.send_to_client(Message("timeslot_ratio", timeslot_ratio, client.id))

def reset_connection(self):
logger.info("Reset connection to slave")
try:
@@ -461,6 +484,7 @@ def client_listener(self):
## emit a warning if the worker's clock seem to be out of sync with our clock
#if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
self.broadcast_timeslots()
elif msg.type == "client_stopped":
del self.clients[msg.node_id]
logger.info("Removing %s client from running clients" % (msg.node_id))
@@ -488,6 +512,10 @@ def client_listener(self):
if msg.node_id in self.clients:
del self.clients[msg.node_id]
logger.info("Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready)))
if self.state == STATE_RUNNING or self.state == STATE_HATCHING:
# balance the load distribution when a client quits
self.start(self.target_user_count, self.hatch_rate)
self.broadcast_timeslots()
elif msg.type == "exception":
self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])

@@ -504,6 +532,7 @@ def __init__(self, *args, master_host, master_port, **kwargs):
self.client_id = socket.gethostname() + "_" + uuid4().hex
self.master_host = master_host
self.master_port = master_port
self.timeslot_ratio = 0
self.client = rpc.Client(master_host, master_port, self.client_id)
self.greenlet.spawn(self.heartbeat)
self.greenlet.spawn(self.worker)
@@ -578,6 +607,8 @@ def worker(self):
self.stop()
self._send_stats() # send a final report, in case there were any samples not yet reported
self.greenlet.kill(block=True)
elif msg.type == "timeslot_ratio":
self.timeslot_ratio = msg.data

def stats_reporter(self):
while True:
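As an aside on the master-side change above: broadcast_timeslots walks the connected workers and hands each one a fractional position in the shared schedule, which the new wait_time functions later combine with each locust's id. A minimal sketch of that assignment (the worker ids are placeholders):

workers = ["worker_a", "worker_b", "worker_c"]   # placeholder client ids
num_clients = len(workers)
for index, client_id in enumerate(workers):
    timeslot_ratio = index / num_clients         # 0.0, then 1/3, then 2/3
    print(client_id, timeslot_ratio)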
45 changes: 32 additions & 13 deletions locust/test/test_runners.py
@@ -472,15 +472,23 @@ def test_rebalance_locust_users_on_worker_connect(self):
self.assertTrue("zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict")

master.start(100, 20)
self.assertEqual(1, len(server.outbox))
self.assertEqual(1*2, len(server.outbox))
client_id, msg = server.outbox.pop()
self.assertEqual(100, msg.data["num_clients"])
self.assertEqual(20, msg.data["hatch_rate"])

client_id, msg = server.outbox.pop()
self.assertEqual("timeslot_ratio", msg.type)

# let another worker connect
server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))
self.assertEqual(2, len(master.clients))
self.assertEqual(2, len(server.outbox))
self.assertEqual(2*2, len(server.outbox))

client_id, msg = server.outbox.pop()
self.assertEqual("timeslot_ratio", msg.type)
client_id, msg = server.outbox.pop()
self.assertEqual("timeslot_ratio", msg.type)

client_id, msg = server.outbox.pop()
self.assertEqual(50, msg.data["num_clients"])
self.assertEqual(10, msg.data["hatch_rate"])
@@ -516,9 +524,10 @@ def on_test_start(*a, **kw):

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))

        # each client_ready triggers broadcast_timeslots to every connected worker,
        # so after the fifth worker connects: 1 + 2 + 3 + 4 + 5 = 15 timeslot messages
        n_timeslots = 5*(5+1)/2

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
self.assertEqual(n_timeslots + 5, len(server.outbox))
self.assertEqual(1, run_count[0])

# change number of users and check that test_start isn't fired again
@@ -539,9 +548,10 @@ def on_test_stop(*a, **kw):

for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))

n_timeslots = 5*(5+1)/2

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
self.assertEqual(n_timeslots + 5, len(server.outbox))
master.stop()
self.assertEqual(1, run_count[0])

@@ -578,12 +588,15 @@ def test_spawn_uneven_locusts(self):
master = self.get_runner()
for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))

n_timeslots = 5*(5+1)/2

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
self.assertEqual(n_timeslots + 5, len(server.outbox))

num_clients = 0
for _, msg in server.outbox:
if msg.type == "timeslot_ratio":
continue
num_clients += msg.data["num_clients"]

self.assertEqual(7, num_clients, "Total number of locusts that would have been spawned is not 7")
@@ -593,12 +606,15 @@ def test_spawn_fewer_locusts_than_workers(self):
master = self.get_runner()
for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))

n_timeslots = 5*(5+1)/2

master.start(2, 2)
self.assertEqual(5, len(server.outbox))
self.assertEqual(n_timeslots + 5, len(server.outbox))

num_clients = 0
for _, msg in server.outbox:
if msg.type == "timeslot_ratio":
continue
num_clients += msg.data["num_clients"]

self.assertEqual(2, num_clients, "Total number of locusts that would have been spawned is not 2")
@@ -608,17 +624,20 @@ def test_spawn_locusts_in_stepload_mode(self):
master = self.get_runner()
for i in range(5):
server.mocked_send(Message("client_ready", None, "fake_client%i" % i))
n_timeslots = 5*(5+1)/2

# start a new swarming in Step Load mode: total locust count of 10, hatch rate of 2, step locust count of 5, step duration of 2s
master.start_stepload(10, 2, 5, 2)

# make sure the first step run is started
sleep(0.5)
self.assertEqual(5, len(server.outbox))
self.assertEqual(n_timeslots + 5, len(server.outbox))

num_clients = 0
end_of_last_step = len(server.outbox)
for _, msg in server.outbox:
if msg.type == "timeslot_ratio":
continue
num_clients += msg.data["num_clients"]

self.assertEqual(5, num_clients, "Total number of locusts that would have been spawned for first step is not 5")
104 changes: 100 additions & 4 deletions locust/wait_time.py
@@ -1,6 +1,8 @@
import random
from time import time
import math
from time import monotonic

from . import runners

def between(min_wait, max_wait):
"""
@@ -49,11 +51,105 @@ def my_task(self):
def wait_time_func(self):
if not hasattr(self,"_cp_last_run"):
self._cp_last_wait_time = wait_time
self._cp_last_run = time()
self._cp_last_run = monotonic()
return wait_time
else:
run_time = time() - self._cp_last_run - self._cp_last_wait_time
run_time = monotonic() - self._cp_last_run - self._cp_last_wait_time
self._cp_last_wait_time = max(0, wait_time - run_time)
self._cp_last_run = time()
self._cp_last_run = monotonic()
return self._cp_last_wait_time
return wait_time_func

def constant_uniform(wait_time):
"""
    Returns a function that will track the run time of the tasks, and each time it's
    called it will return a wait time that tries to make the total time between task
    executions equal to the time specified by the wait_time argument. The time between
    tasks will be maintained.

    Statistically, a constant inter-arrival time between tasks will be maintained
    regardless of the number of threads and workers, given that the task run time is
    less than the specified wait_time.

    In the following example the task will always be executed once every second,
    regardless of the task execution time. All users are synchronized so that their
    execution triggers are spread out evenly over the second::

class User(Locust):
wait_time = constant_uniform(1)
class task_set(TaskSet):
@task
def my_task(self):
time.sleep(random.random())

If a task execution exceeds the specified wait_time, the wait will be 0 before starting
the next task.
"""
    def wait_time_func(self):

        locust_id = self.id
        n_locusts = self.environment.runner.user_count
        # each locust gets its own time slot within the wait_time window
        locust_offset = wait_time / n_locusts * locust_id

        worker_offset = 0
        if type(self.environment.runner) == runners.WorkerLocustRunner:
            # shift workers relative to each other so their locusts interleave
            worker_offset = self.environment.runner.timeslot_ratio * wait_time / n_locusts

        # position of this locust's slot relative to the repeating wait_time window
        wall_clock = monotonic() + worker_offset + locust_offset
        since_last_trigger = wall_clock % wait_time

        time_remaining = max(0, wait_time - since_last_trigger)
        return time_remaining
    return wait_time_func
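
# Illustrative sketch, not part of the diff: how the two offsets above combine.
# Assuming wait_time = 1 second, 2 workers and 2 locusts per worker, worker k is
# sent timeslot_ratio = k/2 by the master and locust i gets locust_offset = i/2,
# so the trigger points spread evenly over the one-second window.
example_wait_time, num_workers, locusts_per_worker = 1.0, 2, 2
example_offsets = sorted(
    (worker / num_workers) * example_wait_time / locusts_per_worker   # worker_offset
    + example_wait_time / locusts_per_worker * locust_id              # locust_offset
    for worker in range(num_workers)
    for locust_id in range(locusts_per_worker)
)
print(example_offsets)   # [0.0, 0.25, 0.5, 0.75]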

def poisson(lambda_value):
"""
    Returns a function that will track the run time of the tasks, and each time it's
    called it will return a wait time that tries to achieve a Poisson distribution of
    task executions with rate lambda specified as lambda_value, given that the task run
    time is less than the mean inter-arrival time (1/lambda).

    Statistically, Locust will try to ensure:
    RPS = lambda
    Inter-Arrival Mean = 1/lambda
    Inter-Arrival Variance = 1/(lambda^2)

    Note: A reasonably high number of tasks must be executed to achieve these statistics.

    In the following example, tasks are executed with a Poisson distribution with lambda = 1::

class User(Locust):
wait_time = poisson(1)
class task_set(TaskSet):
@task
def my_task(self):
time.sleep(random.random())

"""

    # Notes on statistics: a Poisson arrival process has exponentially
    # distributed inter-arrival times with mean 1/lambda.
    wait_time = 1 / lambda_value

    def random_exponential(lambda_value):
        # inverse-transform sampling of an exponential distribution with rate lambda
        x = random.random()
        return -math.log(1.0 - x) / lambda_value

def wait_time_func(self):

locust_id = self.id
n_locusts = self.environment.runner.user_count
locust_offset = wait_time / n_locusts * locust_id

        worker_offset = 0
        if type(self.environment.runner) == runners.WorkerLocustRunner:
            worker_offset = self.environment.runner.timeslot_ratio * wait_time / n_locusts

next_trigger_target = random_exponential(lambda_value) + lambda_value

wall_clock = monotonic() + worker_offset + locust_offset
since_last_trigger = wall_clock % wait_time

time_remaining = max(0, next_trigger_target - since_last_trigger)
return time_remaining
return wait_time_func
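
To close the loop on the statistics the poisson docstring claims, here is a small standalone check, independent of Locust, that exponentially distributed inter-arrival times give a mean of 1/lambda and a variance of 1/lambda^2 (the lambda value and sample size below are arbitrary):

import random
import statistics

lambda_value = 2.0                                  # target rate: 2 executions per second
samples = [random.expovariate(lambda_value) for _ in range(100000)]

print(statistics.mean(samples))                     # ~0.5  == 1 / lambda
print(statistics.variance(samples))                 # ~0.25 == 1 / lambda ** 2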