
Reduce memory usage and calculate statistics online/incrementally #103

Merged: 5 commits, Jul 29, 2018
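Context for the changes below: instead of keeping every data point in memory and recomputing StatisticProperties from the full list, the PR feeds each total value into the statistics object as it arrives. A minimal sketch of how such online mean/variance accumulation typically works (Welford's algorithm); this is an illustration with assumed names (OnlineStats, _m2), not the actual rebench StatisticProperties implementation:

class OnlineStats(object):
    """Running mean/variance via Welford's algorithm, O(1) memory."""

    def __init__(self):
        self.num_samples = 0
        self.mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the current mean

    def add_sample(self, value):
        self.num_samples += 1
        delta = value - self.mean
        self.mean += delta / self.num_samples
        self._m2 += delta * (value - self.mean)

    @property
    def variance(self):
        # unbiased sample variance; undefined for fewer than two samples
        if self.num_samples < 2:
            return 0.0
        return self._m2 / (self.num_samples - 1)

The removal of the python-scipy package from .travis.yml below is consistent with this: an incremental mean/variance computed in pure Python needs no SciPy.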
7 changes: 0 additions & 7 deletions .travis.yml
@@ -3,13 +3,6 @@ language: python
 matrix:
   include:
     - python: "2.7"
-      virtualenv:
-        system_site_packages: true
-      addons:
-        apt:
-          packages:
-            - python-scipy
-
     - python: "3.6"
     # PyPy versions
     - python: pypy
15 changes: 5 additions & 10 deletions rebench/executor.py
@@ -35,7 +35,6 @@
 
 from . import subprocess_with_timeout as subprocess_timeout
 from .interop.adapter import ExecutionDeliveredNoResults
-from .statistics import StatisticProperties, mean
 
 
 class FailedBuilding(Exception):
@@ -83,14 +82,10 @@ def _indicate_progress(self, completed_task, run):
         if not self._ui.spinner_initialized():
             return
 
-        totals = run.get_total_values()
         if completed_task:
             self._runs_completed += 1
 
-        if totals:
-            art_mean = mean(run.get_total_values())
-        else:
-            art_mean = 0
+        art_mean = run.get_mean_of_totals()
 
         hour, minute, sec = self._estimate_time_left()
 
@@ -419,15 +414,15 @@ def execute_run(self, run_id):
         terminate = self._generate_data_point(cmdline, gauge_adapter,
                                               run_id, termination_check)
 
-        stats = StatisticProperties(run_id.get_total_values())
+        mean_of_totals = run_id.get_mean_of_totals()
         if terminate:
-            run_id.report_run_completed(stats, cmdline)
+            run_id.report_run_completed(cmdline)
         if (not run_id.is_failed() and run_id.min_iteration_time
-                and stats.mean < run_id.min_iteration_time):
+                and mean_of_totals < run_id.min_iteration_time):
             self._ui.warning(
                 ("{ind}Warning: Low mean run time.\n"
                  + "{ind}{ind}The mean (%.1f) is lower than min_iteration_time (%d)\n")
-                % (stats.mean, run_id.min_iteration_time), run_id, cmdline)
+                % (mean_of_totals, run_id.min_iteration_time), run_id, cmdline)
 
         return terminate
4 changes: 4 additions & 0 deletions rebench/model/data_point.py
@@ -27,6 +27,10 @@ def __init__(self, run_id):
         self._total = None
         self._invocation = -1
 
+    @property
+    def run_id(self):
+        return self._run_id
+
     @property
     def invocation(self):
         return self._invocation
60 changes: 34 additions & 26 deletions rebench/model/run_id.py
@@ -21,6 +21,7 @@
 
 from .benchmark import Benchmark
 from .termination_check import TerminationCheck
+from ..statistics import StatisticProperties
 from ..ui import UIError
 
 
@@ -34,7 +35,8 @@ def __init__(self, benchmark, cores, input_size, var_value):
 
         self._reporters = set()
         self._persistence = set()
-        self._data_points = []
+        self._statistics = StatisticProperties()
+        self._total_unit = None
 
         self._termination_check = None
         self._cmdline = None
@@ -136,9 +138,9 @@ def report_run_failed(self, cmdline, return_code, output):
         for reporter in self._reporters:
             reporter.run_failed(self, cmdline, return_code, output)
 
-    def report_run_completed(self, statistics, cmdline):
+    def report_run_completed(self, cmdline):
         for reporter in self._reporters:
-            reporter.run_completed(self, statistics, cmdline)
+            reporter.run_completed(self, self._statistics, cmdline)
 
     def report_job_completed(self, run_ids):
         for reporter in self._reporters:
@@ -152,42 +154,44 @@ def report_start_run(self):
         for reporter in self._reporters:
             reporter.start_run(self)
 
+    def is_persisted_by(self, persistence):
+        return persistence in self._persistence
+
     def add_persistence(self, persistence):
         self._persistence.add(persistence)
 
     def close_files(self):
         for persistence in self._persistence:
             persistence.close()
 
-    def loaded_data_point(self, data_point):
+    def _new_data_point(self, data_point):
         self._max_invocation = max(self._max_invocation, data_point.invocation)
-        self._data_points.append(data_point)
+        if self._total_unit is None:
+            self._total_unit = data_point.get_total_unit()
+
+    def loaded_data_point(self, data_point):
+        self._new_data_point(data_point)
+        self._statistics.add_sample(data_point.get_total_value())
 
     def add_data_point(self, data_point, warmup):
-        self._max_invocation = max(self._max_invocation, data_point.invocation)
+        self._new_data_point(data_point)
 
         if not warmup:
-            self._data_points.append(data_point)
+            self._statistics.add_sample(data_point.get_total_value())
             for persistence in self._persistence:
                 persistence.persist_data_point(data_point)
 
     def get_number_of_data_points(self):
-        return len(self._data_points)
+        return self._statistics.num_samples
 
-    def get_data_points(self):
-        return self._data_points
+    def get_mean_of_totals(self):
+        return self._statistics.mean
 
-    def discard_data_points(self):
-        self._data_points = []
-        self._max_invocation = 0
-
-    def get_total_values(self):
-        return [dp.get_total_value() for dp in self._data_points]
+    def get_statistics(self):
+        return self._statistics
 
     def get_total_unit(self):
-        if not self._data_points:
-            return None
-        return self._data_points[0].get_total_unit()
+        return self._total_unit
 
     def get_termination_check(self, ui):
         if self._termination_check is None:
@@ -202,7 +206,7 @@ def is_completed(self, ui):
     def run_failed(self):
         return (self._termination_check.fails_consecutively() or
                 self._termination_check.has_too_many_failures(
-                    len(self._data_points)))
+                    self.get_number_of_data_points()))
 
     def __hash__(self):
         return hash(self.cmdline())
@@ -236,18 +240,22 @@ def _expand_vars(self, string):
     def cmdline(self):
         if self._cmdline:
             return self._cmdline
+        return self._construct_cmdline()
 
+    def _construct_cmdline(self):
         cmdline = ""
         if self._benchmark.suite.executor.path:
-            cmdline = "%s/" % (self._benchmark.suite.executor.path, )
+            cmdline = self._benchmark.suite.executor.path + "/"
+
+        cmdline += self._benchmark.suite.executor.executable
 
-        cmdline += "%s %s" % (self._benchmark.suite.executor.executable,
-                              self._benchmark.suite.executor.args or '')
+        if self._benchmark.suite.executor.args:
+            cmdline += " " + str(self._benchmark.suite.executor.args)
 
-        cmdline += self._benchmark.suite.command
+        cmdline += " " + self._benchmark.suite.command
 
-        if self._benchmark.extra_args is not None:
-            cmdline += " %s" % self._benchmark.extra_args
+        if self._benchmark.extra_args:
+            cmdline += " " + str(self._benchmark.extra_args)
 
         cmdline = self._expand_vars(cmdline)
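Net effect of the run_id.py changes: a RunId now holds constant-size statistics state (plus a cached total unit) instead of a growing list of data points. A usage sketch with the hypothetical OnlineStats from above (sample values invented):

stats = OnlineStats()
for total in (383.0, 402.5, 388.2):  # total values as measurements arrive
    stats.add_sample(total)
print(stats.num_samples, round(stats.mean, 1))  # -> 3 391.2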
70 changes: 29 additions & 41 deletions rebench/persistence.py
@@ -18,9 +18,11 @@
 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 # IN THE SOFTWARE.
 import os
+import shutil
 import subprocess
 import sys
 from datetime import datetime
+from tempfile import NamedTemporaryFile
 from threading import Lock
 
 from .model.data_point import DataPoint
@@ -36,9 +38,9 @@ def __init__(self, ui):
         self._bench_cfgs = {}
         self._ui = ui
 
-    def load_data(self):
+    def load_data(self, runs, discard_run_data):
         for persistence in list(self._files.values()):
-            persistence._load_data()
+            persistence.load_data(runs, discard_run_data)
 
     def get(self, filename, discard_old_data):
         if filename not in self._files:
@@ -81,41 +83,6 @@ def register_config(self, cfg):
             self._bench_cfgs[key] = cfg
         return cfg
 
-    @classmethod
-    def get_by_file(cls, runs):
-        by_file = {}
-        for run in runs:
-            points = run.get_data_points()
-            run.discard_data_points()
-            for point in points:
-                measurements = point.get_measurements()
-                for measure in measurements:
-                    if measure.filename in by_file:
-                        by_file[measure.filename].append(measure)
-                    else:
-                        by_file[measure.filename] = [measure]
-        return by_file
-
-    @classmethod
-    def discard_data_of_runs(cls, runs, ui):
-        by_file = cls.get_by_file(runs)
-        for filename, measures in by_file.items():
-            try:
-                with open(filename, 'r') as data_file:
-                    lines = data_file.readlines()
-            except IOError:
-                ui.debug_error_info(
-                    "Tried to discard old data, but file does not seem to exist: %s\n" % filename)
-                continue
-
-            for measure in measures:
-                lines[measure.line_number] = None
-
-            lines = filter(None, lines)
-
-            with open(filename, 'w') as data_file:
-                data_file.writelines(lines)
 
 
 class _DataPointPersistence(object):
@@ -141,18 +108,30 @@ def _truncate_file(filename):
         with open(filename, 'w'):
             pass
 
-    def _load_data(self):
+    def load_data(self, runs, discard_run_data):
         """
         Loads the data from the configured data file
         """
+        if discard_run_data:
+            current_runs = {run for run in runs if run.is_persisted_by(self)}
+        else:
+            current_runs = None
+
         try:
-            with open(self._data_filename, 'r') as data_file:
-                self._process_lines(data_file)
+            if current_runs:
+                with NamedTemporaryFile('w', delete=False) as target:
+                    with open(self._data_filename, 'r') as data_file:
+                        self._process_lines(data_file, current_runs, target)
+                    os.unlink(self._data_filename)
+                    shutil.move(target.name, self._data_filename)
+            else:
+                with open(self._data_filename, 'r') as data_file:
+                    self._process_lines(data_file, current_runs, None)
         except IOError:
             self._ui.debug_error_info("No data loaded, since %s does not exist.\n"
                                       % self._data_filename)
 
-    def _process_lines(self, data_file):
+    def _process_lines(self, data_file, runs, filtered_data_file):
         """
         The most important assumptions we make here is that the total
         measurement is always the last one serialized for a data point.
@@ -165,6 +144,8 @@ def _process_lines(self, data_file):
         for line in data_file:
             if line.startswith('#'):  # skip comments, and shebang lines
                 line_number += 1
+                if filtered_data_file:
+                    filtered_data_file.write(line)
                 continue
 
             try:
@@ -173,6 +154,13 @@ def _process_lines(self, data_file):
                     line_number, self._data_filename)
 
             run_id = measurement.run_id
+            if filtered_data_file and runs and run_id in runs:
+                continue
+
+            # these are all the measurements that are not filtered out
+            if filtered_data_file:
+                filtered_data_file.write(line)
+
             if previous_run_id is not run_id:
                 data_point = DataPoint(run_id)
                 previous_run_id = run_id
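The rewritten load_data discards stale run data by streaming the old file through _process_lines into a temporary file and then moving that file over the original. The same pattern in isolation; filter_file and keep_line are illustrative names, not rebench API:

import os
import shutil
from tempfile import NamedTemporaryFile

def filter_file(path, keep_line):
    # Copy each line for which keep_line(line) is true into a temp file,
    # then replace the original file with the filtered copy.
    with NamedTemporaryFile('w', delete=False) as target:
        with open(path, 'r') as source:
            for line in source:
                if keep_line(line):
                    target.write(line)
    os.unlink(path)
    shutil.move(target.name, path)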
14 changes: 4 additions & 10 deletions rebench/rebench.py
@@ -203,22 +203,16 @@ def run(self, argv=None):
         except ConfigurationError as exc:
             raise UIError(exc.message + "\n", exc)
 
-        data_store.load_data()
-        return self.execute_experiment()
+        runs = self._config.get_runs()
+        data_store.load_data(runs, self._config.options.do_rerun)
+        return self.execute_experiment(runs)
 
-    def execute_experiment(self):
+    def execute_experiment(self, runs):
         self._ui.verbose_output_info("Execute experiment: " + self._config.experiment_name + "\n")
 
-        # first load old data if available
-        if self._config.options.clean:
-            pass
-
         scheduler_class = {'batch': BatchScheduler,
                            'round-robin': RoundRobinScheduler,
                            'random': RandomScheduler}.get(self._config.options.scheduler)
-        runs = self._config.get_runs()
-        if self._config.options.do_rerun:
-            DataStore.discard_data_of_runs(runs, self._ui)
 
         executor = Executor(runs, self._config.use_nice, self._config.do_builds,
                             self._ui,
14 changes: 6 additions & 8 deletions rebench/reporter.py
@@ -36,8 +36,6 @@
     from urllib import urlencode  # pylint: disable=ungrouped-imports
     from urllib2 import urlopen
 
-from .statistics import StatisticProperties
-
 
 class Reporter(object):
@@ -80,13 +78,14 @@ def _generate_all_output(run_ids):
         rows = []
 
         for run_id in run_ids:
-            stats = StatisticProperties(run_id.get_total_values())
+            mean = run_id.get_mean_of_totals()
+            num_samples = run_id.get_number_of_data_points()
             out = run_id.as_str_list()
-            out.append(stats.num_samples)
-            if stats.num_samples == 0:
+            out.append(num_samples)
+            if num_samples == 0:
                 out.append("Failed")
             else:
-                out.append(int(round(stats.mean, 0)))
+                out.append(int(round(mean, 0)))
             rows.append(out)
 
         return sorted(rows, key=itemgetter(2, 1, 3, 4, 5, 6, 7))
@@ -248,8 +247,7 @@ def _send_to_codespeed(self, results, run_id):
                           + "{ind}{ind}" + msg + "\n", run_id)
 
     def _prepare_result(self, run_id):
-        stats = StatisticProperties(run_id.get_total_values())
-        return self._format_for_codespeed(run_id, stats)
+        return self._format_for_codespeed(run_id, run_id.get_statistics())
 
     def report_job_completed(self, run_ids):
         if self._incremental_report: