Allow to downsample request metrics #1001

Merged
merged 1 commit on Jun 2, 2020
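This change moves the sample post-processing logic out of `Driver.post_process_samples` into a new `SamplePostprocessor` class and introduces a `reporting` config option, `metrics.request.downsample.factor` (default 1). Only every N-th request-level sample (latency, service_time, processing_time) is written to the metrics store, while throughput is still calculated from all raw samples. The following is a standalone sketch of the downsampling idea, not an excerpt from the diff; the helper name and structure are illustrative only:

```python
# Sketch: keep only every N-th request-level sample, but derive throughput
# from *all* raw samples. Illustrative names, not Rally's API.
def downsample(raw_samples, factor):
    # factor=1 keeps everything (the default); factor=2 keeps samples 0, 2, 4, ...
    return [s for idx, s in enumerate(raw_samples) if idx % factor == 0]

# Throughput would still be computed over raw_samples, not over the reduced list.
```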
88 changes: 56 additions & 32 deletions esrally/driver/driver.py
@@ -362,8 +362,8 @@ def __init__(self, target, config, es_client_factory_class=client.EsClientFactor
self.quiet = False
self.allocations = None
self.raw_samples = []
self.throughput_calculator = None
self.most_recent_sample_per_client = {}
self.sample_post_processor = None

self.number_of_steps = 0
self.currently_completed = 0
@@ -427,11 +427,17 @@ def prepare_benchmark(self, t):
self.track = t
self.challenge = select_challenge(self.config, self.track)
self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
self.throughput_calculator = ThroughputCalculator()
downsample_factor = int(self.config.opts("reporting", "metrics.request.downsample.factor", mandatory=False, default_value=1))
self.metrics_store = metrics.metrics_store(cfg=self.config,
track=self.track.name,
challenge=self.challenge.name,
read_only=False)

self.sample_post_processor = SamplePostprocessor(self.metrics_store,
downsample_factor,
self.track.meta_data,
self.challenge.meta_data)

es_clients = self.create_es_clients()
self.wait_for_rest_api(es_clients)
self.prepare_telemetry(es_clients)
@@ -613,37 +619,54 @@ def update_progress_message(self, task_finished=False):
self.progress_reporter.finish()

def post_process_samples(self):
if len(self.raw_samples) == 0:
return
total_start = time.perf_counter()
start = total_start
# we do *not* do this here to avoid concurrent updates (actors are single-threaded) but rather to make it clear that we use
# only a snapshot and that new data will go to a new sample set.
raw_samples = self.raw_samples
self.raw_samples = []
for sample in raw_samples:
meta_data = self.merge(
self.track.meta_data,
self.challenge.meta_data,
sample.operation.meta_data,
sample.task.meta_data,
sample.request_meta_data)

self.metrics_store.put_value_cluster_level(name="latency", value=sample.latency_ms, unit="ms", task=sample.task.name,
operation=sample.operation.name, operation_type=sample.operation.type,
sample_type=sample.sample_type, absolute_time=sample.absolute_time,
relative_time=sample.relative_time, meta_data=meta_data)

self.metrics_store.put_value_cluster_level(name="service_time", value=sample.service_time_ms, unit="ms", task=sample.task.name,
operation=sample.task.name, operation_type=sample.operation.type,
sample_type=sample.sample_type, absolute_time=sample.absolute_time,
relative_time=sample.relative_time, meta_data=meta_data)

self.metrics_store.put_value_cluster_level(name="processing_time", value=sample.processing_time_ms,
unit="ms", task=sample.task.name,
operation=sample.task.name, operation_type=sample.operation.type,
sample_type=sample.sample_type, absolute_time=sample.absolute_time,
relative_time=sample.relative_time, meta_data=meta_data)
self.sample_post_processor(raw_samples)


class SamplePostprocessor:
def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data):
self.logger = logging.getLogger(__name__)
self.metrics_store = metrics_store
self.track_meta_data = track_meta_data
self.challenge_meta_data = challenge_meta_data
self.throughput_calculator = ThroughputCalculator()
self.downsample_factor = downsample_factor

def __call__(self, raw_samples):
if len(raw_samples) == 0:
return
total_start = time.perf_counter()
start = total_start
final_sample_count = 0
for idx, sample in enumerate(raw_samples):
if idx % self.downsample_factor == 0:
final_sample_count += 1
meta_data = self.merge(
self.track_meta_data,
self.challenge_meta_data,
sample.operation.meta_data,
sample.task.meta_data,
sample.request_meta_data)

self.metrics_store.put_value_cluster_level(name="latency", value=sample.latency_ms, unit="ms", task=sample.task.name,
operation=sample.operation.name, operation_type=sample.operation.type,
sample_type=sample.sample_type, absolute_time=sample.absolute_time,
relative_time=sample.relative_time, meta_data=meta_data)

self.metrics_store.put_value_cluster_level(name="service_time", value=sample.service_time_ms,
unit="ms", task=sample.task.name,
operation=sample.task.name, operation_type=sample.operation.type,
sample_type=sample.sample_type, absolute_time=sample.absolute_time,
relative_time=sample.relative_time, meta_data=meta_data)

self.metrics_store.put_value_cluster_level(name="processing_time", value=sample.processing_time_ms,
unit="ms", task=sample.task.name,
operation=sample.task.name, operation_type=sample.operation.type,
sample_type=sample.sample_type, absolute_time=sample.absolute_time,
relative_time=sample.relative_time, meta_data=meta_data)

end = time.perf_counter()
self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start))
@@ -654,8 +677,8 @@ def post_process_samples(self):
start = end
for task, samples in aggregates.items():
meta_data = self.merge(
self.track.meta_data,
self.challenge.meta_data,
self.track_meta_data,
self.challenge_meta_data,
task.operation.meta_data,
task.meta_data
)
Expand All @@ -676,7 +699,8 @@ def post_process_samples(self):
self.metrics_store.flush(refresh=False)
end = time.perf_counter()
self.logger.debug("Flushing the metrics store took [%f] seconds.", (end - start))
self.logger.debug("Postprocessing [%d] raw samples took [%f] seconds in total.", len(raw_samples), (end - total_start))
self.logger.debug("Postprocessing [%d] raw samples (downsampled to [%d] samples) took [%f] seconds in total.",
len(raw_samples), final_sample_count, (end - total_start))

def merge(self, *args):
result = {}
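For context, a minimal sketch of how the driver wires the new class together, based only on the hunks above (`prepare_benchmark` reads the factor from the `reporting` config section, and `post_process_samples` delegates to the callable); this is not a verbatim excerpt and the surrounding variable names are assumed:

```python
# Minimal sketch, assuming the names shown in the diff above.
downsample_factor = int(config.opts("reporting", "metrics.request.downsample.factor",
                                     mandatory=False, default_value=1))
post_processor = SamplePostprocessor(metrics_store, downsample_factor,
                                     current_track.meta_data, challenge.meta_data)
# Stores every N-th request-level sample and all derived throughput samples.
post_processor(raw_samples)
```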
83 changes: 83 additions & 0 deletions tests/driver/driver_test.py
@@ -249,6 +249,89 @@ def op(name, operation_type):
return track.Operation(name, operation_type, param_source="driver-test-param-source")


class SamplePostprocessorTests(TestCase):
def throughput(self, absolute_time, relative_time, value):
return mock.call(name="throughput",
value=value,
unit="docs/s",
task="index",
operation="index",
operation_type=track.OperationType.Bulk,
sample_type=metrics.SampleType.Normal,
absolute_time=absolute_time,
relative_time=relative_time,
meta_data={})

def service_time(self, absolute_time, relative_time, value):
return self.request_metric(absolute_time, relative_time, "service_time", value)

def processing_time(self, absolute_time, relative_time, value):
return self.request_metric(absolute_time, relative_time, "processing_time", value)

def latency(self, absolute_time, relative_time, value):
return self.request_metric(absolute_time, relative_time, "latency", value)

def request_metric(self, absolute_time, relative_time, name, value):
return mock.call(name=name,
value=value,
unit="ms",
task="index",
operation="index",
operation_type=track.OperationType.Bulk,
sample_type=metrics.SampleType.Normal,
absolute_time=absolute_time,
relative_time=relative_time,
meta_data={})

@mock.patch("esrally.metrics.MetricsStore")
def test_all_samples(self, metrics_store):
post_process = driver.SamplePostprocessor(metrics_store,
downsample_factor=1,
track_meta_data={},
challenge_meta_data={})

task = track.Task("index",
track.Operation("index", track.OperationType.Bulk, param_source="driver-test-param-source"))
samples = [
driver.Sample(0, 38598, 24, task, metrics.SampleType.Normal, None, 10, 7, 9, 5000, "docs", 1, 1 / 2),
driver.Sample(0, 38599, 25, task, metrics.SampleType.Normal, None, 10, 7, 9, 5000, "docs", 2, 2 / 2),
]

post_process(samples)

calls = [
self.latency(38598, 24, 10), self.service_time(38598, 24, 7), self.processing_time(38598, 24, 9),
self.latency(38599, 25, 10), self.service_time(38599, 25, 7), self.processing_time(38599, 25, 9),
self.throughput(38598, 24, 5000),
self.throughput(38599, 25, 5000),
]
metrics_store.put_value_cluster_level.assert_has_calls(calls)

@mock.patch("esrally.metrics.MetricsStore")
def test_downsamples(self, metrics_store):
post_process = driver.SamplePostprocessor(metrics_store,
downsample_factor=2,
track_meta_data={},
challenge_meta_data={})

task = track.Task("index",
track.Operation("index", track.OperationType.Bulk, param_source="driver-test-param-source"))
samples = [
driver.Sample(0, 38598, 24, task, metrics.SampleType.Normal, None, 10, 7, 9, 5000, "docs", 1, 1 / 2),
driver.Sample(0, 38599, 25, task, metrics.SampleType.Normal, None, 10, 7, 9, 5000, "docs", 2, 2 / 2),
]

post_process(samples)

calls = [
# only the first out of two request samples is included, throughput metrics are still complete
self.latency(38598, 24, 10), self.service_time(38598, 24, 7), self.processing_time(38598, 24, 9),
self.throughput(38598, 24, 5000),
self.throughput(38599, 25, 5000),
]
metrics_store.put_value_cluster_level.assert_has_calls(calls)


class WorkerAssignmentTests(TestCase):
def test_single_host_assignment_clients_matches_cores(self):
host_configs = [{