Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch gevent to work with tracing #1028

Merged
merged 8 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions baseplate/lib/tracing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from collections.abc import Sequence
from typing import Optional
from collections.abc import Callable, Sequence
from typing import Any, Optional, Protocol

import gevent.pool
from opentelemetry import context
from opentelemetry.context import Context
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
from opentelemetry.trace import Link, SpanKind, TraceState
Expand Down Expand Up @@ -42,3 +44,72 @@ def should_sample(

def get_description(self) -> str:
return f"RateLimited(fixed rate sampling {self.rps})"


# Greenlet tracing utils
__Greenlet = gevent.Greenlet
__IMap = gevent.pool.IMap
__IMapUnordered = gevent.pool.IMapUnordered


class Runnable(Protocol):
@property
def trace_context(self) -> Context: ...

chriskuehl marked this conversation as resolved.
Show resolved Hide resolved
run: Callable


class TracingMixin:
def __init__(self: Runnable, *args: Any, **kwargs: Any) -> None:
self.bp_trace_context = context.get_current()
super().__init__(*args, **kwargs)

def run(self: Runnable) -> None:
context.attach(self.bp_trace_context)
super().run()


class TracedGreenlet(TracingMixin, gevent.Greenlet): ...


class TracedIMapUnordered(TracingMixin, gevent.pool.IMapUnordered): ...


class TracedIMap(TracedIMapUnordered, gevent.pool.IMap): ...


def patch_greenlet_tracing() -> None:
if getattr(gevent, "__rddt_patch", False):
return
gevent.__rddt_patch = True
_replace(TracedGreenlet, TracedIMap, TracedIMapUnordered)


def unpatch_greenlet_tracing() -> None:
if not getattr(gevent, "__rddt_patch", False):
return
gevent.__rddt_patch = False

_replace(__Greenlet, __IMap, __IMapUnordered)


def _replace(
g_class: gevent.Greenlet,
imap_class: gevent.pool.IMap,
imap_unordered_class: gevent.pool.IMapUnordered,
) -> None:
gevent.greenlet.Greenlet = g_class
gevent.pool.Group.greenlet_class = g_class
gevent.pool.Greenlet = g_class
gevent._imap.Greenlet = g_class

# replace gevent shortcuts
gevent.Greenlet = gevent.greenlet.Greenlet
gevent.spawn = gevent.greenlet.Greenlet.spawn
gevent.spawn_later = gevent.greenlet.Greenlet.spawn_later

# replace the original IMap classes with the new one
gevent._imap.IMap = imap_class
gevent.pool.IMap = imap_class
gevent._imap.IMapUnordered = imap_unordered_class
gevent.pool.IMapUnordered = imap_unordered_class
2 changes: 2 additions & 0 deletions bin/baseplate-serve
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from gevent.monkey import patch_all

from baseplate.server.monkey import patch_stdlib_queues
from baseplate.lib.tracing import patch_greenlet_tracing

patch_all()
patch_stdlib_queues()
patch_greenlet_tracing()

try:
import psycopg2
Expand Down
60 changes: 60 additions & 0 deletions tests/unit/lib/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
import unittest

import gevent
from gevent import select
from opentelemetry import trace
from opentelemetry.test.test_base import TestBase

from baseplate.lib.tracing import patch_greenlet_tracing, unpatch_greenlet_tracing

logger = logging.getLogger(__name__)


class PatchedTestCase(unittest.TestCase):
def setUp(self):
super().setUp()
patch_greenlet_tracing()

def tearDown(self):
super().tearDown()
unpatch_greenlet_tracing()


class TestGevent(PatchedTestCase, TestBase):
def test_context_with_patch(self):
"""Trace context is passed to greenlets"""

def gr1():
with trace.get_tracer("gr1").start_as_current_span("child"):
select.select([], [], [], 2)

with trace.get_tracer(__name__).start_as_current_span("parent"):
gevent.joinall(
[
gevent.spawn(gr1),
]
)

finished_spans = self.get_finished_spans()
self.assertGreater(len(finished_spans), 0)
self.assertEqual(finished_spans[0].parent.span_id, finished_spans[1].context.span_id)

def test_context_without_patch(self):
"""Trace context is not passed if we explicitly don't patch"""
unpatch_greenlet_tracing()

def gr1():
with trace.get_tracer("gr1").start_as_current_span("child"):
select.select([], [], [], 2)

with trace.get_tracer(__name__).start_as_current_span("parent"):
gevent.joinall(
[
gevent.spawn(gr1),
]
)

finished_spans = self.get_finished_spans()
self.assertGreater(len(finished_spans), 0)
self.assertIsNone(finished_spans[0].parent)
Loading