diff --git a/wptrunner/testloader.py b/wptrunner/testloader.py index 9270bec2cc11d9..01ed0a81bb2d28 100644 --- a/wptrunner/testloader.py +++ b/wptrunner/testloader.py @@ -3,7 +3,7 @@ import urlparse from abc import ABCMeta, abstractmethod from Queue import Empty -from collections import defaultdict, OrderedDict +from collections import defaultdict, OrderedDict, deque from multiprocessing import Queue import manifestinclude @@ -25,6 +25,7 @@ def __init__(self, total_chunks, chunk_number): self.total_chunks = total_chunks self.chunk_number = chunk_number assert self.chunk_number <= self.total_chunks + self.logger = structured.get_default_logger() def __call__(self, manifest): raise NotImplementedError @@ -47,18 +48,15 @@ def __call__(self): if hash(test_path) % self.total_chunks == chunk_index: yield test_path, tests - class EqualTimeChunker(TestChunker): - """Chunker that uses the test timeout as a proxy for the running time of the test""" - - def _get_chunk(self, manifest_items): - # For each directory containing tests, calculate the maximum execution time after running all - # the tests in that directory. Then work out the index into the manifest corresponding to the - # directories at fractions of m/N of the running time where m=1..N-1 and N is the total number - # of chunks. Return an array of these indicies + def _group_by_directory(self, manifest_items): + """Split the list of manifest items into an ordered dict that groups tests + so that anything in the same subdirectory beyond a depth of 3 is in the same + group. 
So all tests in a/b/c, a/b/c/d and a/b/c/e will be grouped together + and separate to tests in a/b/f - total_time = 0 - by_dir = OrderedDict() + Returns: tuple (ordered dict of {test_dir: PathData}, total estimated runtime) + """ class PathData(object): def __init__(self, path): @@ -66,73 +64,8 @@ def __init__(self, path): self.time = 0 self.tests = [] - class Chunk(object): - def __init__(self): - self.paths = [] - self.tests = [] - self.time = 0 - - def append(self, path_data): - self.paths.append(path_data.path) - self.tests.extend(path_data.tests) - self.time += path_data.time - - class ChunkList(object): - def __init__(self, total_time, n_chunks): - self.total_time = total_time - self.n_chunks = n_chunks - - self.remaining_chunks = n_chunks - - self.chunks = [] - - self.update_time_per_chunk() - - def __iter__(self): - for item in self.chunks: - yield item - - def __getitem__(self, i): - return self.chunks[i] - - def sort_chunks(self): - self.chunks = sorted(self.chunks, key=lambda x:x.paths[0]) - - def get_tests(self, chunk_number): - return self[chunk_number - 1].tests - - def append(self, chunk): - if len(self.chunks) == self.n_chunks: - raise ValueError("Tried to create more than %n chunks" % self.n_chunks) - self.chunks.append(chunk) - self.remaining_chunks -= 1 - - @property - def current_chunk(self): - if self.chunks: - return self.chunks[-1] - - def update_time_per_chunk(self): - self.time_per_chunk = (self.total_time - sum(item.time for item in self)) / self.remaining_chunks - - def create(self): - rv = Chunk() - self.append(rv) - return rv - - def add_path(self, path_data): - sum_time = self.current_chunk.time + path_data.time - if sum_time > self.time_per_chunk and self.remaining_chunks > 0: - overshoot = sum_time - self.time_per_chunk - undershoot = self.time_per_chunk - self.current_chunk.time - if overshoot < undershoot: - self.create() - self.current_chunk.append(path_data) - else: - self.current_chunk.append(path_data) - self.create() - else: - 
self.current_chunk.append(path_data) + by_dir = OrderedDict() + total_time = 0 for i, (test_path, tests) in enumerate(manifest_items): test_dir = tuple(os.path.split(test_path)[0].split(os.path.sep)[:3]) @@ -144,42 +77,238 @@ def add_path(self, path_data): time = sum(wpttest.DEFAULT_TIMEOUT if test.timeout != "long" else wpttest.LONG_TIMEOUT for test in tests) data.time += time + total_time += time data.tests.append((test_path, tests)) - total_time += time + return by_dir, total_time + + def _maybe_remove(self, chunks, i, direction): + """Trial moving a path out of one chunk and into an adjacent one. + + :param chunks: - the list of all chunks + :param i: - the chunk index in the list of chunks to try removing from + :param direction: either "next" if we are going to move from the end to + the subsequent chunk, or "prev" if we are going to move + from the start into the previous chunk. + + :returns bool: Did a path get moved?""" + source_chunk = chunks[i] + if direction == "next": + target_chunk = chunks[i+1] + path_index = -1 + move_func = lambda: target_chunk.appendleft(source_chunk.pop()) + elif direction == "prev": + target_chunk = chunks[i-1] + path_index = 0 + move_func = lambda: target_chunk.append(source_chunk.popleft()) + else: + raise ValueError("Unexpected move direction %s" % direction) + + return self._maybe_move(source_chunk, target_chunk, path_index, move_func) + + def _maybe_add(self, chunks, i, direction): + """Trial adding a path to one chunk from an adjacent one. + + :param chunks: - the list of all chunks + :param i: - the chunk index in the list of chunks to try adding to + :param direction: either "next" if we are going to remove from + the subsequent chunk, or "prev" if we are going to remove + from the previous chunk. 
+ + :returns bool: Did a path get moved?""" + target_chunk = chunks[i] + if direction == "next": + source_chunk = chunks[i+1] + path_index = 0 + move_func = lambda: target_chunk.append(source_chunk.popleft()) + elif direction == "prev": + source_chunk = chunks[i-1] + path_index = -1 + move_func = lambda: target_chunk.appendleft(source_chunk.pop()) + else: + raise ValueError("Unexpected move direction %s" % direction) + + return self._maybe_move(source_chunk, target_chunk, path_index, move_func) + + def _maybe_move(self, source_chunk, target_chunk, path_index, move_func): + """Assess the change in badness from moving a path between chunks, + and perform the move iff it decreases the total badness score. - chunk_list = ChunkList(total_time, self.total_chunks) + :param source_chunk: chunk to move from + :param target_chunk: chunk to move to + :param path_index: 0 if we are moving from the start or -1 if we are moving from the + end + :param move_func: Function that actually moves the path between chunks""" + if len(source_chunk.paths) <= 1: + return False + + move_time = source_chunk.paths[path_index].time + + new_source_badness = self._badness(source_chunk.time - move_time) + new_target_badness = self._badness(target_chunk.time + move_time) + + delta_badness = ((new_source_badness + new_target_badness) - + (source_chunk.badness + target_chunk.badness)) + if delta_badness < 0: + move_func() + return True + + return False + + def _badness(self, time): + """Metric of badness for a specific chunk + + :param time: the time for a specific chunk""" + return (time - self.expected_time)**2 + + def _get_chunk(self, manifest_items): + by_dir, total_time = self._group_by_directory(manifest_items) if len(by_dir) < self.total_chunks: raise ValueError("Tried to split into %i chunks, but only %i subdirectories included" % ( self.total_chunks, len(by_dir))) - # Put any individual dirs with a time greater than the time per chunk into their own - # chunk + self.expected_time = float(total_time) / 
self.total_chunks + + chunks = self._create_initial_chunks(by_dir) + while True: - to_remove = [] - for path_data in by_dir.itervalues(): - if path_data.time > chunk_list.time_per_chunk: - to_remove.append(path_data) - if to_remove: - for path_data in to_remove: - chunk = chunk_list.create() - chunk.append(path_data) - del by_dir[path_data.path] - chunk_list.update_time_per_chunk() + # Move a test from one chunk to the next until doing so no longer + # reduces the badness + got_improvement = self._update_chunks(chunks) + if not got_improvement: + break + + self.logger.debug(self.expected_time) + for i, chunk in chunks.iteritems(): + self.logger.debug("%i: %i, %i" % (i + 1, chunk.time, chunk.badness)) + + assert self._all_tests(by_dir) == self._chunked_tests(chunks) + + return self._get_tests(chunks) + + @staticmethod + def _all_tests(by_dir): + """Return a set of all tests in the manifest from a grouping by directory""" + return set(x[0] for item in by_dir.itervalues() + for x in item.tests) + + @staticmethod + def _chunked_tests(chunks): + """Return a set of all tests in the manifest from the chunk list""" + return set(x[0] for chunk in chunks.itervalues() + for path in chunk.paths + for x in path.tests) + + + def _create_initial_chunks(self, by_dir): + """Create an initial unbalanced list of chunks. 
+ + :param by_dir: All tests in the manifest grouped by subdirectory + :returns OrderedDict: Chunk objects keyed by zero-based chunk index""" + + class Chunk(object): + def __init__(self, paths, index): + """List of PathData objects that together form a single chunk of + tests""" + self.paths = deque(paths) + self.time = sum(item.time for item in paths) + self.index = index + + def appendleft(self, path): + """Add a PathData object to the start of the chunk""" + self.paths.appendleft(path) + self.time += path.time + + def append(self, path): + """Add a PathData object to the end of the chunk""" + self.paths.append(path) + self.time += path.time + + def pop(self): + """Remove and return the PathData object at the end of the chunk""" + assert len(self.paths) > 1 + self.time -= self.paths[-1].time + return self.paths.pop() + + def popleft(self): + """Remove and return the PathData object at the start of the chunk""" + assert len(self.paths) > 1 + self.time -= self.paths[0].time + return self.paths.popleft() + + @property + def badness(self_): + """Badness metric for this chunk""" + return self._badness(self_.time) + + initial_size = len(by_dir) / self.total_chunks + chunk_boundaries = [initial_size * i + for i in xrange(self.total_chunks)] + [len(by_dir)] + + chunks = OrderedDict() + for i, lower in enumerate(chunk_boundaries[:-1]): + upper = chunk_boundaries[i + 1] + paths = by_dir.values()[lower:upper] + chunks[i] = Chunk(paths, i) + + assert self._all_tests(by_dir) == self._chunked_tests(chunks) + + return chunks + + def _update_chunks(self, chunks): + """Run a single iteration of the chunk update algorithm. 
+ + :param chunks: - Ordered dict of chunks keyed by chunk index + """ + #TODO: consider replacing this with a heap + sorted_chunks = sorted(chunks.values(), key=lambda x:-x.badness) + got_improvement = False + for chunk in sorted_chunks: + if chunk.time < self.expected_time: + f = self._maybe_add else: + f = self._maybe_remove + + if chunk.index == 0: + order = ["next"] + elif chunk.index == self.total_chunks - 1: + order = ["prev"] + else: + if chunk.time < self.expected_time: + # First try to add a test from the neighboring chunk with the + # greatest total time + if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time: + order = ["next", "prev"] + else: + order = ["prev", "next"] + else: + # First try to remove a test and add to the neighboring chunk with the + # lowest total time + if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time: + order = ["prev", "next"] + else: + order = ["next", "prev"] + + for direction in order: + if f(chunks, chunk.index, direction): + got_improvement = True + break + + if got_improvement: break - chunk = chunk_list.create() - for path_data in by_dir.itervalues(): - chunk_list.add_path(path_data) + return got_improvement - assert len(chunk_list.chunks) == self.total_chunks, len(chunk_list.chunks) - assert sum(item.time for item in chunk_list) == chunk_list.total_time + def _get_tests(self, chunks): + """Return the list of tests corresponding to the chunk number we are running. - chunk_list.sort_chunks() + :param chunks: Ordered dict of chunks keyed by chunk index""" + tests = [] + for path in chunks[self.chunk_number - 1].paths: + tests.extend(path.tests) + + return tests def __call__(self, manifest_iter): manifest = list(manifest_iter)