diff --git a/locust/__init__.py b/locust/__init__.py index 15da43b588..064f824f68 100644 --- a/locust/__init__.py +++ b/locust/__init__.py @@ -1,6 +1,6 @@ from .core import HttpLocust, Locust, TaskSet, TaskSequence, task, seq_task from .exception import InterruptTaskSet, ResponseError, RescheduleTaskImmediately -from .wait_time import between, constant, constant_pacing +from .wait_time import between, constant, constant_pacing, poisson, constant_uniform from .event import Events events = Events() diff --git a/locust/core.py b/locust/core.py index 50a4aec431..ee88557849 100644 --- a/locust/core.py +++ b/locust/core.py @@ -505,6 +505,9 @@ class ForumPage(TaskSet): tasks = {ThreadPage:15, write_post:1} """ + id = 0 + """ID of this locust object used for spacing out tasks in time.""" + weight = 10 """Probability of locust being chosen. The higher the weight, the greater is the chance of it being chosen.""" diff --git a/locust/runners.py b/locust/runners.py index b418621abf..dbe0930201 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -128,19 +128,24 @@ def spawn_locusts(self, spawn_count, hatch_rate, wait=False): def hatch(): sleep_time = 1.0 / hatch_rate + hatch_count = 0 while True: if not bucket: logger.info("All locusts hatched: %s (%i already running)" % ( ", ".join(["%s: %d" % (name, count) for name, count in occurrence_count.items()]), existing_count, )) + if not sorted([g.args[0].id for g in self.locusts]) == list(range(len(self.locusts))): + logger.warning("Locust IDs are not consecutive.") self.environment.events.hatch_complete.fire(user_count=len(self.locusts)) return locust_class = bucket.pop(random.randint(0, len(bucket)-1)) occurrence_count[locust_class.__name__] += 1 new_locust = locust_class(self.environment) + new_locust.id = existing_count + hatch_count new_locust.start(self.locusts) + hatch_count += 1 if len(self.locusts) % 10 == 0: logger.debug("%i locusts hatched" % len(self.locusts)) if bucket: @@ -166,7 +171,17 @@ def kill_locusts(self, kill_count): to_kill.append(user) bucket.remove(l) break + + to_kill_ids = sorted([user.id for user in to_kill]) + remaining_count = len(self.locusts) - kill_count + for g in self.locusts: + if g.args[0].id >= remaining_count: + g.args[0].id = to_kill_ids.pop() + self.kill_locust_instances(to_kill) + if not sorted([g.args[0].id for g in self.locusts]) == list(range(len(self.locusts))): + logger.warning("Locust IDs are not consecutive.") + self.environment.events.hatch_complete.fire(user_count=self.user_count) @@ -432,6 +447,14 @@ def heartbeat_worker(self): else: client.heartbeat -= 1 + def broadcast_timeslots(self): + index = 0 + num_clients = self.worker_count + for client in self.clients.all: + timeslot_ratio = index/num_clients + self.server.send_to_client(Message("timeslot_ratio", timeslot_ratio, client.id)) + index += 1 + def reset_connection(self): logger.info("Reset connection to slave") try: @@ -461,6 +484,7 @@ def client_listener(self): ## emit a warning if the worker's clock seem to be out of sync with our clock #if abs(time() - msg.data["time"]) > 5.0: # warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.") + self.broadcast_timeslots() elif msg.type == "client_stopped": del self.clients[msg.node_id] logger.info("Removing %s client from running clients" % (msg.node_id)) @@ -488,6 +512,10 @@ def client_listener(self): if msg.node_id in self.clients: del self.clients[msg.node_id] logger.info("Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready))) + if self.state == STATE_RUNNING or self.state == STATE_HATCHING: + # balance the load distribution when a client quits + self.start(self.target_user_count, self.hatch_rate) + self.broadcast_timeslots() elif msg.type == "exception": self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"]) @@ -504,6 +532,7 @@ def __init__(self, *args, master_host, master_port, **kwargs): self.client_id = socket.gethostname() + "_" + uuid4().hex self.master_host = master_host self.master_port = master_port + self.timeslot_ratio = 0 self.client = rpc.Client(master_host, master_port, self.client_id) self.greenlet.spawn(self.heartbeat) self.greenlet.spawn(self.worker) @@ -578,6 +607,8 @@ def worker(self): self.stop() self._send_stats() # send a final report, in case there were any samples not yet reported self.greenlet.kill(block=True) + elif msg.type == "timeslot_ratio": + self.timeslot_ratio = msg.data def stats_reporter(self): while True: diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 956fdf89aa..cf0011217a 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -472,15 +472,23 @@ def test_rebalance_locust_users_on_worker_connect(self): self.assertTrue("zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict") master.start(100, 20) - self.assertEqual(1, len(server.outbox)) + self.assertEqual(1*2, len(server.outbox)) client_id, msg = server.outbox.pop() self.assertEqual(100, msg.data["num_clients"]) self.assertEqual(20, msg.data["hatch_rate"]) - + client_id, msg = server.outbox.pop() + self.assertEqual("timeslot_ratio", msg.type) + # let another worker connect server.mocked_send(Message("client_ready", None, "zeh_fake_client2")) self.assertEqual(2, len(master.clients)) - self.assertEqual(2, len(server.outbox)) + self.assertEqual(2*2, len(server.outbox)) + + client_id, msg = server.outbox.pop() + self.assertEqual("timeslot_ratio", msg.type) + client_id, msg = server.outbox.pop() + self.assertEqual("timeslot_ratio", msg.type) + client_id, msg = server.outbox.pop() self.assertEqual(50, msg.data["num_clients"]) self.assertEqual(10, msg.data["hatch_rate"]) @@ -516,9 +524,10 @@ def on_test_start(*a, **kw): for i in range(5): server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) - + n_timeslots = 5*(5+1)/2 + master.start(7, 7) - self.assertEqual(5, len(server.outbox)) + self.assertEqual(n_timeslots + 5, len(server.outbox)) self.assertEqual(1, run_count[0]) # change number of users and check that test_start isn't fired again @@ -539,9 +548,10 @@ def on_test_stop(*a, **kw): for i in range(5): server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) - + n_timeslots = 5*(5+1)/2 + master.start(7, 7) - self.assertEqual(5, len(server.outbox)) + self.assertEqual(n_timeslots + 5, len(server.outbox)) master.stop() self.assertEqual(1, run_count[0]) @@ -578,12 +588,15 @@ def test_spawn_uneven_locusts(self): master = self.get_runner() for i in range(5): server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) - + n_timeslots = 5*(5+1)/2 + master.start(7, 7) - self.assertEqual(5, len(server.outbox)) + self.assertEqual(n_timeslots + 5, len(server.outbox)) num_clients = 0 for _, msg in server.outbox: + if msg.type == "timeslot_ratio": + continue num_clients += msg.data["num_clients"] self.assertEqual(7, num_clients, "Total number of locusts that would have been spawned is not 7") @@ -593,12 +606,15 @@ def test_spawn_fewer_locusts_than_workers(self): master = self.get_runner() for i in range(5): server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) - + n_timeslots = 5*(5+1)/2 + master.start(2, 2) - self.assertEqual(5, len(server.outbox)) - + self.assertEqual(n_timeslots + 5, len(server.outbox)) + num_clients = 0 for _, msg in server.outbox: + if msg.type == "timeslot_ratio": + continue num_clients += msg.data["num_clients"] self.assertEqual(2, num_clients, "Total number of locusts that would have been spawned is not 2") @@ -608,17 +624,20 @@ def test_spawn_locusts_in_stepload_mode(self): master = self.get_runner() for i in range(5): server.mocked_send(Message("client_ready", None, "fake_client%i" % i)) + n_timeslots = 5*(5+1)/2 # start a new swarming in Step Load mode: total locust count of 10, hatch rate of 2, step locust count of 5, step duration of 2s master.start_stepload(10, 2, 5, 2) # make sure the first step run is started sleep(0.5) - self.assertEqual(5, len(server.outbox)) + self.assertEqual(n_timeslots + 5, len(server.outbox)) num_clients = 0 end_of_last_step = len(server.outbox) for _, msg in server.outbox: + if msg.type == "timeslot_ratio": + continue num_clients += msg.data["num_clients"] self.assertEqual(5, num_clients, "Total number of locusts that would have been spawned for first step is not 5") diff --git a/locust/wait_time.py b/locust/wait_time.py index 1399f37810..bf298486c3 100644 --- a/locust/wait_time.py +++ b/locust/wait_time.py @@ -1,6 +1,8 @@ import random -from time import time +import math +from time import monotonic +from . import runners def between(min_wait, max_wait): """ @@ -49,11 +51,105 @@ def my_task(self): def wait_time_func(self): if not hasattr(self,"_cp_last_run"): self._cp_last_wait_time = wait_time - self._cp_last_run = time() + self._cp_last_run = monotonic() return wait_time else: - run_time = time() - self._cp_last_run - self._cp_last_wait_time + run_time = monotonic() - self._cp_last_run - self._cp_last_wait_time self._cp_last_wait_time = max(0, wait_time - run_time) - self._cp_last_run = time() + self._cp_last_run = monotonic() return self._cp_last_wait_time return wait_time_func + +def constant_uniform(wait_time): + """ + Returns a function that will track the run time of the tasks, and for each time it's + called it will return a wait time that will try to make the total time between task + execution equal to the time specified by the wait_time argument. The time between + tasks will be maintined. + + Statistically, a constant inter-arrival time between tasks will be maintained + regardless of number of threads and workers, given that task run time is less than + specified wait_time. + + In the following example the task will always be executed once every second, no matter + the task execution time. All the users will be synchronized to spread out the exact + execution trigger over the entire second:: + + class User(Locust): + wait_time = constant_uniform(1) + class task_set(TaskSet): + @task + def my_task(self): + time.sleep(random.random()) + + If a task execution exceeds the specified wait_time, the wait will be 0 before starting + the next task. + """ + def wait_time_func(self): + + locust_id = self.id + n_locusts = self.environment.runner.user_count + locust_offset = wait_time / n_locusts * locust_id + + worker_offset = 0 + if (type(self.environment.runner) == runners.WorkerLocustRunner): + worker_offset = self.environment.runner.timeslot_ratio * wait_time / n_locusts + + wall_clock = monotonic() + worker_offset + locust_offset + since_last_trigger = wall_clock % wait_time + + time_remaining = max(0, wait_time - since_last_trigger) + return time_remaining + return wait_time_func + +def poisson(lambda_value): + """ + Returns a function that will track the run time of the tasks, and for each time it's + called it will return a wait time that will try to achieve a poisson distribution of + tasks executions with lambda specified as lambda_value, given that task run time is + less than specified lambda_value. + + Statistically, Locust will try to ensure: + RPS = 1/lambda + Inter-Arrival Mean = 1/lambda + Inter-Arrival Variance = 1/(lambda^2) + + Note: A reasonably high number of tasks must be executed to achieve these statistics. + + In the following example, a poisson task distribution with lambda = 1 will be executed:: + + class User(Locust): + wait_time = poisson(1) + class task_set(TaskSet): + @task + def my_task(self): + time.sleep(random.random()) + + """ + + # Notes on statistics: + # Poisson Arrival Time Distribution = Exponential Inter-Arrival Distribution + wait_time = 1 / lambda_value + + def random_exponential(lambda_value): + x = random.random() + return lambda_value * math.exp(-lambda_value * x) + + def wait_time_func(self): + + locust_id = self.id + n_locusts = self.environment.runner.user_count + locust_offset = wait_time / n_locusts * locust_id + + worker_offset = 0 + if (type(self.environment.runner) == runners.WorkerLocustRunner): + worker_offset = self.environment.runner.timeslot_ratio * wait_time / n_locusts + + next_trigger_target = random_exponential(lambda_value) + lambda_value + + wall_clock = monotonic() + worker_offset + locust_offset + since_last_trigger = wall_clock % wait_time + + time_remaining = max(0, next_trigger_target - since_last_trigger) + return time_remaining + return wait_time_func