
Commit

Added the ability to specify the exact number of users to spawn for each class with the <fixed_count> class field.
Fixed users are spawned first; the weight parameter is ignored for them.
Also added unit tests for cases with fixed users.
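
As a rough illustration (not part of this commit), a locustfile using the new field might look like the sketch below; the class names are hypothetical, while fixed_count and weight are the fields the dispatcher reads:

from locust import User

class AdminUser(User):
    # Exactly 2 AdminUser instances are spawned, before any weighted users;
    # the weight attribute is ignored for this class.
    fixed_count = 2

class WebsiteUser(User):
    # The remaining users are distributed by weight, as before.
    weight = 3

class MobileUser(User):
    weight = 1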
EzR1d3r committed Dec 25, 2021
1 parent cce0d29 commit 8007d44
Showing 3 changed files with 315 additions and 23 deletions.
96 changes: 74 additions & 22 deletions locust/dispatch.py
@@ -4,7 +4,7 @@
import time
from collections.abc import Iterator
from operator import attrgetter
from typing import Dict, Generator, List, TYPE_CHECKING, Tuple, Type
from typing import Dict, Generator, List, TYPE_CHECKING, Optional, Tuple, Type

import gevent
import typing
@@ -98,6 +98,10 @@ def __init__(self, worker_nodes: "List[WorkerNode]", user_classes: List[Type[Use

self._rebalance = False

self._try_dispatch_fixed = True

self._no_user_to_spawn = False

@property
def dispatch_in_progress(self):
return self._dispatch_in_progress
@@ -132,6 +136,9 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:
if self._rebalance:
self._rebalance = False
yield self._users_on_workers
if self._no_user_to_spawn:
self._no_user_to_spawn = False
break

while self._current_user_count > self._target_user_count:
with self._wait_between_dispatch_iteration_context():
@@ -242,12 +249,17 @@ def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]:
self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count
)
for user in self._user_generator:
if not user:
self._no_user_to_spawn = True
break
worker_node = next(self._worker_node_generator)
self._users_on_workers[worker_node.id][user] += 1
self._current_user_count += 1
self._active_users.append((worker_node, user))
if self._current_user_count >= current_user_count_target:
return self._users_on_workers
break

return self._users_on_workers

def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]:
"""Remove users from the workers until the target number of users is reached for the current dispatch iteration
@@ -264,9 +276,17 @@ def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]:
return self._users_on_workers
self._users_on_workers[worker_node.id][user] -= 1
self._current_user_count -= 1
self._try_dispatch_fixed = True
if self._current_user_count == 0 or self._current_user_count <= current_user_count_target:
return self._users_on_workers

def _get_user_current_count(self, user: str) -> int:
count = 0
for users_on_node in self._users_on_workers.values():
count += users_on_node.get(user, 0)

return count

def _distribute_users(
self, target_user_count: int
) -> Tuple[dict, Generator[str, None, None], typing.Iterator["WorkerNode"], List[Tuple["WorkerNode", str]]]:
@@ -307,26 +327,58 @@ def _user_gen(self) -> Generator[str, None, None]:
weighted round-robin algorithm, we'd get AAAAABAAAAAB which would make the distribution
less accurate during ramp-up/down.
"""
# Normalize the weights so that the smallest weight will be equal to "target_min_weight".
# The value "2" was experimentally determined because it gave a better distribution especially
# when dealing with weights which are close to each others, e.g. 1.5, 2, 2.4, etc.
target_min_weight = 2
min_weight = min(u.weight for u in self._user_classes)
normalized_weights = [
(user_class.__name__, round(target_min_weight * user_class.weight / min_weight))
for user_class in self._user_classes
]
gen = smooth(normalized_weights)
# Instead of calling `gen()` for each user, we cycle through a generator of fixed-length
# `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because
# we only ever need to call `gen()` a relatively small number of times. The length of this generator
# is chosen as the sum of the normalized weights. So, for users A, B, C of weights 2, 5, 6, the length is
# 2 + 5 + 6 = 13 which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over
# until the target user count is reached.
generation_length_to_get_proper_distribution = sum(
normalized_weight[1] for normalized_weight in normalized_weights
)
yield from itertools.cycle(gen() for _ in range(generation_length_to_get_proper_distribution))

def infinite_cycle_gen(users: List[Tuple[User, int]]) -> Generator[Optional[str], None, None]:
if not users:
return itertools.cycle([None])

# Normalize the weights so that the smallest weight will be equal to "target_min_weight".
# The value "2" was experimentally determined because it gave a better distribution especially
# when dealing with weights that are close to each other, e.g. 1.5, 2, 2.4, etc.
target_min_weight = 2

# 'Value' here means weight or fixed count
normalized_values = [
(
user.__name__,
round(target_min_weight * value / min([u[1] for u in users])),
)
for user, value in users
]
generation_length_to_get_proper_distribution = sum(
normalized_val[1] for normalized_val in normalized_values
)
gen = smooth(normalized_values)
# Instead of calling `gen()` for each user, we cycle through a generator of fixed-length
# `generation_length_to_get_proper_distribution`. Doing so greatly improves performance because
# we only ever need to call `gen()` a relatively small number of times. The length of this generator
# is chosen as the sum of the normalized weights. So, for users A, B, C of weights 2, 5, 6, the length is
# 2 + 5 + 6 = 13 which would yield the distribution `CBACBCBCBCABC` that gets repeated over and over
# until the target user count is reached.
return itertools.cycle(gen() for _ in range(generation_length_to_get_proper_distribution))

fixed_users = {u.__name__: u for u in self._user_classes if u.fixed_count}

cycle_fixed_gen = infinite_cycle_gen([(u, u.fixed_count) for u in fixed_users.values()])
cycle_weighted_gen = infinite_cycle_gen([(u, u.weight) for u in self._user_classes if not u.fixed_count])

# Spawn users
while True:
if self._try_dispatch_fixed:
spawned_classes = set()
while True:
user_name = next(cycle_fixed_gen)
if not user_name:
break
if self._get_user_current_count(user_name) >= fixed_users[user_name].fixed_count:
spawned_classes.add(user_name)
else:
yield user_name
if len(spawned_classes) == len(fixed_users):
break
self._try_dispatch_fixed = False

yield next(cycle_weighted_gen)

@staticmethod
def _fast_users_on_workers_copy(users_on_workers: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]:
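
Not part of the commit: the snippet below is a minimal, self-contained sketch of the smooth weighted round-robin idea referenced in the comments above (a simplified stand-in for the smooth() helper, not its actual implementation). For normalized weights A=2, B=5, C=6 the cycle length is 2 + 5 + 6 = 13, and one full cycle reproduces the CBACBCBCBCABC distribution mentioned in the comment:

from typing import Callable, List, Tuple

def smooth_wrr(items: List[Tuple[str, int]]) -> Callable[[], str]:
    # Classic smooth weighted round-robin: bump each running counter by its
    # static weight, pick the largest counter, then subtract the total weight
    # from the picked one.
    current = {name: 0 for name, _ in items}
    total = sum(weight for _, weight in items)

    def gen() -> str:
        for name, weight in items:
            current[name] += weight
        selected = max(current, key=current.get)
        current[selected] -= total
        return selected

    return gen

normalized = [("A", 2), ("B", 5), ("C", 6)]
gen = smooth_wrr(normalized)
cycle_length = sum(weight for _, weight in normalized)  # 2 + 5 + 6 = 13
print("".join(gen() for _ in range(cycle_length)))      # -> CBACBCBCBCABC
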
235 changes: 234 additions & 1 deletion locust/test/test_dispatch.py
@@ -1,7 +1,7 @@
import time
import unittest
from operator import attrgetter
from typing import Dict
from typing import Dict, List, Tuple

from locust import User
from locust.dispatch import UsersDispatcher
@@ -3291,6 +3291,239 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)


class TestRampUpUsersFromZeroWithFixed(unittest.TestCase):
class Case:
def __init__(self, fixed_counts: Tuple[int], weights: Tuple[int], target_user_count: int) -> None:
self.fixed_counts = fixed_counts
self.weights = weights
self.target_user_count = target_user_count

def case_handler(self, cases: List[Case], expected: List[Dict[str, int]], user_classes: List[User]):
assert len(cases) == len(expected), "Different len of 'cases' and 'expected'"

for case_num in range(len(cases)):
# Reset to default values
for user_class in user_classes:
user_class.weight, user_class.fixed_count = 1, 0

case = cases[case_num]
assert (len(case.fixed_counts) + len(case.weights)) == len(
user_classes
), "Different number of values and user classes, unable to match"

fixed_users_list = user_classes[: len(case.fixed_counts)]
weighted_users_list = user_classes[len(case.fixed_counts) :]

for user, fixed_count in zip(fixed_users_list, case.fixed_counts):
user.fixed_count = fixed_count

for user, weight in zip(weighted_users_list, case.weights):
user.weight = weight

worker_node1 = WorkerNode("1")

users_dispatcher = UsersDispatcher(worker_nodes=[worker_node1], user_classes=user_classes)
users_dispatcher.new_dispatch(target_user_count=case.target_user_count, spawn_rate=0.5)
users_dispatcher._wait_between_dispatch = 0

iterations = list(users_dispatcher)
self.assertDictEqual(iterations[-1]["1"], expected[case_num])

def test_ramp_up_2_weigted_user_with_1_fixed_user(self):
class User1(User):
...

class User2(User):
...

class User3(User):
...

self.case_handler(
cases=[
self.Case(fixed_counts=(), weights=(1, 1, 1), target_user_count=3),
self.Case(fixed_counts=(1,), weights=(1, 1), target_user_count=3),
self.Case(fixed_counts=(1,), weights=(1, 1), target_user_count=9),
self.Case(fixed_counts=(8,), weights=(1, 1), target_user_count=10),
self.Case(fixed_counts=(2,), weights=(1, 1), target_user_count=1000),
self.Case(fixed_counts=(100,), weights=(1, 1), target_user_count=1000),
self.Case(fixed_counts=(960,), weights=(1, 1), target_user_count=1000),
self.Case(fixed_counts=(9990,), weights=(1, 1), target_user_count=10000),
self.Case(fixed_counts=(100,), weights=(1, 1), target_user_count=100),
],
expected=[
{"User1": 1, "User2": 1, "User3": 1},
{"User1": 1, "User2": 1, "User3": 1},
{"User1": 1, "User2": 4, "User3": 4},
{"User1": 8, "User2": 1, "User3": 1},
{"User1": 2, "User2": 499, "User3": 499},
{"User1": 100, "User2": 450, "User3": 450},
{"User1": 960, "User2": 20, "User3": 20},
{"User1": 9990, "User2": 5, "User3": 5},
{"User1": 100, "User2": 0, "User3": 0},
],
user_classes=[User1, User2, User3],
)

def test_ramp_up_various_count_weigted_and_fixed_users(self):
class User1(User):
...

class User2(User):
...

class User3(User):
...

class User4(User):
...

class User5(User):
...

self.case_handler(
cases=[
self.Case(fixed_counts=(1, 1), weights=(1, 1, 1), target_user_count=5),
self.Case(fixed_counts=(5, 2), weights=(1, 1, 1), target_user_count=10),
self.Case(fixed_counts=(9, 1), weights=(5, 3, 2), target_user_count=20),
self.Case(fixed_counts=(996,), weights=(1, 1, 1, 1), target_user_count=1000),
self.Case(fixed_counts=(500,), weights=(2, 1, 1, 1), target_user_count=1000),
self.Case(fixed_counts=(250, 250), weights=(3, 1, 1), target_user_count=1000),
self.Case(fixed_counts=(1, 1, 1, 1), weights=(100,), target_user_count=1000),
],
expected=[
{"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 1},
{"User1": 5, "User2": 2, "User3": 1, "User4": 1, "User5": 1},
{"User1": 9, "User2": 1, "User3": 5, "User4": 3, "User5": 2},
{"User1": 996, "User2": 1, "User3": 1, "User4": 1, "User5": 1},
{"User1": 500, "User2": 200, "User3": 100, "User4": 100, "User5": 100},
{"User1": 250, "User2": 250, "User3": 300, "User4": 100, "User5": 100},
{"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 996},
],
user_classes=[User1, User2, User3, User4, User5],
)

def test_ramp_up_only_fixed_users(self):
class User1(User):
...

class User2(User):
...

class User3(User):
...

class User4(User):
...

class User5(User):
...

self.case_handler(
cases=[
self.Case(fixed_counts=(1, 1, 1, 1, 1), weights=(), target_user_count=5),
self.Case(fixed_counts=(13, 26, 39, 52, 1), weights=(), target_user_count=131),
self.Case(fixed_counts=(10, 10, 10, 10, 10), weights=(), target_user_count=100),
self.Case(fixed_counts=(10, 10, 10, 10, 10), weights=(), target_user_count=50),
],
expected=[
{"User1": 1, "User2": 1, "User3": 1, "User4": 1, "User5": 1},
{"User1": 13, "User2": 26, "User3": 39, "User4": 52, "User5": 1},
{"User1": 10, "User2": 10, "User3": 10, "User4": 10, "User5": 10},
{"User1": 10, "User2": 10, "User3": 10, "User4": 10, "User5": 10},
],
user_classes=[User1, User2, User3, User4, User5],
)

def test_ramp_up_ramp_down_and_rump_up_again_fixed(self):
for weights, fixed_counts in [
[(1, 1, 1, 1, 1), (100, 100, 50, 50, 200)],
[(1, 1, 1, 1, 1), (100, 150, 50, 50, 0)],
[(1, 1, 1, 1, 1), (200, 100, 50, 0, 0)],
[(1, 1, 1, 1, 1), (200, 100, 0, 0, 0)],
[(1, 1, 1, 1, 1), (200, 0, 0, 0, 0)],
[(1, 1, 1, 1, 1), (0, 0, 0, 0, 0)],
]:

u1_weight, u2_weight, u3_weight, u4_weight, u5_weight = weights
u1_fixed_count, u2_fixed_count, u3_fixed_count, u4_fixed_count, u5_fixed_count = fixed_counts

class User1(User):
weight = u1_weight
fixed_count = u1_fixed_count

class User2(User):
weight = u2_weight
fixed_count = u2_fixed_count

class User3(User):
weight = u3_weight
fixed_count = u3_fixed_count

class User4(User):
weight = u4_weight
fixed_count = u4_fixed_count

class User5(User):
weight = u5_weight
fixed_count = u5_fixed_count

target_user_counts = [sum(fixed_counts), sum(fixed_counts) + 100]
down_counts = [0, max(min(fixed_counts) - 1, 0)]
user_classes = [User1, User2, User3, User4, User5]

for worker_count in [3, 5, 9]:
workers = [WorkerNode(str(i + 1)) for i in range(worker_count)]
users_dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=user_classes)

for down_to_count in down_counts:
for target_user_count in target_user_counts:

# Ramp-up to go to `target_user_count` #########

users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=1)
users_dispatcher._wait_between_dispatch = 0

list(users_dispatcher)

for user_class in user_classes:
if user_class.fixed_count:
self.assertEqual(
users_dispatcher._get_user_current_count(user_class.__name__),
user_class.fixed_count,
)

# Ramp-down to go to `down_to_count`
# and ensure that the fixed user counts were decreased too

users_dispatcher.new_dispatch(target_user_count=down_to_count, spawn_rate=1)
users_dispatcher._wait_between_dispatch = 0

list(users_dispatcher)

for user_class in user_classes:
if user_class.fixed_count:
self.assertNotEqual(
users_dispatcher._get_user_current_count(user_class.__name__),
user_class.fixed_count,
)

# Ramp up back to `target_user_count` and ensure
# that the fixed users return to their counts

users_dispatcher.new_dispatch(target_user_count=target_user_count, spawn_rate=1)
users_dispatcher._wait_between_dispatch = 0

list(users_dispatcher)

for user_class in user_classes:
if user_class.fixed_count:
self.assertEqual(
users_dispatcher._get_user_current_count(user_class.__name__),
user_class.fixed_count,
)


def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]:
user_classes = list(next(iter(d.values())).keys())
return {u: sum(d[u] for d in d.values()) for u in user_classes}
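
For reference (not part of the commit), a minimal sketch of how these tests exercise the dispatcher; the WorkerNode import path is an assumption based on what the test module uses. With one fixed class and two weighted classes, the fixed users are dispatched first and the remainder is split by weight:

from locust import User
from locust.dispatch import UsersDispatcher
from locust.runners import WorkerNode  # assumed import path, mirroring the test module

class User1(User):
    fixed_count = 8

class User2(User):
    weight = 1

class User3(User):
    weight = 1

dispatcher = UsersDispatcher(worker_nodes=[WorkerNode("1")], user_classes=[User1, User2, User3])
dispatcher.new_dispatch(target_user_count=10, spawn_rate=10)
dispatcher._wait_between_dispatch = 0  # skip waiting between iterations, as the tests do

final_dispatch = list(dispatcher)[-1]
# The 8 fixed users are dispatched first; the remaining 2 users are split by weight:
# final_dispatch["1"] == {"User1": 8, "User2": 1, "User3": 1}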