Skip to content

Commit

Permalink
Merge pull request #114 from w3c/jgraham/equal_chunker_2
Browse files Browse the repository at this point in the history
Change implementation of EqualTimeChunker.
  • Loading branch information
jgraham committed Jun 3, 2015
2 parents b5afb22 + 821df06 commit 85935d4
Showing 1 changed file with 228 additions and 99 deletions.
327 changes: 228 additions & 99 deletions wptrunner/testloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urlparse
from abc import ABCMeta, abstractmethod
from Queue import Empty
from collections import defaultdict, OrderedDict
from collections import defaultdict, OrderedDict, deque
from multiprocessing import Queue

import manifestinclude
Expand All @@ -25,6 +25,7 @@ def __init__(self, total_chunks, chunk_number):
self.total_chunks = total_chunks
self.chunk_number = chunk_number
assert self.chunk_number <= self.total_chunks
self.logger = structured.get_default_logger()

def __call__(self, manifest):
raise NotImplementedError
Expand All @@ -47,92 +48,24 @@ def __call__(self):
if hash(test_path) % self.total_chunks == chunk_index:
yield test_path, tests


class EqualTimeChunker(TestChunker):
"""Chunker that uses the test timeout as a proxy for the running time of the test"""

def _get_chunk(self, manifest_items):
# For each directory containing tests, calculate the maximum execution time after running all
# the tests in that directory. Then work out the index into the manifest corresponding to the
# directories at fractions of m/N of the running time where m=1..N-1 and N is the total number
# of chunks. Return an array of these indicies
def _group_by_directory(self, manifest_items):
"""Split the list of manifest items into a ordered dict that groups tests in
so that anything in the same subdirectory beyond a depth of 3 is in the same
group. So all tests in a/b/c, a/b/c/d and a/b/c/e will be grouped together
and separate to tests in a/b/f
total_time = 0
by_dir = OrderedDict()
Returns: tuple (ordered dict of {test_dir: PathData}, total estimated runtime)
"""

class PathData(object):
def __init__(self, path):
self.path = path
self.time = 0
self.tests = []

class Chunk(object):
def __init__(self):
self.paths = []
self.tests = []
self.time = 0

def append(self, path_data):
self.paths.append(path_data.path)
self.tests.extend(path_data.tests)
self.time += path_data.time

class ChunkList(object):
def __init__(self, total_time, n_chunks):
self.total_time = total_time
self.n_chunks = n_chunks

self.remaining_chunks = n_chunks

self.chunks = []

self.update_time_per_chunk()

def __iter__(self):
for item in self.chunks:
yield item

def __getitem__(self, i):
return self.chunks[i]

def sort_chunks(self):
self.chunks = sorted(self.chunks, key=lambda x:x.paths[0])

def get_tests(self, chunk_number):
return self[chunk_number - 1].tests

def append(self, chunk):
if len(self.chunks) == self.n_chunks:
raise ValueError("Tried to create more than %n chunks" % self.n_chunks)
self.chunks.append(chunk)
self.remaining_chunks -= 1

@property
def current_chunk(self):
if self.chunks:
return self.chunks[-1]

def update_time_per_chunk(self):
self.time_per_chunk = (self.total_time - sum(item.time for item in self)) / self.remaining_chunks

def create(self):
rv = Chunk()
self.append(rv)
return rv

def add_path(self, path_data):
sum_time = self.current_chunk.time + path_data.time
if sum_time > self.time_per_chunk and self.remaining_chunks > 0:
overshoot = sum_time - self.time_per_chunk
undershoot = self.time_per_chunk - self.current_chunk.time
if overshoot < undershoot:
self.create()
self.current_chunk.append(path_data)
else:
self.current_chunk.append(path_data)
self.create()
else:
self.current_chunk.append(path_data)
by_dir = OrderedDict()
total_time = 0

for i, (test_path, tests) in enumerate(manifest_items):
test_dir = tuple(os.path.split(test_path)[0].split(os.path.sep)[:3])
Expand All @@ -144,42 +77,238 @@ def add_path(self, path_data):
time = sum(wpttest.DEFAULT_TIMEOUT if test.timeout !=
"long" else wpttest.LONG_TIMEOUT for test in tests)
data.time += time
total_time += time
data.tests.append((test_path, tests))

total_time += time
return by_dir, total_time

def _maybe_remove(self, chunks, i, direction):
"""Trial removing a chunk from one chunk to an adjacent one.
:param chunks: - the list of all chunks
:param i: - the chunk index in the list of chunks to try removing from
:param direction: either "next" if we are going to move from the end to
the subsequent chunk, or "prev" if we are going to move
from the start into the previous chunk.
:returns bool: Did a chunk get moved?"""
source_chunk = chunks[i]
if direction == "next":
target_chunk = chunks[i+1]
path_index = -1
move_func = lambda: target_chunk.appendleft(source_chunk.pop())
elif direction == "prev":
target_chunk = chunks[i-1]
path_index = 0
move_func = lambda: target_chunk.append(source_chunk.popleft())
else:
raise ValueError("Unexpected move direction %s" % direction)

return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

def _maybe_add(self, chunks, i, direction):
"""Trial adding a chunk from one chunk to an adjacent one.
:param chunks: - the list of all chunks
:param i: - the chunk index in the list of chunks to try adding to
:param direction: either "next" if we are going to remove from the
the subsequent chunk, or "prev" if we are going to remove
from the the previous chunk.
:returns bool: Did a chunk get moved?"""
target_chunk = chunks[i]
if direction == "next":
source_chunk = chunks[i+1]
path_index = 0
move_func = lambda: target_chunk.append(source_chunk.popleft())
elif direction == "prev":
source_chunk = chunks[i-1]
path_index = -1
move_func = lambda: target_chunk.appendleft(source_chunk.pop())
else:
raise ValueError("Unexpected move direction %s" % direction)

return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

def _maybe_move(self, source_chunk, target_chunk, path_index, move_func):
"""Move from one chunk to another, assess the change in badness,
and keep the move iff it decreases the badness score.
chunk_list = ChunkList(total_time, self.total_chunks)
:param source_chunk: chunk to move from
:param target_chunk: chunk to move to
:param path_index: 0 if we are moving from the start or -1 if we are moving from the
end
:param move_func: Function that actually moves between chunks"""
if len(source_chunk.paths) <= 1:
return False

move_time = source_chunk.paths[path_index].time

new_source_badness = self._badness(source_chunk.time - move_time)
new_target_badness = self._badness(target_chunk.time + move_time)

delta_badness = ((new_source_badness + new_target_badness) -
(source_chunk.badness + target_chunk.badness))
if delta_badness < 0:
move_func()
return True

return False

def _badness(self, time):
"""Metric of badness for a specific chunk
:param time: the time for a specific chunk"""
return (time - self.expected_time)**2

def _get_chunk(self, manifest_items):
by_dir, total_time = self._group_by_directory(manifest_items)

if len(by_dir) < self.total_chunks:
raise ValueError("Tried to split into %i chunks, but only %i subdirectories included" % (
self.total_chunks, len(by_dir)))

# Put any individual dirs with a time greater than the time per chunk into their own
# chunk
self.expected_time = float(total_time) / self.total_chunks

chunks = self._create_initial_chunks(by_dir)

while True:
to_remove = []
for path_data in by_dir.itervalues():
if path_data.time > chunk_list.time_per_chunk:
to_remove.append(path_data)
if to_remove:
for path_data in to_remove:
chunk = chunk_list.create()
chunk.append(path_data)
del by_dir[path_data.path]
chunk_list.update_time_per_chunk()
# Move a test from one chunk to the next until doing so no longer
# reduces the badness
got_improvement = self._update_chunks(chunks)
if not got_improvement:
break

self.logger.debug(self.expected_time)
for i, chunk in chunks.iteritems():
self.logger.debug("%i: %i, %i" % (i + 1, chunk.time, chunk.badness))

assert self._all_tests(by_dir) == self._chunked_tests(chunks)

return self._get_tests(chunks)

@staticmethod
def _all_tests(by_dir):
"""Return a set of all tests in the manifest from a grouping by directory"""
return set(x[0] for item in by_dir.itervalues()
for x in item.tests)

@staticmethod
def _chunked_tests(chunks):
"""Return a set of all tests in the manifest from the chunk list"""
return set(x[0] for chunk in chunks.itervalues()
for path in chunk.paths
for x in path.tests)


def _create_initial_chunks(self, by_dir):
"""Create an initial unbalanced list of chunks.
:param by_dir: All tests in the manifest grouped by subdirectory
:returns list: A list of Chunk objects"""

class Chunk(object):
def __init__(self, paths, index):
"""List of PathData objects that together form a single chunk of
tests"""
self.paths = deque(paths)
self.time = sum(item.time for item in paths)
self.index = index

def appendleft(self, path):
"""Add a PathData object to the start of the chunk"""
self.paths.appendleft(path)
self.time += path.time

def append(self, path):
"""Add a PathData object to the end of the chunk"""
self.paths.append(path)
self.time += path.time

def pop(self):
"""Remove PathData object from the end of the chunk"""
assert len(self.paths) > 1
self.time -= self.paths[-1].time
return self.paths.pop()

def popleft(self):
"""Remove PathData object from the start of the chunk"""
assert len(self.paths) > 1
self.time -= self.paths[0].time
return self.paths.popleft()

@property
def badness(self_):
"""Badness metric for this chunk"""
return self._badness(self_.time)

initial_size = len(by_dir) / self.total_chunks
chunk_boundaries = [initial_size * i
for i in xrange(self.total_chunks)] + [len(by_dir)]

chunks = OrderedDict()
for i, lower in enumerate(chunk_boundaries[:-1]):
upper = chunk_boundaries[i + 1]
paths = by_dir.values()[lower:upper]
chunks[i] = Chunk(paths, i)

assert self._all_tests(by_dir) == self._chunked_tests(chunks)

return chunks

def _update_chunks(self, chunks):
"""Run a single iteration of the chunk update algorithm.
:param chunks: - List of chunks
"""
#TODO: consider replacing this with a heap
sorted_chunks = sorted(chunks.values(), key=lambda x:-x.badness)
got_improvement = False
for chunk in sorted_chunks:
if chunk.time < self.expected_time:
f = self._maybe_add
else:
f = self._maybe_remove

if chunk.index == 0:
order = ["next"]
elif chunk.index == self.total_chunks - 1:
order = ["prev"]
else:
if chunk.time < self.expected_time:
# First try to add a test from the neighboring chunk with the
# greatest total time
if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
order = ["next", "prev"]
else:
order = ["prev", "next"]
else:
# First try to remove a test and add to the neighboring chunk with the
# lowest total time
if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
order = ["prev", "next"]
else:
order = ["next", "prev"]

for direction in order:
if f(chunks, chunk.index, direction):
got_improvement = True
break

if got_improvement:
break

chunk = chunk_list.create()
for path_data in by_dir.itervalues():
chunk_list.add_path(path_data)
return got_improvement

assert len(chunk_list.chunks) == self.total_chunks, len(chunk_list.chunks)
assert sum(item.time for item in chunk_list) == chunk_list.total_time
def _get_tests(self, chunks):
"""Return the list of tests corresponding to the chunk number we are running.
chunk_list.sort_chunks()
:param chunks: List of chunks"""
tests = []
for path in chunks[self.chunk_number - 1].paths:
tests.extend(path.tests)

return chunk_list.get_tests(self.chunk_number)
return tests

def __call__(self, manifest_iter):
manifest = list(manifest_iter)
Expand Down

0 comments on commit 85935d4

Please sign in to comment.