From cabdc630e85b83938d9a6bac44e872d27c4e5d0f Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 24 Feb 2021 13:44:20 +0100 Subject: [PATCH] Throttle requests from the beginning With this commit we consult the scheduler prior to issuing a request instead of afterwards. If we don't do that, clients can coordinate creating a large initial load spike in Elasticsearch. With this countermeasure, it is possible that clients avoid this initial spike if a non-deterministic scheduler, such as the poisson scheduler, is chosen. Relates #1195 --- docs/migrate.rst | 5 +++++ esrally/driver/driver.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/migrate.rst b/docs/migrate.rst index c8fc7a192..ecb512dfb 100644 --- a/docs/migrate.rst +++ b/docs/migrate.rst @@ -4,6 +4,11 @@ Migration Guide Migrating to Rally 2.1.0 ------------------------ +Throttling is active from the beginning +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Previously, Rally issued the first request immediately regardless of the target throughput. With this release, Rally will defer the first request according to the target throughput and the scheduling policy. Together with a poisson schedule, this measure avoids coordination among clients that hit Elasticsearch at exactly the same time, causing a large initial spike. + Custom bulk parameter sources need to provide a unit ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 6b8e2e071..6f07e8805 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1746,12 +1746,12 @@ async def __call__(self): self.task_progress_control.start() while True: try: + next_scheduled = self.sched.next(next_scheduled) # does not contribute at all to completion. Hence, we cannot define completion. 
percent_completed = self.params.percent_completed if param_source_knows_progress else None #current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner, self.params.params()) - next_scheduled = self.sched.next(next_scheduled) self.task_progress_control.next() except StopIteration: return @@ -1759,13 +1759,13 @@ async def __call__(self): self.task_progress_control.start() while not self.task_progress_control.completed: try: + next_scheduled = self.sched.next(next_scheduled) #current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) yield (next_scheduled, self.task_progress_control.sample_type, self.task_progress_control.percent_completed, self.runner, self.params.params()) - next_scheduled = self.sched.next(next_scheduled) self.task_progress_control.next() except StopIteration: return