Skip to content

Commit

Permalink
Make test runner more chatty to avoid it getting killed by buildbot.
Browse files Browse the repository at this point in the history
NOTRY=true
NOTREECHECKS=true

Review URL: https://codereview.chromium.org/1064043002

Cr-Commit-Position: refs/heads/master@{#27653}
  • Loading branch information
mi-ac authored and Commit bot committed Apr 8, 2015
1 parent 0716bc3 commit a54f22d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
9 changes: 6 additions & 3 deletions tools/testrunner/local/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,14 @@ def _RunInternal(self, jobs):
try:
it = pool.imap_unordered(RunTest, queue)
for result in it:
test = test_map[result[0]]
if result.heartbeat:
self.indicator.Heartbeat()
continue
test = test_map[result.value[0]]
if self.context.predictable:
update_perf = self._ProcessTestPredictable(test, result, pool)
update_perf = self._ProcessTestPredictable(test, result.value, pool)
else:
update_perf = self._ProcessTestNormal(test, result, pool)
update_perf = self._ProcessTestNormal(test, result.value, pool)
if update_perf:
self._RunPerfSafe(lambda: self.perfdata.UpdatePerfData(test))
finally:
Expand Down
34 changes: 30 additions & 4 deletions tools/testrunner/local/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from Queue import Empty
from multiprocessing import Event, Process, Queue

class NormalResult():
Expand All @@ -24,6 +25,20 @@ def __init__(self):
self.break_now = True


class MaybeResult():
def __init__(self, heartbeat, value):
self.heartbeat = heartbeat
self.value = value

@staticmethod
def create_heartbeat():
return MaybeResult(True, None)

@staticmethod
def create_result(value):
return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue, done):
"""Worker to be run in a child process.
The worker stops on two conditions. 1. When the poison pill "STOP" is
Expand Down Expand Up @@ -51,7 +66,7 @@ class Pool():
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4

def __init__(self, num_workers):
def __init__(self, num_workers, heartbeat_timeout=30):
self.num_workers = num_workers
self.processes = []
self.terminated = False
Expand All @@ -67,11 +82,15 @@ def __init__(self, num_workers):
self.work_queue = Queue()
self.done_queue = Queue()
self.done = Event()
self.heartbeat_timeout = heartbeat_timeout

def imap_unordered(self, fn, gen):
"""Maps function "fn" to items in generator "gen" on the worker processes
in an arbitrary order. The items are expected to be lists of arguments to
the function. Returns a results iterator."""
the function. Returns a results iterator. A result value of type
MaybeResult either indicates a heartbeat of the runner, i.e. indicating
that the runner is still waiting for the result to be computed, or it wraps
the real result."""
try:
gen = iter(gen)
self.advance = self._advance_more
Expand All @@ -86,7 +105,14 @@ def imap_unordered(self, fn, gen):

self.advance(gen)
while self.count > 0:
result = self.done_queue.get()
while True:
try:
result = self.done_queue.get(timeout=self.heartbeat_timeout)
break
except Empty:
# Indicate a heartbeat. The iterator will continue fetching the
# next result.
yield MaybeResult.create_heartbeat()
self.count -= 1
if result.exception:
# Ignore items with unexpected exceptions.
Expand All @@ -95,7 +121,7 @@ def imap_unordered(self, fn, gen):
# A keyboard interrupt happened in one of the worker processes.
raise KeyboardInterrupt
else:
yield result.result
yield MaybeResult.create_result(result.result)
self.advance(gen)
finally:
self.terminate()
Expand Down
10 changes: 5 additions & 5 deletions tools/testrunner/local/pool_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def testNormal(self):
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
results.add(result)
results.add(result.value)
self.assertEquals(set(range(0, 10)), results)

def testException(self):
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 12)]):
# Item 10 will not appear in results due to an internal exception.
results.add(result)
results.add(result.value)
expect = set(range(0, 12))
expect.remove(10)
self.assertEquals(expect, results)
Expand All @@ -34,8 +34,8 @@ def testAdd(self):
results = set()
pool = Pool(3)
for result in pool.imap_unordered(Run, [[x] for x in range(0, 10)]):
results.add(result)
if result < 30:
pool.add([result + 20])
results.add(result.value)
if result.value < 30:
pool.add([result.value + 20])
self.assertEquals(set(range(0, 10) + range(20, 30) + range(40, 50)),
results)
7 changes: 7 additions & 0 deletions tools/testrunner/local/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def AboutToRun(self, test):
def HasRun(self, test, has_unexpected_output):
pass

def Heartbeat(self):
pass

def PrintFailureHeader(self, test):
if test.suite.IsNegativeTest(test):
negative_marker = '[negative] '
Expand Down Expand Up @@ -128,6 +131,10 @@ def HasRun(self, test, has_unexpected_output):
outcome = 'pass'
print 'Done running %s: %s' % (test.GetLabel(), outcome)

def Heartbeat(self):
print 'Still working...'
sys.stdout.flush()


class DotsProgressIndicator(SimpleProgressIndicator):

Expand Down

0 comments on commit a54f22d

Please sign in to comment.