Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for tracing asynchronous code #87

Merged
merged 9 commits into from
Nov 18, 2019
28 changes: 23 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
version: 2.1

commands:
runtests:
setup:
steps:
- checkout
- run:
command: |
sudo pip install pyflakes==1.5.0 pylint setuptools-lint coverage
sudo pip install --upgrade setuptools
runtests-py2:
steps:
- run:
command: |
python setup.py lint --lint-ignore aiotrace.py,test_async.py --lint-rcfile pylint.rc
coverage run ./setup.py test
coverage report --include="beeline/*"
coverage html --include="beeline/*"
- store_artifacts:
path: htmlcov
runtests-py3:
steps:
- run:
command: |
pyflakes beeline
python setup.py lint --lint-rcfile pylint.rc
coverage run ./setup.py test
Expand Down Expand Up @@ -58,19 +72,23 @@ jobs:
test_python2-7:
executor: python2-7
steps:
- runtests
- setup
- runtests-py2
test_python3-5:
executor: python3-5
steps:
- runtests
- setup
- runtests-py3
test_python3-6:
executor: python3-6
steps:
- runtests
- setup
- runtests-py3
test_python3-7:
executor: python3-7
steps:
- runtests
- setup
- runtests-py3
publish:
executor: python3-7
steps:
Expand Down
78 changes: 43 additions & 35 deletions beeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
''' module beeline '''
import copy
import functools
import logging
import os
Expand All @@ -20,6 +19,37 @@
# This is the PID that initialized the beeline.
_INITPID = None

try:
import asyncio
# The async functionality uses the contextvars module, added in
# Python 3.7
import contextvars
assert contextvars

from beeline.aiotrace import AsyncioTracer, traced_impl, untraced
assert untraced

def in_async_code():
    """Report whether the caller is running inside an asynchronous task.

    The result is used to decide which tracer implementation to use.

    """
    try:
        asyncio.get_running_loop()  # pylint: disable=no-member
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no event loop is
        # running in the current thread.
        return False
    else:
        return True

except ImportError:
# Use these non-async versions if we don't have asyncio or
# contextvars.
from beeline.trace import traced_impl

def in_async_code():
    """Fallback used when asyncio or contextvars is missing: never async."""
    return False

class Beeline(object):
def __init__(self,
writekey='', dataset='', service_name='',
Expand Down Expand Up @@ -69,7 +99,10 @@ def __init__(self,
self.client.add_field('meta.beeline_version', VERSION)
self.client.add_field('meta.local_hostname', socket.gethostname())

self.tracer_impl = SynchronousTracer(self.client)
if in_async_code():
self.tracer_impl = AsyncioTracer(self.client)
else:
self.tracer_impl = SynchronousTracer(self.client)
self.tracer_impl.register_hooks(presend=presend_hook, sampler=sampler_hook)
self.sampler_hook = sampler_hook
self.presend_hook = presend_hook
Expand Down Expand Up @@ -171,26 +204,14 @@ def send_all(self):
span = self.tracer_impl.get_active_span()

def traced(self, name, trace_id=None, parent_id=None):
def wrapped(fn, *args, **kwargs):
@functools.wraps(fn)
def inner(*args, **kwargs):
with self.tracer(name=name, trace_id=trace_id, parent_id=parent_id):
return fn(*args, **kwargs)
return inner

return wrapped
return traced_impl(tracer_fn=self.tracer, name=name, trace_id=trace_id, parent_id=parent_id)

def traced_thread(self, fn):
trace_id = self.tracer_impl._state.trace_id
# copy as a new list - reference will be unavailable when we enter the new thread
stack = copy.copy(self.tracer_impl._state.stack)
trace_fields = copy.copy(self.tracer_impl._state.trace_fields)
trace_copy = self.tracer_impl._trace.copy()

@functools.wraps(fn)
def wrapped(*args, **kwargs):
self.tracer_impl._state.trace_id = trace_id
self.tracer_impl._state.stack = stack
self.tracer_impl._state.trace_fields = trace_fields
self.tracer_impl._trace = trace_copy
return fn(*args, **kwargs)

return wrapped
Expand Down Expand Up @@ -603,15 +624,7 @@ def my_func(n):
with this id.
'''

def wrapped(fn, *args, **kwargs):
@functools.wraps(fn)
def inner(*args, **kwargs):
with tracer(name=name, trace_id=trace_id, parent_id=parent_id):
return fn(*args, **kwargs)

return inner

return wrapped
return traced_impl(tracer_fn=tracer, name=name, trace_id=trace_id, parent_id=parent_id)

def traced_thread(fn):
'''
Expand Down Expand Up @@ -641,22 +654,17 @@ def _my_func_t():

# if beeline is not initialized, or there is no active trace, do nothing
bl = get_beeline()
if bl is None or bl.tracer_impl._state.trace_id is None:
if bl is None or bl.tracer_impl.get_active_trace_id() is None:
@functools.wraps(fn)
def noop(*args, **kwargs):
return fn(*args, **kwargs)
return noop

trace_id = bl.tracer_impl._state.trace_id
# copy as a new list - reference will be unavailable when we enter the new thread
stack = copy.copy(bl.tracer_impl._state.stack)
trace_fields = copy.copy(bl.tracer_impl._state.trace_fields)
trace_copy = bl.tracer_impl._trace.copy()

@functools.wraps(fn)
def wrapped(*args, **kwargs):
bl.tracer_impl._state.trace_id = trace_id
bl.tracer_impl._state.stack = stack
bl.tracer_impl._state.trace_fields = trace_fields
bl.tracer_impl._trace = trace_copy
return fn(*args, **kwargs)

return wrapped
return wrapped
132 changes: 132 additions & 0 deletions beeline/aiotrace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Asynchronous tracer implementation.

This requires Python 3.7, because it uses the contextvars module.

"""
import asyncio
import contextvars # pylint: disable=import-error
import functools

from beeline.trace import Tracer

# Context variable holding the trace that is active in the current context.
# It has no default, so readers in this module call ``.get(None)`` and treat
# ``None`` as "no active trace".
current_trace_var = contextvars.ContextVar("current_trace")


def create_task_factory(parent_factory):
    """Create a task factory that makes a copy of the current trace.

    New tasks have their own context variables, but the current_trace
    context variable still refers to the same Trace object as the one
    in the parent task. This task factory replaces the Trace object
    with a copy of itself.

    Args:
        parent_factory: The task factory previously installed on the
            loop, or None if the loop was using its default. When
            given, it is used to build the task so that existing
            customisation is preserved.

    Returns:
        A callable with the asyncio task-factory signature
        ``(loop, coro, **kwargs)``. It carries a truthy
        ``__trace_task_factory__`` attribute so that it can be
        recognised later and not wrapped a second time.

    """
    def task_factory_impl(loop, coro, **kwargs):
        async def wrapper():
            # Runs inside the new task's own context, so replacing the
            # variable here does not affect the parent task.
            current_trace = current_trace_var.get(None)
            if current_trace is not None:
                current_trace_var.set(current_trace.copy())
            return await coro

        if parent_factory is None:
            return asyncio.tasks.Task(wrapper(), loop=loop, **kwargs)

        # Task factories are called as factory(loop, coro, **kwargs); the
        # loop must be forwarded, along with any keyword arguments that
        # newer event loops pass (for example ``context``).
        return parent_factory(loop, wrapper(), **kwargs)

    task_factory_impl.__trace_task_factory__ = True
    return task_factory_impl


class AsyncioTracer(Tracer):
    """Tracer whose active trace is stored in a context variable.

    The ``_trace`` attribute is a property backed by
    ``current_trace_var`` rather than a plain instance attribute.

    """

    def __init__(self, client):
        """Initialize, and ensure that our task factory is set up.

        Must be called while an event loop is running:
        ``asyncio.get_running_loop()`` raises RuntimeError otherwise.

        """
        super().__init__(client)

        loop = asyncio.get_running_loop()  # pylint: disable=no-member

        task_factory = loop.get_task_factory()
        # getattr() covers both "no factory installed" (None) and a
        # foreign factory that does not carry our marker attribute;
        # plain attribute access would raise AttributeError on the latter.
        if not getattr(task_factory, "__trace_task_factory__", False):
            loop.set_task_factory(create_task_factory(task_factory))

    @property
    def _trace(self):
        """Return the trace stored in the current context, or None."""
        return current_trace_var.get(None)

    @_trace.setter
    def _trace(self, new_trace):
        """Store ``new_trace`` as the trace for the current context."""
        current_trace_var.set(new_trace)


def traced_impl(tracer_fn, name, trace_id, parent_id):
    """Build the ``traced`` decorator, with support for coroutine functions.

    A coroutine function needs its own wrapper: the span has to stay
    open while the coroutine is awaited. Wrapping it like a plain
    function would only cover the time it takes to create the
    coroutine object.

    ``tracer_fn`` is called with ``name``, ``trace_id`` and
    ``parent_id`` as keyword arguments and must return a context
    manager; it is entered around every call of the decorated function.

    """
    def open_span():
        return tracer_fn(name=name, trace_id=trace_id, parent_id=parent_id)

    def decorator(fn):
        if not asyncio.iscoroutinefunction(fn):
            @functools.wraps(fn)
            def sync_call(*args, **kwargs):
                with open_span():
                    return fn(*args, **kwargs)

            return sync_call

        @functools.wraps(fn)
        async def async_call(*args, **kwargs):
            with open_span():
                return await fn(*args, **kwargs)

        return async_call

    return decorator


def untraced(fn):
    """Decorator that runs ``fn`` detached from any ongoing trace.

    Async tasks inherit trace state by default, so this is needed when
    starting independent async tasks from within a trace. While ``fn``
    runs, the current-trace context variable is set to None; the
    previous value is restored afterwards, even if ``fn`` raises.

    """

    def detach():
        # Only touch the variable when a trace is active; the returned
        # token (or None) tells reattach() whether there is anything to undo.
        if current_trace_var.get(None) is None:
            return None
        return current_trace_var.set(None)

    def reattach(token):
        if token is not None:
            current_trace_var.reset(token)

    # Both synchronous and asynchronous functions may create tasks.
    if not asyncio.iscoroutinefunction(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            token = detach()
            try:
                return fn(*args, **kwargs)
            finally:
                reattach(token)

        return wrapped

    @functools.wraps(fn)
    async def wrapped(*args, **kwargs):
        token = detach()
        try:
            return await fn(*args, **kwargs)
        finally:
            reattach(token)

    return wrapped
Loading