From af31eb9f183d083419f3376340d4fb4e72794a5a Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 1 Apr 2020 08:10:04 +0200 Subject: [PATCH 1/9] Change client assignment (WIP --- esrally/driver/driver.py | 409 ++++++++++++++++++++++-------------- tests/driver/driver_test.py | 28 +-- 2 files changed, 263 insertions(+), 174 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index b847a5e9f..a0c6fc5f1 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -16,9 +16,11 @@ # under the License. import asyncio +import collections import concurrent.futures import datetime import logging +import multiprocessing import queue import threading import time @@ -78,22 +80,22 @@ def __init__(self, distribution_flavor, distribution_version, revision): self.revision = revision -class StartLoadGenerator: +class StartWorker: """ - Starts a load generator. + Starts a worker. """ - def __init__(self, client_id, config, track, tasks): + def __init__(self, worker_id, config, track, client_allocations): """ - :param client_id: Client id of the load generator. + :param worker_id: Unique (numeric) id of the worker. :param config: Rally internal configuration object. :param track: The track to use. - :param tasks: Tasks to run. + :param client_allocations: A structure describing which clients need to run which tasks. """ - self.client_id = client_id + self.worker_id = worker_id self.config = config self.track = track - self.tasks = tasks + self.client_allocations = client_allocations class Drive: @@ -129,11 +131,12 @@ class JoinPointReached: Tells the master that a load generator has reached a join point. Used for coordination across multiple load generators. """ - def __init__(self, client_id, task): - self.client_id = client_id - # Using perf_counter here is fine even in the distributed case. Although we "leak" this value to other machines, we will only - # ever interpret this value on the same machine (see `Drive` and the implementation in `Driver#joinpoint_reached()`). - self.client_local_timestamp = time.perf_counter() + def __init__(self, worker_id, task): + self.worker_id = worker_id + # Using perf_counter here is fine even in the distributed case. Although we "leak" this value to other + # machines, we will only ever interpret this value on the same machine (see `Drive` and the implementation + # in `Driver#joinpoint_reached()`). + self.worker_timestamp = time.perf_counter() self.task = task @@ -161,7 +164,7 @@ class DriverActor(actor.RallyActor): POST_PROCESS_INTERVAL_SECONDS = 30 """ - Coordinates all worker drivers. This is actually only a thin actor wrapper layer around ``Driver`` which does the actual work. + Coordinates all workers. This is actually only a thin actor wrapper layer around ``Driver`` which does the actual work. """ def __init__(self): @@ -192,14 +195,14 @@ def receiveMsg_ActorExitRequest(self, msg, sender): self.status = "exiting" def receiveMsg_ChildActorExited(self, msg, sender): - # is it a driver? - if msg.childAddress in self.coordinator.drivers: - driver_index = self.coordinator.drivers.index(msg.childAddress) + # is it a worker? + if msg.childAddress in self.coordinator.workers: + worker_index = self.coordinator.workers.index(msg.childAddress) if self.status == "exiting": - self.logger.info("Load generator [%d] has exited.", driver_index) + self.logger.info("Worker [%d] has exited.", worker_index) else: - self.logger.error("Load generator [%d] has exited prematurely. 
Aborting benchmark.", driver_index) - self.send(self.start_sender, actor.BenchmarkFailure("Load generator [{}] has exited prematurely.".format(driver_index))) + self.logger.error("Worker [%d] has exited prematurely. Aborting benchmark.", worker_index) + self.send(self.start_sender, actor.BenchmarkFailure("Worker [{}] has exited prematurely.".format(worker_index))) else: self.logger.info("A track preparator has exited.") @@ -225,7 +228,7 @@ def receiveMsg_TrackPrepared(self, msg, sender): @actor.no_retry("driver") def receiveMsg_JoinPointReached(self, msg, sender): - self.coordinator.joinpoint_reached(msg.client_id, msg.client_local_timestamp, msg.task) + self.coordinator.joinpoint_reached(msg.worker_id, msg.worker_timestamp, msg.task) @actor.no_retry("driver") def receiveMsg_UpdateSamples(self, msg, sender): @@ -243,13 +246,11 @@ def receiveMsg_WakeupMessage(self, msg, sender): self.coordinator.update_progress_message() self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS)) - def create_client(self, client_id, host): - return self.createActor(LoadGenerator, - #globalName="/rally/driver/worker/%s" % str(client_id), - targetActorRequirements=self._requirements(host)) + def create_client(self, host): + return self.createActor(Worker, targetActorRequirements=self._requirements(host)) - def start_load_generator(self, driver, client_id, cfg, track, allocations): - self.send(driver, StartLoadGenerator(client_id, cfg, track, allocations)) + def start_worker(self, driver, worker_id, cfg, track, allocations): + self.send(driver, StartWorker(worker_id, cfg, track, allocations)) def drive_at(self, driver, client_start_timestamp): self.send(driver, Drive(client_start_timestamp)) @@ -351,7 +352,9 @@ def __init__(self, target, config, es_client_factory_class=client.EsClientFactor self.challenge = None self.metrics_store = None self.load_driver_hosts = [] - self.drivers = [] + self.workers = [] + # which client ids are assigned to which workers? 
+ self.clients_per_worker = {} self.progress_reporter = console.progress() self.progress_counter = 0 @@ -363,7 +366,7 @@ def __init__(self, target, config, es_client_factory_class=client.EsClientFactor self.number_of_steps = 0 self.currently_completed = 0 - self.clients_completed_current_step = {} + self.workers_completed_current_step = {} self.current_step = -1 self.tasks_per_join_point = None self.complete_current_task_sent = False @@ -431,12 +434,18 @@ def prepare_benchmark(self, t): self.prepare_telemetry(es_clients) self.target.on_cluster_details_retrieved(self.retrieve_cluster_info(es_clients)) for host in self.config.opts("driver", "load_driver_hosts"): + host_config = { + # for simplicity we assume that all benchmark machines have the same specs + "cores": multiprocessing.cpu_count() + } if host != "localhost": - self.load_driver_hosts.append(net.resolve(host)) + host_config["host"] = net.resolve(host) else: - self.load_driver_hosts.append(host) + host_config["host"] = host + + self.load_driver_hosts.append(host_config) - preps = [self.target.create_track_preparator(h) for h in self.load_driver_hosts] + preps = [self.target.create_track_preparator(h["host"]) for h in self.load_driver_hosts] self.target.on_prepare_track(preps, self.config, self.track) def start_benchmark(self): @@ -452,33 +461,43 @@ def start_benchmark(self): self.number_of_steps = len(allocator.join_points) - 1 self.tasks_per_join_point = allocator.tasks_per_joinpoint - self.logger.info("Benchmark consists of [%d] steps executed by (at most) [%d] clients as specified by the allocation matrix:\n%s", - self.number_of_steps, len(self.allocations), self.allocations) - - for client_id in range(allocator.clients): - # allocate clients round-robin to all defined hosts - host = self.load_driver_hosts[client_id % len(self.load_driver_hosts)] - self.logger.info("Allocating load generator [%d] on [%s]", client_id, host) - self.drivers.append(self.target.create_client(client_id, host)) - for client_id, driver in enumerate(self.drivers): - self.logger.info("Starting load generator [%d].", client_id) - self.target.start_load_generator(driver, client_id, self.config, self.track, self.allocations[client_id]) + self.logger.info("Benchmark consists of [%d] steps executed by [%d] clients.", + self.number_of_steps, len(self.allocations)) + # avoid flooding the log if there are too many clients + if allocator.clients < 128: + self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations])) + + worker_assignments = calculate_worker_assignments(self.load_driver_hosts, allocator.clients) + for assignment in worker_assignments: + host = assignment["host"] + for worker_id, clients in enumerate(assignment["workers"]): + # don't assign workers without any clients + if len(clients) > 0: + self.logger.info("Allocating worker [%d] on [%s] with [%d] clients.", worker_id, host, len(clients)) + worker = self.target.create_client(host) + + client_allocations = ClientAllocations() + for client_id in clients: + client_allocations.add(client_id, self.allocations[client_id]) + self.clients_per_worker[client_id] = worker_id + self.target.start_worker(worker, worker_id, self.config, self.track, client_allocations) + self.workers.append(worker) self.update_progress_message() - def joinpoint_reached(self, client_id, client_local_timestamp, task): + def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations): self.currently_completed += 1 - self.clients_completed_current_step[client_id] = (client_local_timestamp, 
time.perf_counter()) - self.logger.info("[%d/%d] drivers reached join point [%d/%d].", - self.currently_completed, len(self.drivers), self.current_step + 1, self.number_of_steps) - if self.currently_completed == len(self.drivers): - self.logger.info("All drivers completed their tasks until join point [%d/%d].", self.current_step + 1, self.number_of_steps) + self.workers_completed_current_step[worker_id] = (worker_local_timestamp, time.perf_counter()) + self.logger.info("[%d/%d] workers reached join point [%d/%d].", + self.currently_completed, len(self.workers), self.current_step + 1, self.number_of_steps) + if self.currently_completed == len(self.workers): + self.logger.info("All workers completed their tasks until join point [%d/%d].", self.current_step + 1, self.number_of_steps) # we can go on to the next step self.currently_completed = 0 self.complete_current_task_sent = False # make a copy and reset early to avoid any race conditions from clients that reach a join point already while we are sending... - clients_curr_step = self.clients_completed_current_step - self.clients_completed_current_step = {} + workers_curr_step = self.workers_completed_current_step + self.workers_completed_current_step = {} self.update_progress_message(task_finished=True) # clear per step self.most_recent_sample_per_client = {} @@ -511,32 +530,43 @@ def joinpoint_reached(self, client_id, client_local_timestamp, task): # Using a perf_counter here is fine also in the distributed case as we subtract it from `master_received_msg_at` making it # a relative instead of an absolute value. start_next_task = time.perf_counter() + waiting_period - for client_id, driver in enumerate(self.drivers): - client_ended_task_at, master_received_msg_at = clients_curr_step[client_id] - client_start_timestamp = client_ended_task_at + (start_next_task - master_received_msg_at) - self.logger.info("Scheduling next task for client id [%d] at their timestamp [%f] (master timestamp [%f])", - client_id, client_start_timestamp, start_next_task) - self.target.drive_at(driver, client_start_timestamp) + for worker_id, driver in enumerate(self.workers): + worker_ended_task_at, master_received_msg_at = workers_curr_step[worker_id] + worker_start_timestamp = worker_ended_task_at + (start_next_task - master_received_msg_at) + self.logger.info("Scheduling next task for worker id [%d] at their timestamp [%f] (master timestamp [%f])", + worker_id, worker_start_timestamp, start_next_task) + self.target.drive_at(driver, worker_start_timestamp) else: - current_join_point = task - # we need to actively send CompleteCurrentTask messages to all remaining clients. - if current_join_point.preceding_task_completes_parent and not self.complete_current_task_sent: - self.logger.info("Tasks before [%s] are able to complete the parent structure. Checking if clients [%s] have finished yet.", - current_join_point, current_join_point.clients_executing_completing_task) - # are all clients executing said task already done? if so we need to notify the remaining clients - all_clients_finished = True + joinpoints_completing_parent = [a for a in task_allocations if a.task.preceding_task_completes_parent] + # we need to actively send CompleteCurrentTask messages to all remaining workers. + if len(joinpoints_completing_parent) > 0 and not self.complete_current_task_sent: + # while this list could contain multiple items, it should always be the same task (but multiple + # different clients) so any item is sufficient. 
+ current_join_point = joinpoints_completing_parent[0].task + self.logger.info("Tasks before join point [%s] are able to complete the parent structure. Checking " + "if all [%d] clients have finished yet.", + current_join_point, len(current_join_point.clients_executing_completing_task)) + + pending_client_ids = [] for client_id in current_join_point.clients_executing_completing_task: - if client_id not in self.clients_completed_current_step: - self.logger.info("Client id [%s] did not yet finish.", client_id) - # do not break here so we can see all remaining clients in the log output. - all_clients_finished = False - if all_clients_finished: - # As we are waiting for other clients to finish, we would send this message over and over again. Hence we need to - # memorize whether we have already sent it for the current step. + # We assume that all clients have finished if their corresponding worker has finished + worker_id = self.clients_per_worker[client_id] + if worker_id not in self.workers_completed_current_step: + pending_client_ids.append(client_id) + + # are all clients executing said task already done? if so we need to notify the remaining clients + if len(pending_client_ids) == 0: + # As we are waiting for other clients to finish, we would send this message over and over again. + # Hence we need to memorize whether we have already sent it for the current step. self.complete_current_task_sent = True self.logger.info("All affected clients have finished. Notifying all clients to complete their current tasks.") - for client_id, driver in enumerate(self.drivers): - self.target.complete_current_task(driver) + for worker_id, worker in enumerate(self.workers): + self.target.complete_current_task(worker) + else: + if len(pending_client_ids) > 32: + self.logger.info("[%d] clients did not yet finish.", len(pending_client_ids)) + else: + self.logger.info("Client id(s) [%s] did not yet finish.", ",".join(map(str, pending_client_ids))) def reset_relative_time(self): self.logger.debug("Resetting relative time of request metrics store.") @@ -554,6 +584,7 @@ def update_samples(self, samples): self.raw_samples += samples if len(samples) > 0: most_recent = samples[-1] + # TODO: We need to change this similarly to async_driver as there are more clients now per worker self.most_recent_sample_per_client[most_recent.client_id] = most_recent def update_progress_message(self, task_finished=False): @@ -649,9 +680,64 @@ def merge(self, *args): return result -class LoadGenerator(actor.RallyActor): +def calculate_worker_assignments(host_configs, client_count): + """ + Assigns clients to workers on the provided hosts. + + :param host_configs: A list of dicts where each dict contains the host name (key: ``host``) and the number of + available CPU cores (key: ``cores``). + :param client_count: The number of clients that should be used at most. + :return: A list of dicts containing the host (key: ``host``) and a list of workers (key ``workers``). Each entry + in that list contains another list with the clients that should be assigned to these workers. """ - The actual driver that applies load against the cluster(s). 
+ assignments = [] + + for host_config in host_configs: + assignments.append({ + "host": host_config["host"], + "workers": [[] for _ in range(host_config["cores"])], + "worker": 0 + }) + + for client_idx in range(client_count): + host = assignments[client_idx % len(assignments)] + workers = host["workers"] + worker_idx = host["worker"] + worker = host["workers"][worker_idx % len(workers)] + worker.append(client_idx) + host["worker"] = worker_idx + 1 + + return assignments + + +ClientAllocation = collections.namedtuple("ClientAllocation", ["client_id", "task"]) + + +class ClientAllocations: + def __init__(self): + self.allocations = [] + + def add(self, client_id, tasks): + self.allocations.append({ + "client_id": client_id, + "tasks": tasks + }) + + def is_joinpoint(self, task_index): + return all(isinstance(t.task, JoinPoint) for t in self.tasks(task_index)) + + def tasks(self, task_index, remove_empty=True): + current_tasks = [] + for allocation in self.allocations: + tasks_at_index = allocation["tasks"][task_index] + if remove_empty and tasks_at_index is not None: + current_tasks.append(ClientAllocation(allocation["client_id"], tasks_at_index)) + return current_tasks + + +class Worker(actor.RallyActor): + """ + The actual worker that applies load against the cluster(s). It will also regularly send measurements to the master node so it can consolidate them. """ @@ -661,45 +747,36 @@ class LoadGenerator(actor.RallyActor): def __init__(self): super().__init__() self.master = None - self.client_id = None - self.es = None + self.worker_id = None self.config = None self.track = None - self.tasks = None + self.client_allocations = None self.current_task_index = 0 - self.current_task = None + self.next_task_index = 0 self.abort_on_error = False self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) # cancellation via future does not work, hence we use our own mechanism with a shared variable and polling self.cancel = threading.Event() - # used to indicate that we want to prematurely consider this completed. This is *not* due to cancellation but a regular event in - # a benchmark and used to model task dependency of parallel tasks. + # used to indicate that we want to prematurely consider this completed. This is *not* due to cancellation + # but a regular event in a benchmark and used to model task dependency of parallel tasks. 
self.complete = threading.Event() self.executor_future = None self.sampler = None self.start_driving = False - self.wakeup_interval = LoadGenerator.WAKEUP_INTERVAL_SECONDS - - @actor.no_retry("load generator") - def receiveMsg_StartLoadGenerator(self, msg, sender): - def es_clients(all_hosts, all_client_options): - es = {} - for cluster_name, cluster_hosts in all_hosts.items(): - es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create() - return es + self.wakeup_interval = Worker.WAKEUP_INTERVAL_SECONDS - self.logger.info("LoadGenerator[%d] is about to start.", msg.client_id) + @actor.no_retry("worker") + def receiveMsg_StartWorker(self, msg, sender): + self.logger.info("Worker[%d] is about to start.", msg.worker_id) self.master = sender - self.client_id = msg.client_id + self.worker_id = msg.worker_id self.config = load_local_config(msg.config) self.abort_on_error = self.config.opts("driver", "on.error") == "abort" - self.es = es_clients(self.config.opts("client", "hosts").all_hosts, self.config.opts("client", "options").all_client_options) self.track = msg.track track.set_absolute_data_path(self.config, self.track) - self.tasks = msg.tasks + self.client_allocations = msg.client_allocations self.current_task_index = 0 self.cancel.clear() - self.current_task = None # we need to wake up more often in test mode if self.config.opts("track", "test.mode.enabled"): self.wakeup_interval = 0.5 @@ -708,65 +785,65 @@ def es_clients(all_hosts, all_client_options): track.load_track_plugins(self.config, runner.register_runner, scheduler.register_scheduler) self.drive() - @actor.no_retry("load generator") + @actor.no_retry("worker") def receiveMsg_Drive(self, msg, sender): sleep_time = datetime.timedelta(seconds=msg.client_start_timestamp - time.perf_counter()) - self.logger.info("LoadGenerator[%d] is continuing its work at task index [%d] on [%f], that is in [%s].", - self.client_id, self.current_task_index, msg.client_start_timestamp, sleep_time) + self.logger.info("Worker[%d] is continuing its work at task index [%d] on [%f], that is in [%s].", + self.worker_id, self.current_task_index, msg.client_start_timestamp, sleep_time) self.start_driving = True self.wakeupAfter(sleep_time) - @actor.no_retry("load generator") + @actor.no_retry("worker") def receiveMsg_CompleteCurrentTask(self, msg, sender): # finish now ASAP. Remaining samples will be sent with the next WakeupMessage. We will also need to skip to the next # JoinPoint. But if we are already at a JoinPoint at the moment, there is nothing to do. if self.at_joinpoint(): - self.logger.info("LoadGenerator[%s] has received CompleteCurrentTask but is currently at [%s]. Ignoring.", - str(self.client_id), self.current_task) + self.logger.info("Worker[%s] has received CompleteCurrentTask but is currently at join point at index [%d]. Ignoring.", + str(self.worker_id), self.current_task_index) else: - self.logger.info("LoadGenerator[%s] has received CompleteCurrentTask. Completing current task [%s].", - str(self.client_id), self.current_task) + self.logger.info("Worker[%s] has received CompleteCurrentTask. Completing tasks at index [%d].", + str(self.worker_id), self.current_task_index) self.complete.set() - @actor.no_retry("load generator") + @actor.no_retry("worker") def receiveMsg_WakeupMessage(self, msg, sender): # it would be better if we could send ourselves a message at a specific time, simulate this with a boolean... 
if self.start_driving: - self.logger.info("LoadGenerator[%s] starts driving now.", str(self.client_id)) self.start_driving = False self.drive() else: current_samples = self.send_samples() if self.cancel.is_set(): - self.logger.info("LoadGenerator[%s] has detected that benchmark has been cancelled. Notifying master...", - str(self.client_id)) + self.logger.info("Worker[%s] has detected that benchmark has been cancelled. Notifying master...", + str(self.worker_id)) self.send(self.master, actor.BenchmarkCancelled()) elif self.executor_future is not None and self.executor_future.done(): e = self.executor_future.exception(timeout=0) if e: - self.logger.info("LoadGenerator[%s] has detected a benchmark failure. Notifying master...", str(self.client_id)) + self.logger.info("Worker[%s] has detected a benchmark failure. Notifying master...", str(self.worker_id)) # the exception might be user-defined and not be on the load path of the master driver. Hence, it cannot be # deserialized on the receiver so we convert it here to a plain string. - self.send(self.master, actor.BenchmarkFailure("Error in load generator [{}]".format(self.client_id), str(e))) + self.send(self.master, actor.BenchmarkFailure("Error in load generator [{}]".format(self.worker_id), str(e))) else: - self.logger.info("LoadGenerator[%s] is ready for the next task.", str(self.client_id)) + self.logger.info("Worker[%s] is ready for the next task.", str(self.worker_id)) self.executor_future = None self.drive() else: if current_samples and len(current_samples) > 0: most_recent_sample = current_samples[-1] if most_recent_sample.percent_completed is not None: - self.logger.debug("LoadGenerator[%s] is executing [%s] (%.2f%% complete).", - str(self.client_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0) + self.logger.debug("Worker[%s] is executing [%s] (%.2f%% complete).", + str(self.worker_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0) else: - self.logger.debug("LoadGenerator[%s] is executing [%s] (dependent eternal task).", - str(self.client_id), most_recent_sample.task) + # TODO: This could be misleading given that one worker could execute more than one task... 
+ self.logger.debug("Worker[%s] is executing [%s] (dependent eternal task).", + str(self.worker_id), most_recent_sample.task) else: - self.logger.debug("LoadGenerator[%s] is executing (no samples).", str(self.client_id)) + self.logger.debug("Worker[%s] is executing (no samples).", str(self.worker_id)) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) def receiveMsg_ActorExitRequest(self, msg, sender): - self.logger.info("LoadGenerator[%s] is exiting due to ActorExitRequest.", str(self.client_id)) + self.logger.info("Worker[%s] is exiting due to ActorExitRequest.", str(self.worker_id)) if self.executor_future is not None and self.executor_future.running(): self.cancel.set() self.pool.shutdown() @@ -776,17 +853,16 @@ def receiveMsg_BenchmarkFailure(self, msg, sender): self.send(self.master, msg) def receiveUnrecognizedMessage(self, msg, sender): - self.logger.info("LoadGenerator[%d] received unknown message [%s] (ignoring).", self.client_id, str(msg)) + self.logger.info("Worker[%d] received unknown message [%s] (ignoring).", self.worker_id, str(msg)) def drive(self): - task_allocation = self.current_task_and_advance() + task_allocations = self.current_tasks_and_advance() # skip non-tasks in the task list - while task_allocation is None: - task_allocation = self.current_task_and_advance() - self.current_task = task_allocation + while len(task_allocations) == 0: + task_allocations = self.current_tasks_and_advance() - if isinstance(task_allocation, JoinPoint): - self.logger.info("LoadGenerator[%d] reached join point [%s].", self.client_id, task_allocation) + if self.at_joinpoint(): + self.logger.info("Worker[%d] reached join point at index [%d].", self.worker_id, self.current_task_index) # clients that don't execute tasks don't need to care about waiting if self.executor_future is not None: self.executor_future.result() @@ -795,48 +871,37 @@ def drive(self): self.complete.clear() self.executor_future = None self.sampler = None - self.send(self.master, JoinPointReached(self.client_id, task_allocation)) - elif isinstance(task_allocation, TaskAllocation): - task = task_allocation.task - # There may be a situation where there are more (parallel) tasks than clients. If we were asked to complete all tasks, we not + self.send(self.master, JoinPointReached(self.worker_id, task_allocations)) + else: + # There may be a situation where there are more (parallel) tasks than workers. If we were asked to complete all tasks, we not # only need to complete actively running tasks but actually all scheduled tasks until we reach the next join point. if self.complete.is_set(): - self.logger.info("LoadGenerator[%d] skips [%s] because it has been asked to complete all tasks until next join point.", - self.client_id, task) + self.logger.info("Worker[%d] skips tasks at index [%d] because it has been asked to complete all " + "tasks until next join point.", self.worker_id, self.current_task_index) else: - self.logger.info("LoadGenerator[%d] is executing [%s].", self.client_id, task) - self.sampler = Sampler(start_timestamp=time.perf_counter()) - # We cannot use the global client index here because we need to support parallel execution of tasks with multiple clients. - # - # Consider the following scenario: - # - # * Clients 0-3 bulk index into indexA - # * Clients 4-7 bulk index into indexB - # - # Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we need to start - # from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB. 
- schedule = schedule_for(self.track, task_allocation.task, task_allocation.client_index_in_task) - executor = AsyncIoAdapter( - self.config, self.client_id, task, schedule, self.sampler, self.cancel, self.complete, self.abort_on_error) + self.logger.info("Worker[%d] is executing tasks at index [%d].", self.worker_id, self.current_task_index) + # allow to buffer more events than by default as we expect to have way more clients. + self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=65536) + executor = AsyncIoAdapter(self.config, self.track, task_allocations, self.sampler, self.cancel, self.complete, self.abort_on_error) self.executor_future = self.pool.submit(executor) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) - else: - raise exceptions.RallyAssertionError("Unknown task type [%s]" % type(task_allocation)) def at_joinpoint(self): - return isinstance(self.current_task, JoinPoint) + return self.client_allocations.is_joinpoint(self.current_task_index) - def current_task_and_advance(self): - current = self.tasks[self.current_task_index] - self.current_task_index += 1 + def current_tasks_and_advance(self): + self.current_task_index = self.next_task_index + current = self.client_allocations.tasks(self.current_task_index) + self.next_task_index += 1 + self.logger.debug("Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index) return current def send_samples(self): if self.sampler: samples = self.sampler.samples if len(samples) > 0: - self.send(self.master, UpdateSamples(self.client_id, samples)) + self.send(self.master, UpdateSamples(self.worker_id, samples)) return samples return None @@ -1026,17 +1091,17 @@ def calculate(self, samples, bucket_interval_secs=1): class AsyncIoAdapter: - def __init__(self, cfg, client_id, sub_task, schedule, sampler, cancel, complete, abort_on_error): + def __init__(self, cfg, track, task_allocations, sampler, cancel, complete, abort_on_error): self.cfg = cfg - self.client_id = client_id - self.sub_task = sub_task - self.schedule = schedule + self.track = track + self.task_allocations = task_allocations self.sampler = sampler self.cancel = cancel self.complete = complete self.abort_on_error = abort_on_error self.profiling_enabled = self.cfg.opts("driver", "profiling") self.debug_event_loop = self.cfg.opts("system", "async.debug", mandatory=False, default_value=False) + self.logger = logging.getLogger(__name__) def __call__(self, *args, **kwargs): # only possible in Python 3.7+ (has introduced get_running_loop) @@ -1055,7 +1120,7 @@ def __call__(self, *args, **kwargs): loop.close() def _logging_exception_handler(self, loop, context): - logging.getLogger(__name__).error("Uncaught exception in event loop: %s", context) + self.logger.error("Uncaught exception in event loop: %s", context) async def run(self): def es_clients(all_hosts, all_client_options): @@ -1065,11 +1130,24 @@ def es_clients(all_hosts, all_client_options): return es es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options").all_client_options) - async_executor = AsyncExecutor( - self.client_id, self.sub_task, self.schedule, es, self.sampler, self.cancel, self.complete, self.abort_on_error) - final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor + + aws = [] + for client_id, task in self.task_allocations: + # We cannot use the global client index here because we need to support parallel execution of tasks + # with multiple clients. 
Consider the following scenario: + # + # * Clients 0-3 bulk index into indexA + # * Clients 4-7 bulk index into indexB + # + # Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we + # need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB. + schedule = schedule_for(self.track, task.task, task.client_index_in_task) + async_executor = AsyncExecutor( + client_id, task.task, schedule, es, self.sampler, self.cancel, self.complete, self.abort_on_error) + final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor + aws.append(final_executor()) try: - return await final_executor() + _ = await asyncio.gather(*aws) finally: await asyncio.get_event_loop().shutdown_asyncgens() for e in es.values(): @@ -1131,6 +1209,7 @@ def __init__(self, client_id, task, schedule, es, sampler, cancel, complete, abo self.logger = logging.getLogger(__name__) async def __call__(self, *args, **kwargs): + task_completes_parent = self.task.completes_parent total_start = time.perf_counter() # lazily initialize the schedule self.logger.debug("Initializing schedule for client id [%s].", self.client_id) @@ -1157,8 +1236,16 @@ async def __call__(self, *args, **kwargs): processing_time = processing_end - processing_start # Do not calculate latency separately when we don't throttle throughput. This metric is just confusing then. latency = stop - absolute_expected_schedule_time if throughput_throttled else service_time + # If this task completes the parent task we should *not* check for completion by another client but + # instead continue until our own runner has completed. We need to do this because the current + # worker (process) could run multiple clients that execute the same task. We do not want all clients to + # finish this task as soon as the first of these clients has finished but rather continue until the last + # client has finished that task. + if task_completes_parent: + completed = runner.completed + else: + completed = self.complete.is_set() or runner.completed # last sample should bump progress to 100% if externally completed. - completed = self.complete.is_set() or runner.completed if completed: progress = 1.0 elif runner.percent_completed: @@ -1178,7 +1265,9 @@ async def __call__(self, *args, **kwargs): raise finally: # Actively set it if this task completes its parent - if self.task.completes_parent: + if task_completes_parent: + self.logger.info("Task [%s] completes parent. Client id [%s] is finished executing it and signals completion.", + self.task, self.client_id) self.complete.set() diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index 2f15a05b0..2b01cddf3 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -144,7 +144,7 @@ def test_start_benchmark_and_prepare_track(self, resolve): ]) # Did we start all load generators? There is no specific mock assert for this... - self.assertEqual(4, target.start_load_generator.call_count) + self.assertEqual(4, target.start_worker.call_count) def test_assign_drivers_round_robin(self): target = self.create_test_driver_target() @@ -165,7 +165,7 @@ def test_assign_drivers_round_robin(self): ]) # Did we start all load generators? There is no specific mock assert for this... 
- self.assertEqual(4, target.start_load_generator.call_count) + self.assertEqual(4, target.start_worker.call_count) def test_client_reaches_join_point_others_still_executing(self): target = self.create_test_driver_target() @@ -174,11 +174,11 @@ def test_client_reaches_join_point_others_still_executing(self): d.prepare_benchmark(t=self.track) d.start_benchmark() - self.assertEqual(0, len(d.clients_completed_current_step)) + self.assertEqual(0, len(d.workers_completed_current_step)) - d.joinpoint_reached(client_id=0, client_local_timestamp=10, task=driver.JoinPoint(id=0)) + d.joinpoint_reached(worker_id=0, worker_local_timestamp=10, task=driver.JoinPoint(id=0)) - self.assertEqual(1, len(d.clients_completed_current_step)) + self.assertEqual(1, len(d.workers_completed_current_step)) self.assertEqual(0, target.on_task_finished.call_count) self.assertEqual(0, target.drive_at.call_count) @@ -190,30 +190,30 @@ def test_client_reaches_join_point_which_completes_parent(self): d.prepare_benchmark(t=self.track) d.start_benchmark() - self.assertEqual(0, len(d.clients_completed_current_step)) + self.assertEqual(0, len(d.workers_completed_current_step)) # it does not matter what we put into `clients_executing_completing_task` We choose to put the client id into it. - d.joinpoint_reached(client_id=0, client_local_timestamp=10, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=0, worker_local_timestamp=10, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) self.assertEqual(-1, d.current_step) - self.assertEqual(1, len(d.clients_completed_current_step)) + self.assertEqual(1, len(d.workers_completed_current_step)) # notified all drivers that they should complete the current task ASAP self.assertEqual(4, target.complete_current_task.call_count) # awaiting responses of other clients - d.joinpoint_reached(client_id=1, client_local_timestamp=11, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=1, worker_local_timestamp=11, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) self.assertEqual(-1, d.current_step) - self.assertEqual(2, len(d.clients_completed_current_step)) + self.assertEqual(2, len(d.workers_completed_current_step)) - d.joinpoint_reached(client_id=2, client_local_timestamp=12, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=2, worker_local_timestamp=12, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) self.assertEqual(-1, d.current_step) - self.assertEqual(3, len(d.clients_completed_current_step)) + self.assertEqual(3, len(d.workers_completed_current_step)) - d.joinpoint_reached(client_id=3, client_local_timestamp=13, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=3, worker_local_timestamp=13, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) # by now the previous step should be considered completed and we are at the next one self.assertEqual(0, d.current_step) - self.assertEqual(0, len(d.clients_completed_current_step)) + self.assertEqual(0, len(d.workers_completed_current_step)) # this requires at least Python 3.6 # target.on_task_finished.assert_called_once() From 2ba292563f913261735416cd054487181b6a877c Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 1 Apr 2020 13:26:57 +0200 Subject: [PATCH 2/9] Fix existing tests --- esrally/driver/driver.py | 3 ++- tests/driver/driver_test.py | 51 
+++++++++++++++++++++++++++---------- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index a0c6fc5f1..cc427e927 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -436,7 +436,8 @@ def prepare_benchmark(self, t): for host in self.config.opts("driver", "load_driver_hosts"): host_config = { # for simplicity we assume that all benchmark machines have the same specs - "cores": multiprocessing.cpu_count() + "cores": self.config.opts("system", "available.cores", mandatory=False, + default_value=multiprocessing.cpu_count()) } if host != "localhost": host_config["host"] = net.resolve(host) diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index 2b01cddf3..bb6efd99d 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -87,6 +87,7 @@ def setUp(self): self.cfg.add(config.Scope.application, "system", "env.name", "unittest") self.cfg.add(config.Scope.application, "system", "time.start", datetime(year=2017, month=8, day=20, hour=1, minute=0, second=0)) self.cfg.add(config.Scope.application, "system", "race.id", "6ebc6e53-ee20-4b0c-99b4-09697987e9f4") + self.cfg.add(config.Scope.application, "system", "available.cores", 8) self.cfg.add(config.Scope.application, "track", "challenge.name", "default") self.cfg.add(config.Scope.application, "track", "params", {}) self.cfg.add(config.Scope.application, "track", "test.mode.enabled", True) @@ -137,10 +138,10 @@ def test_start_benchmark_and_prepare_track(self, resolve): d.start_benchmark() target.create_client.assert_has_calls(calls=[ - mock.call(0, "10.5.5.1"), - mock.call(1, "10.5.5.2"), - mock.call(2, "10.5.5.1"), - mock.call(3, "10.5.5.2"), + mock.call("10.5.5.1"), + mock.call("10.5.5.1"), + mock.call("10.5.5.2"), + mock.call("10.5.5.2"), ]) # Did we start all load generators? There is no specific mock assert for this... @@ -158,10 +159,10 @@ def test_assign_drivers_round_robin(self): d.start_benchmark() target.create_client.assert_has_calls(calls=[ - mock.call(0, "localhost"), - mock.call(1, "localhost"), - mock.call(2, "localhost"), - mock.call(3, "localhost"), + mock.call("localhost"), + mock.call("localhost"), + mock.call("localhost"), + mock.call("localhost"), ]) # Did we start all load generators? There is no specific mock assert for this... @@ -176,7 +177,9 @@ def test_client_reaches_join_point_others_still_executing(self): self.assertEqual(0, len(d.workers_completed_current_step)) - d.joinpoint_reached(worker_id=0, worker_local_timestamp=10, task=driver.JoinPoint(id=0)) + d.joinpoint_reached(worker_id=0, + worker_local_timestamp=10, + task_allocations=[driver.ClientAllocation(client_id=0, task=driver.JoinPoint(id=0))]) self.assertEqual(1, len(d.workers_completed_current_step)) @@ -192,8 +195,12 @@ def test_client_reaches_join_point_which_completes_parent(self): self.assertEqual(0, len(d.workers_completed_current_step)) - # it does not matter what we put into `clients_executing_completing_task` We choose to put the client id into it. 
- d.joinpoint_reached(worker_id=0, worker_local_timestamp=10, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=0, + worker_local_timestamp=10, + task_allocations=[ + driver.ClientAllocation(client_id=0, + task=driver.JoinPoint(id=0, + clients_executing_completing_task=[0]))]) self.assertEqual(-1, d.current_step) self.assertEqual(1, len(d.workers_completed_current_step)) @@ -201,15 +208,31 @@ def test_client_reaches_join_point_which_completes_parent(self): self.assertEqual(4, target.complete_current_task.call_count) # awaiting responses of other clients - d.joinpoint_reached(worker_id=1, worker_local_timestamp=11, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=1, + worker_local_timestamp=11, + task_allocations=[ + driver.ClientAllocation(client_id=1, + task=driver.JoinPoint(id=0, + clients_executing_completing_task=[0]))]) + self.assertEqual(-1, d.current_step) self.assertEqual(2, len(d.workers_completed_current_step)) - d.joinpoint_reached(worker_id=2, worker_local_timestamp=12, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=2, + worker_local_timestamp=12, + task_allocations=[ + driver.ClientAllocation(client_id=2, + task=driver.JoinPoint(id=0, + clients_executing_completing_task=[0]))]) self.assertEqual(-1, d.current_step) self.assertEqual(3, len(d.workers_completed_current_step)) - d.joinpoint_reached(worker_id=3, worker_local_timestamp=13, task=driver.JoinPoint(id=0, clients_executing_completing_task=[0])) + d.joinpoint_reached(worker_id=3, + worker_local_timestamp=13, + task_allocations=[ + driver.ClientAllocation(client_id=3, + task=driver.JoinPoint(id=0, + clients_executing_completing_task=[0]))]) # by now the previous step should be considered completed and we are at the next one self.assertEqual(0, d.current_step) From e9d5d68428ec91bbfce02f96ffcc671dd33bc8fc Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 1 Apr 2020 13:37:37 +0200 Subject: [PATCH 3/9] Add test for worker assignments --- tests/driver/driver_test.py | 139 ++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index bb6efd99d..06b40a1db 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -248,6 +248,145 @@ def op(name, operation_type): return track.Operation(name, operation_type, param_source="driver-test-param-source") +class WorkerAssignmentTests(TestCase): + def test_single_host_assignment_clients_matches_cores(self): + host_configs = [{ + "host": "localhost", + "cores": 4 + }] + + assignments = driver.calculate_worker_assignments(host_configs, client_count=4) + + self.assertEqual([ + { + "host": "localhost", + "workers": [ + [0], + [1], + [2], + [3] + ], + "worker": 4 + } + ], assignments) + + def test_single_host_assignment_more_clients_than_cores(self): + host_configs = [{ + "host": "localhost", + "cores": 4 + }] + + assignments = driver.calculate_worker_assignments(host_configs, client_count=6) + + self.assertEqual([ + { + "host": "localhost", + "workers": [ + [0, 4], + [1, 5], + [2], + [3] + ], + "worker": 6 + } + ], assignments) + + def test_single_host_assignment_less_clients_than_cores(self): + host_configs = [{ + "host": "localhost", + "cores": 4 + }] + + assignments = driver.calculate_worker_assignments(host_configs, client_count=2) + + self.assertEqual([ + { + "host": "localhost", + "workers": [ + [0], + [1], + [], + [] + ], 
+ "worker": 2 + } + ], assignments) + + def test_multiple_host_assignment_more_clients_than_cores(self): + host_configs = [ + { + "host": "host-a", + "cores": 4 + }, + { + "host": "host-b", + "cores": 4 + } + ] + + assignments = driver.calculate_worker_assignments(host_configs, client_count=16) + + self.assertEqual([ + { + "host": "host-a", + "workers": [ + [0, 8], + [2, 10], + [4, 12], + [6, 14] + ], + "worker": 8 + }, + { + "host": "host-b", + "workers": [ + [1, 9], + [3, 11], + [5, 13], + [7, 15] + ], + "worker": 8 + } + ], assignments) + + def test_multiple_host_assignment_less_clients_than_cores(self): + host_configs = [ + { + "host": "host-a", + "cores": 4 + }, + { + "host": "host-b", + "cores": 4 + } + ] + + assignments = driver.calculate_worker_assignments(host_configs, client_count=4) + + self.assertEqual([ + { + "host": "host-a", + "workers": [ + [0], + [2], + [], + [] + ], + "worker": 2 + }, + { + "host": "host-b", + "workers": [ + [1], + [3], + [], + [] + ], + "worker": 2 + } + ], assignments) + + class AllocatorTests(TestCase): def setUp(self): params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource) From bdee89c335c8f5c7bde39f8d499738dd175f2bcd Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 1 Apr 2020 13:41:23 +0200 Subject: [PATCH 4/9] Make the linter happy --- esrally/driver/driver.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index cc427e927..fe26c2304 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -883,7 +883,8 @@ def drive(self): self.logger.info("Worker[%d] is executing tasks at index [%d].", self.worker_id, self.current_task_index) # allow to buffer more events than by default as we expect to have way more clients. 
self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=65536) - executor = AsyncIoAdapter(self.config, self.track, task_allocations, self.sampler, self.cancel, self.complete, self.abort_on_error) + executor = AsyncIoAdapter(self.config, self.track, task_allocations, self.sampler, + self.cancel, self.complete, self.abort_on_error) self.executor_future = self.pool.submit(executor) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) From 0d6d1b4b81f11753fcc2bcc5959261a671583969 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Thu, 2 Apr 2020 08:41:39 +0200 Subject: [PATCH 5/9] More logging --- esrally/driver/driver.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index fe26c2304..0db2d34f8 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1148,12 +1148,19 @@ def es_clients(all_hosts, all_client_options): client_id, task.task, schedule, es, self.sampler, self.cancel, self.complete, self.abort_on_error) final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor aws.append(final_executor()) + run_start = time.perf_counter() try: _ = await asyncio.gather(*aws) finally: + run_end = time.perf_counter() + self.logger.info(f"Total run duration: {run_end - run_start} seconds.") await asyncio.get_event_loop().shutdown_asyncgens() + shutdown_asyncgens_end = time.perf_counter() + self.logger.info(f"Total time to shutdown asyncgens: {run_end - shutdown_asyncgens_end} seconds.") for e in es.values(): await e.transport.close() + transport_close_end = time.perf_counter() + self.logger.info(f"Total time to close transports: {shutdown_asyncgens_end - transport_close_end} seconds.") class AsyncProfiler: From 2c29074ff407c79a91d19950c8861b579c193f09 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Thu, 2 Apr 2020 09:10:26 +0200 Subject: [PATCH 6/9] even more logging --- esrally/metrics.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/esrally/metrics.py b/esrally/metrics.py index e9bb73440..d48f15b88 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -889,9 +889,11 @@ def _get_template(self): def flush(self, refresh=True): if self._docs: + start = time.perf_counter() self._client.bulk_index(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, items=self._docs) - self.logger.info("Successfully added %d metrics documents for race timestamp=[%s], track=[%s], challenge=[%s], car=[%s].", - len(self._docs), self._race_timestamp, self._track, self._challenge, self._car) + end = time.perf_counter() + self.logger.info("Successfully added %d metrics documents for race timestamp=[%s], track=[%s], challenge=[%s], car=[%s] in [%f] seconds.", + len(self._docs), self._race_timestamp, self._track, self._challenge, self._car, (end - start)) self._docs = [] # ensure we can search immediately after flushing if refresh: From b89f8759a59406e55284552b9fc2cc0f56f28d06 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Thu, 2 Apr 2020 09:13:39 +0200 Subject: [PATCH 7/9] Use stopwatch --- esrally/metrics.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/esrally/metrics.py b/esrally/metrics.py index d48f15b88..05ce92d0e 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -889,11 +889,12 @@ def _get_template(self): def flush(self, refresh=True): if self._docs: - start = time.perf_counter() + sw = time.StopWatch() + sw.start() self._client.bulk_index(index=self._index, 
doc_type=EsMetricsStore.METRICS_DOC_TYPE, items=self._docs) - end = time.perf_counter() + sw.stop() self.logger.info("Successfully added %d metrics documents for race timestamp=[%s], track=[%s], challenge=[%s], car=[%s] in [%f] seconds.", - len(self._docs), self._race_timestamp, self._track, self._challenge, self._car, (end - start)) + len(self._docs), self._race_timestamp, self._track, self._challenge, self._car, sw.total_time()) self._docs = [] # ensure we can search immediately after flushing if refresh: From b5c435130296841bdf99c7a2fc585706f87e30fd Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Thu, 2 Apr 2020 09:34:55 +0200 Subject: [PATCH 8/9] Convert core count to int --- esrally/driver/driver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 0db2d34f8..1fa654fd8 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -436,8 +436,8 @@ def prepare_benchmark(self, t): for host in self.config.opts("driver", "load_driver_hosts"): host_config = { # for simplicity we assume that all benchmark machines have the same specs - "cores": self.config.opts("system", "available.cores", mandatory=False, - default_value=multiprocessing.cpu_count()) + "cores": int(self.config.opts("system", "available.cores", mandatory=False, + default_value=multiprocessing.cpu_count())) } if host != "localhost": host_config["host"] = net.resolve(host) From 27fa34fdf33483e8c52ae21f64a6f3a0de892c75 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Thu, 2 Apr 2020 09:35:07 +0200 Subject: [PATCH 9/9] Fix linter issues --- esrally/driver/driver.py | 6 +++--- esrally/metrics.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 1fa654fd8..a179d3988 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1153,14 +1153,14 @@ def es_clients(all_hosts, all_client_options): _ = await asyncio.gather(*aws) finally: run_end = time.perf_counter() - self.logger.info(f"Total run duration: {run_end - run_start} seconds.") + self.logger.info("Total run duration: %f seconds.", (run_end - run_start)) await asyncio.get_event_loop().shutdown_asyncgens() shutdown_asyncgens_end = time.perf_counter() - self.logger.info(f"Total time to shutdown asyncgens: {run_end - shutdown_asyncgens_end} seconds.") + self.logger.info("Total time to shutdown asyncgens: %f seconds.", (shutdown_asyncgens_end - run_end)) for e in es.values(): await e.transport.close() transport_close_end = time.perf_counter() - self.logger.info(f"Total time to close transports: {shutdown_asyncgens_end - transport_close_end} seconds.") + self.logger.info("Total time to close transports: %f seconds.", (shutdown_asyncgens_end - transport_close_end)) class AsyncProfiler: diff --git a/esrally/metrics.py b/esrally/metrics.py index 05ce92d0e..c99c6515e 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -893,8 +893,9 @@ def flush(self, refresh=True): sw.start() self._client.bulk_index(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, items=self._docs) sw.stop() - self.logger.info("Successfully added %d metrics documents for race timestamp=[%s], track=[%s], challenge=[%s], car=[%s] in [%f] seconds.", - len(self._docs), self._race_timestamp, self._track, self._challenge, self._car, sw.total_time()) + self.logger.info("Successfully added %d metrics documents for race timestamp=[%s], track=[%s], " + "challenge=[%s], car=[%s] in [%f] seconds.", len(self._docs), 
self._race_timestamp, + self._track, self._challenge, self._car, sw.total_time()) self._docs = [] # ensure we can search immediately after flushing if refresh:
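
Note (not part of the patches above): the round-robin assignment introduced in PATCH 1/9 can be exercised in isolation. The sketch below copies the body of calculate_worker_assignments from the patch verbatim; the demo driver at the bottom and its print output are illustrative only and merely reproduce the expectations from WorkerAssignmentTests.test_multiple_host_assignment_more_clients_than_cores.

    # Minimal standalone sketch of the client-to-worker assignment from PATCH 1/9.
    # Clients are spread round-robin across hosts first, then round-robin across
    # the per-host worker slots (one slot per core).
    def calculate_worker_assignments(host_configs, client_count):
        assignments = []
        for host_config in host_configs:
            assignments.append({
                "host": host_config["host"],
                "workers": [[] for _ in range(host_config["cores"])],
                # next worker slot to fill on this host; ends up equal to the
                # number of clients assigned to this host
                "worker": 0
            })

        for client_idx in range(client_count):
            host = assignments[client_idx % len(assignments)]
            workers = host["workers"]
            worker_idx = host["worker"]
            workers[worker_idx % len(workers)].append(client_idx)
            host["worker"] = worker_idx + 1

        return assignments


    if __name__ == "__main__":
        # illustrative inputs only; matches the expectations asserted in
        # WorkerAssignmentTests: host-a receives clients [0, 8], [2, 10],
        # [4, 12], [6, 14] and host-b the odd client ids.
        hosts = [{"host": "host-a", "cores": 4}, {"host": "host-b", "cores": 4}]
        for assignment in calculate_worker_assignments(hosts, client_count=16):
            print(assignment["host"], assignment["workers"])

Because clients are distributed across hosts before they are distributed across worker slots, each load driver host ends up with an (almost) equal share of clients even when the client count is not a multiple of the total core count, as the test_multiple_host_assignment_less_clients_than_cores case in PATCH 3/9 shows.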