Skip to content

Commit

Permalink
Throttle requests from the beginning (#1199)
Browse files Browse the repository at this point in the history
With this commit we consult the scheduler prior to issuing a request
instead of afterwards. If we don't do that, clients can coordinate
creating a large initial load spike in Elasticsearch. With this
countermeasure, it is possible that clients avoid this initial spike if
a non-determistic scheduler, such as the poisson scheduler is chosen.

Relates #1195
  • Loading branch information
danielmitterdorfer authored Mar 10, 2021
1 parent 5d93d2b commit c0b33fb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/migrate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Migration Guide
Migrating to Rally 2.1.0
------------------------

Throttling is active from the beginning
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Previously Rally has issued the first request immediately regardless of the target throughput. With this release, Rally will defer the first request according to the target throughput and the scheduling policy. Together with a poisson schedule, this measure avoids coordination among clients that hit Elasticsearch at exactly the same time causing a large initial spike.

Custom bulk parameter sources need to provide a unit
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
4 changes: 2 additions & 2 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1746,26 +1746,26 @@ async def __call__(self):
self.task_progress_control.start()
while True:
try:
next_scheduled = self.sched.next(next_scheduled)
# does not contribute at all to completion. Hence, we cannot define completion.
percent_completed = self.params.percent_completed if param_source_knows_progress else None
#current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner,
self.params.params())
next_scheduled = self.sched.next(next_scheduled)
self.task_progress_control.next()
except StopIteration:
return
else:
self.task_progress_control.start()
while not self.task_progress_control.completed:
try:
next_scheduled = self.sched.next(next_scheduled)
#current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
yield (next_scheduled,
self.task_progress_control.sample_type,
self.task_progress_control.percent_completed,
self.runner,
self.params.params())
next_scheduled = self.sched.next(next_scheduled)
self.task_progress_control.next()
except StopIteration:
return
Expand Down

0 comments on commit c0b33fb

Please sign in to comment.