Skip to content

Commit

Permalink
Send custom arguments to worker. The solution actually sends ALL argu…
Browse files Browse the repository at this point in the history
…ments to workers, but only the custom arguments (not built-in ones) are actually processed by the worker. In the future we could (should?) make it so that all parameters will be accepted.
  • Loading branch information
cyberw committed Aug 7, 2021
1 parent ae78ad0 commit e3d969d
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 8 deletions.
7 changes: 7 additions & 0 deletions locust/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,10 @@ def parse_options(args=None):
if parsed_opts.stats_history_enabled and (parsed_opts.csv_prefix is None):
parser.error("'--csv-full-history' requires '--csv'.")
return parsed_opts


def default_args_dict():
# returns a dict containing the default arguments (before any custom arguments are added)
default_parser = get_empty_argument_parser()
setup_parser_arguments(default_parser)
return vars(default_parser.parse([]))
15 changes: 15 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
RequestStats,
setup_distributed_stats_event_listeners,
)
from . import argument_parser

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -690,6 +691,9 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
"user_classes_count": worker_user_classes_count,
"host": self.environment.host,
"stop_timeout": self.environment.stop_timeout,
"parsed_options": vars(self.environment.parsed_options)
if self.environment.parsed_options
else {},
}
dispatch_greenlets.add(
gevent.spawn_later(
Expand Down Expand Up @@ -1112,6 +1116,17 @@ def worker(self):
continue
self.environment.host = job["host"]
self.environment.stop_timeout = job["stop_timeout"]

# receive custom arguments
if self.environment.parsed_options is None:
default_parser = argument_parser.get_empty_argument_parser()
argument_parser.setup_parser_arguments(default_parser)
self.environment.parsed_options = default_parser.parse(args=[])
custom_args_from_master = {
k: v for k, v in job["parsed_options"].items() if k not in argument_parser.default_args_dict()
}
vars(self.environment.parsed_options).update(custom_args_from_master)

if self.spawning_greenlet:
# kill existing spawning greenlet before we launch new one
self.spawning_greenlet.kill(block=True)
Expand Down
84 changes: 82 additions & 2 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
runners,
__version__,
)
from locust.argument_parser import parse_options
from locust.env import Environment
from locust.exception import (
RPCError,
Expand Down Expand Up @@ -615,8 +616,8 @@ class TestUser(User):
wait_time = constant(0.1)

@task
def incr_stats(l):
l.environment.events.request.fire(
def incr_stats(self):
self.environment.events.request.fire(
request_type="GET",
name="/",
response_time=1337,
Expand Down Expand Up @@ -659,6 +660,69 @@ def incr_stats(l):
"For some reason the master node's stats has not come in",
)

def test_distributed_run_with_custom_args(self):
"""
Full integration test that starts both a MasterRunner and three WorkerRunner instances
and makes sure that their stats is sent to the Master.
"""

class TestUser(User):
wait_time = constant(0.1)

@task
def incr_stats(self):
self.environment.events.request.fire(
request_type="GET",
name=self.environment.parsed_options.my_str_argument,
response_time=self.environment.parsed_options.my_int_argument,
response_length=666,
exception=None,
context={},
)

@locust.events.init_command_line_parser.add_listener
def _(parser, **kw):
parser.add_argument("--my-int-argument", type=int)
parser.add_argument("--my-str-argument", type=str, default="NOOOO")

with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
# start a Master runner
master_env = Environment(user_classes=[TestUser])
master = master_env.create_master_runner("*", 0)
master_env.parsed_options = parse_options(
[
"--my-int-argument",
"42",
"--my-str-argument",
"cool-string",
]
)
sleep(0)
# start 3 Worker runners
workers = []
for i in range(3):
worker_env = Environment(user_classes=[TestUser])
worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
workers.append(worker)

# give workers time to connect
sleep(0.1)
# issue start command that should trigger TestUsers to be spawned in the Workers
master.start(6, spawn_rate=1000)
sleep(0.1)
# check that worker nodes have started locusts
for worker in workers:
self.assertEqual(2, worker.user_count)
# give time for users to generate stats, and stats to be sent to master
sleep(1)
master.quit()
# make sure users are killed
for worker in workers:
self.assertEqual(0, worker.user_count)

self.assertEqual(master_env.runner.stats.total.max_response_time, 42)
self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)

def test_test_stop_event(self):
class TestUser(User):
wait_time = constant(0.1)
Expand Down Expand Up @@ -2267,6 +2331,7 @@ def the_task(self):
"user_classes_count": {"MyTestUser": 1},
"host": "",
"stop_timeout": 1,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2307,6 +2372,7 @@ def the_task(self):
"user_classes_count": {"MyTestUser": 1},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2351,6 +2417,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 10},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2369,6 +2436,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 9},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2386,6 +2454,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 2},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2403,6 +2472,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 2},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2443,6 +2513,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 10},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2524,6 +2595,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 10},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2538,6 +2610,7 @@ def my_task(self):
"user_classes_count": {"MyUser": 9},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2574,6 +2647,7 @@ def my_task(self):
"user_classes_count": {"MyUser1": 10, "MyUser2": 10},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2591,6 +2665,7 @@ def my_task(self):
"user_classes_count": {"MyUser1": 1, "MyUser2": 2},
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2702,6 +2777,7 @@ def on_test_start(*args, **kw):
"num_users": 1,
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2727,6 +2803,7 @@ def on_test_start(*args, **kw):
"num_users": 1,
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand All @@ -2745,6 +2822,7 @@ def on_test_start(*args, **kw):
"num_users": 1,
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2785,6 +2863,7 @@ def on_test_stop(*args, **kw):
"num_users": 1,
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down Expand Up @@ -2820,6 +2899,7 @@ def on_test_stop(*args, **kw):
"num_users": 1,
"host": "",
"stop_timeout": None,
"parsed_options": {},
},
"dummy_client_id",
)
Expand Down
11 changes: 5 additions & 6 deletions locust/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,12 @@ def update_template_args(self):

stats = self.environment.runner.stats

# this is a somewhat cumbersome way to get the built-in arguments
default_parser = argument_parser.get_empty_argument_parser()
argument_parser.setup_parser_arguments(default_parser)
default_args_dict = vars(default_parser.parse(args=[]))

extra_options = (
{k: v for k, v in vars(self.environment.parsed_options).items() if k not in default_args_dict}
{
k: v
for k, v in vars(self.environment.parsed_options).items()
if k not in argument_parser.default_args_dict()
}
if self.environment.parsed_options
else {}
)
Expand Down

0 comments on commit e3d969d

Please sign in to comment.