diff --git a/coverage/cmdline.py b/coverage/cmdline.py index c3d011d94..9360f29dc 100644 --- a/coverage/cmdline.py +++ b/coverage/cmdline.py @@ -37,6 +37,7 @@ class Opts(object): ) CONCURRENCY_CHOICES = [ "thread", "gevent", "greenlet", "eventlet", "multiprocessing", + "twisted", ] concurrency = optparse.make_option( '', '--concurrency', action='store', metavar="LIB", @@ -635,16 +636,16 @@ def do_run(self, options, args): show_help("Can't append to data files in parallel mode.") return ERR - if options.concurrency == "multiprocessing": + if options.concurrency in ("multiprocessing", "twisted"): # Can't set other run-affecting command line options with - # multiprocessing. + # multiprocessing or Twisted. for opt_name in ['branch', 'include', 'omit', 'pylib', 'source', 'timid']: # As it happens, all of these options have no default, meaning # they will be None if they have not been specified. if getattr(options, opt_name) is not None: show_help( - "Options affecting multiprocessing must only be specified " - "in a configuration file.\n" + "Options affecting multiprocessing and twisted must " + "only be specified in a configuration file.\n" "Remove --{} from the command line.".format(opt_name) ) return ERR diff --git a/coverage/control.py b/coverage/control.py index c398f2a7f..03883bb21 100644 --- a/coverage/control.py +++ b/coverage/control.py @@ -37,6 +37,11 @@ # Jython has no multiprocessing module. patch_multiprocessing = None +try: + from coverage.twistedproc import patch_twisted +except ImportError: + patch_twisted = None + os = isolate_module(os) @@ -110,8 +115,8 @@ def __init__( `concurrency` is a string indicating the concurrency library being used in the measured code. Without this, coverage.py will get incorrect results if these libraries are in use. Valid strings are "greenlet", - "eventlet", "gevent", "multiprocessing", or "thread" (the default). - This can also be a list of these strings. + "eventlet", "gevent", "multiprocessing", "twisted", or "thread" (the + default). This can also be a list of these strings. If `check_preimported` is true, then when coverage is started, the aleady-imported files will be checked to see if they should be measured @@ -335,10 +340,9 @@ def load(self): if not should_skip: self._data.read() - def _init_for_start(self): - """Initialization for start()""" - # Construct the collector. - concurrency = self.config.concurrency or [] + def _init_multiprocessing(self, concurrency): + """Enable collection in multiprocessing workers, if requested. + """ if "multiprocessing" in concurrency: if not patch_multiprocessing: raise CoverageException( # pragma: only jython @@ -349,6 +353,28 @@ def _init_for_start(self): # it for the main process. self.config.parallel = True + def _init_twisted(self, concurrency): + """Enable collection in Twisted-spawned children, if requested. + """ + if "twisted" in concurrency: + if patch_twisted is None: + # XXX Untested + raise CoverageException( + "twisted is not supported on this Python" + ) + patch_twisted( + rcfile=self.config.config_file, + ) + # XXX Untested + self.config.parallel = True + + def _init_for_start(self): + """Initialization for start()""" + # Construct the collector. + concurrency = self.config.concurrency or [] + self._init_multiprocessing(concurrency) + self._init_twisted(concurrency) + dycon = self.config.dynamic_context if not dycon or dycon == "none": context_switchers = [] diff --git a/coverage/twistedproc.py b/coverage/twistedproc.py new file mode 100644 index 000000000..d59122321 --- /dev/null +++ b/coverage/twistedproc.py @@ -0,0 +1,85 @@ +# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 +# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt + +"""Monkey-patching to add Twisted support for coverage.py +""" + +from functools import partial +from tempfile import mkdtemp +from os.path import join + +# XXX do it without installing the default reactor +import twisted.internet.reactor + +from coverage.misc import contract + +# An attribute that will be set on the module to indicate that it has been +# monkey-patched. Value copied from multiproc.py. +PATCHED_MARKER = "_coverage$patched" + +@contract(rcfile=str) +def patch_twisted(rcfile): + """ + The twisted.internet.interfaces.IReactorProcess.spawnProcess + implementation of the Twisted reactor is patched to enable coverage + collection in spawned processes. + + This works by clobbering sitecustomize. + """ + if getattr(twisted.internet, PATCHED_MARKER, False): + return + + origSpawnProcess = twisted.internet.reactor.spawnProcess + twisted.internet.reactor.spawnProcess = partial( + _coverageSpawnProcess, + origSpawnProcess, + rcfile, + ) + setattr(twisted.internet, PATCHED_MARKER, True) + + +def _coverageSpawnProcess( + origSpawnProcess, + rcfile, + processProtocol, + executable, + args=(), + env=None, + *a, + **kw +): + """ + Spawn a process using ``origSpawnProcess``. Set up its environment so + that coverage its collected, if it is a Python process. + """ + if env is None: + env = os.environ.copy() + pythonpath = env.get(u"PYTHONPATH", u"").split(u":") + dtemp = mkdtemp() + pythonpath.insert(0, dtemp) + sitecustomize = join(dtemp, u"sitecustomize.py") + with open(sitecustomize, "wt") as f: + f.write("""\ +import sys, os.path +sys.path.remove({dtemp!r}) +os.remove({sitecustomize!r}) +if os.path.exists({sitecustomizec!r}): + os.remove({sitecustomizec!r}) +os.rmdir({dtemp!r}) +import coverage +coverage.process_startup() +""".format( + sitecustomize=sitecustomize, + sitecustomizec=sitecustomize + u"c", + dtemp=dtemp, +)) + env[u"PYTHONPATH"] = u":".join(pythonpath) + env[u"COVERAGE_PROCESS_START"] = rcfile + return origSpawnProcess( + processProtocol, + executable, + args, + env, + *a, + **kw + ) diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index bcf619422..cf4ae8a19 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -6,6 +6,7 @@ import os import random import sys +import textwrap import threading import time @@ -42,6 +43,10 @@ except ImportError: # pragma: only jython greenlet = None +try: + import twisted.internet.reactor +except ImportError: + twisted = None def measurable_line(l): """Is this a line of code coverage will measure? @@ -462,6 +467,80 @@ def test_multiprocessing_with_branching(self): self.try_multiprocessing_code_with_branching(code, expected_out) +TWISTED_CODE = """ + # Above this will be a definition of work(). + from sys import executable, argv, stdout + from twisted.internet.task import react + from twisted.internet.utils import getProcessOutput + from twisted.internet.defer import inlineCallbacks + + @inlineCallbacks + def master(reactor, *xs): + for x in xs: + y = yield getProcessOutput(executable, [__file__, "worker", x]) + stdout.write(y) + + def worker(x): + print("work(%s) = %s" % (x, work(int(x)))) + + if __name__ == '__main__': + if argv[1] == "master": + react(master, argv[2:]) + else: + worker(*argv[2:]) +""" + +class TwistedTest(CoverageTest): + """Test support of Twisted's multi-process feature.""" + + keep_temp_dir = True + + def setUp(self): + if twisted is None: + self.skipTest("No Twisted in this Python") + super(TwistedTest, self).setUp() + + def test_twisted(self): + """ + With the Twisted concurrency model enabled, coverage is collected from + child Python processes that are started with + IReactorProcess.spawnProcess. + """ + code = SQUARE_OR_CUBE_WORK + TWISTED_CODE + source_file = "spawnprocess.py" + self.make_file(source_file, textwrap.dedent(code)) + self.make_file(".coveragerc", """\ + [run] + concurrency = twisted + source = . + """) + + x = 41 + y = x + 1 + command = ( + "coverage run --concurrency=twisted " + "spawnprocess.py master %s %s" % (x, y) + ) + out = self.run_command(command) + expected_out = ( + "work(%s) = %s\n" + "work(%s) = %s\n" + ) % ( + x, x ** 2, + y, y ** 3, + ) + + # Make sure it actually ran the code we were trying to make it run. + self.assertEqual(out, expected_out) + + # Make sure the coverage collected on that run is what we expect, too. + out = self.run_command("coverage combine") + self.assertEqual(out, "") + out = self.run_command("coverage report -m") + last_line = self.squeezed_lines(out)[-1] + self.assertRegex(last_line, r"%s \d+ 0 100%%" % (source_file,)) + + def test_coverage_stop_in_threads(): has_started_coverage = [] has_stopped_coverage = [] diff --git a/tox.ini b/tox.ini index 6f539a105..32a920ff6 100644 --- a/tox.ini +++ b/tox.ini @@ -15,6 +15,7 @@ deps = -r requirements/pytest.pip pip==19.1.1 setuptools==41.0.1 + twisted==19.2.1 # gevent 1.3 causes a failure: https://github.com/nedbat/coveragepy/issues/663 py{27,35,36}: gevent==1.2.2 py{27,35,36,37,38}: eventlet==0.24.1