Skip to content

Commit

Permalink
Add client API (#178)
Browse files Browse the repository at this point in the history
* Refactor testagent fixture

Pull it out of the snapshot integration module so we can reuse it for
the client tests.

* Add client
  • Loading branch information
Kyle-Verhoog authored Feb 17, 2024
1 parent 07b485b commit f575987
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 93 deletions.
139 changes: 139 additions & 0 deletions ddapm_test_agent/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import time
from typing import Any
from typing import List
from typing import cast
import urllib.parse

import requests

from ddapm_test_agent.trace import Trace


class TestAgentClient:
__test__ = False

def __init__(self, base_url: str):
self._base_url = base_url
self._session = requests.Session()

def _url(self, path: str) -> str:
return urllib.parse.urljoin(self._base_url, path)

def traces(self, clear: bool = False, **kwargs: Any) -> List[Trace]:
resp = self._session.get(self._url("/test/session/traces"), **kwargs)
if clear:
self.clear()
json = resp.json()
return cast(List[Trace], json)

def requests(self, **kwargs: Any) -> List[Any]:
resp = self._session.get(self._url("/test/session/requests"), **kwargs)
json = resp.json()
return cast(List[Any], json)

def raw_telemetry(self, clear: bool = False) -> List[Any]:
raw_reqs = self.requests()
reqs = []
for req in raw_reqs:
if req["url"].endswith("/telemetry/proxy/api/v2/apmtelemetry"):
reqs.append(req)
if clear:
self.clear()
return reqs

def telemetry(self, clear: bool = False, **kwargs: Any) -> List[Any]:
resp = self._session.get(self._url("/test/session/apmtelemetry"), **kwargs)
if clear:
self.clear()
return cast(List[Any], resp.json())

def clear(self, **kwargs: Any) -> None:
self._session.get(self._url("/test/session/clear"), **kwargs)

def info(self, **kwargs):
resp = self._session.get(self._url("/info"), **kwargs)
json = resp.json()
return json

def wait_for_num_traces(self, num: int, clear: bool = False, wait_loops: int = 30) -> List[Trace]:
"""Wait for `num` traces to be received from the test agent.
Returns after the number of traces has been received or raises otherwise after 2 seconds of polling.
Returned traces are sorted by the first span start time to simplify assertions for more than one trace by knowing that returned traces are in the same order as they have been created.
"""
num_received = 0
traces = []
for i in range(wait_loops):
try:
traces = self.traces(clear=False)
except requests.exceptions.RequestException:
pass
else:
num_received = len(traces)
if num_received == num:
if clear:
self.clear()
return sorted(traces, key=lambda trace: trace[0]["start"])
time.sleep(0.1)
raise ValueError(
"Number (%r) of traces not available from test agent, got %r:\n%r" % (num, num_received, traces)
)

def wait_for_num_spans(self, num: int, clear: bool = False, wait_loops: int = 30) -> List[Trace]:
"""Wait for `num` spans to be received from the test agent.
Returns after the number of spans has been received or raises otherwise after 2 seconds of polling.
Returned traces are sorted by the first span start time to simplify assertions for more than one trace by knowing that returned traces are in the same order as they have been created.
"""
num_received = None
for i in range(wait_loops):
try:
traces = self.traces(clear=False)
except requests.exceptions.RequestException:
pass
else:
num_received = 0
for trace in traces:
num_received += len(trace)
if num_received == num:
if clear:
self.clear()
return sorted(traces, key=lambda trace: trace[0]["start"])
time.sleep(0.1)
raise ValueError("Number (%r) of spans not available from test agent, got %r" % (num, num_received))

def wait_for_telemetry_event(self, event_name: str, clear: bool = False, wait_loops: int = 200) -> Any:
"""Wait for and return the given telemetry event from the test agent."""
for i in range(wait_loops):
try:
events = self.telemetry(clear=False)
except requests.exceptions.RequestException:
pass
else:
for event in events:
if event["request_type"] == "message-batch":
for message in event["payload"]:
if message["request_type"] == event_name:
if clear:
self.clear()
return message
elif event["request_type"] == event_name:
if clear:
self.clear()
return event
time.sleep(0.01)
raise AssertionError("Telemetry event %r not found" % event_name)

def wait_to_start(self, num_tries: int = 50, delay: float = 0.1) -> None:
exc = []
for i in range(num_tries):
try:
self.info()
except requests.exceptions.RequestException as e:
exc.append(e)
time.sleep(delay)
else:
return
raise AssertionError(f"Test agent did not start in time ({num_tries * delay} seconds). Got {exc[-1]}")
5 changes: 5 additions & 0 deletions releasenotes/notes/client-173dbc6655e42337.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
Add a `client` module which provides an API client that can be used to
programmatically interface with the test agent.
62 changes: 60 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import asyncio
import json
import os
from pathlib import Path
import random
import socket
import subprocess
from typing import AsyncGenerator
from typing import Awaitable
from typing import Dict
from typing import Generator
Expand All @@ -11,6 +15,9 @@
from typing import Set
from typing import cast

import aiohttp
from aiohttp.client_exceptions import ClientConnectorError
from aiohttp.client_exceptions import ClientOSError
from aiohttp.web import Response
from ddsketch import LogCollapsingLowestDenseDDSketch
from ddsketch.pb.proto import DDSketchProto
Expand Down Expand Up @@ -519,9 +526,60 @@ def fn(token: Optional[str] = None) -> Awaitable[Response]:


@pytest.fixture
def available_port():
def available_port() -> str:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0)) # Bind to a free port provided by the host.
port = s.getsockname()[1] # Get the port number assigned.
s.close() # Release the socket.
yield port
return str(port)


@pytest.fixture
def testagent_port(available_port: str) -> str:
return available_port


@pytest.fixture
def testagent_url(testagent_port: str) -> str:
return "http://localhost:%s" % testagent_port


@pytest.fixture(scope="module")
def testagent_snapshot_ci_mode() -> bool:
# Default all tests in this module to be run in CI mode
# unless a special env var is passed to make generating
# the snapshots easier.
return os.getenv("GENERATE_SNAPSHOTS") != "1"


@pytest.fixture
async def testagent(
loop: asyncio.BaseEventLoop, testagent_port: str, testagent_snapshot_ci_mode: bool
) -> AsyncGenerator[aiohttp.ClientSession, None]:
env = os.environ.copy()
env.update(
{
"PORT": testagent_port,
"SNAPSHOT_CI": "1" if testagent_snapshot_ci_mode else "0",
"SNAPSHOT_DIR": os.path.join(os.path.dirname(__file__), "integration_snapshots"),
}
)
p = subprocess.Popen(["ddapm-test-agent"], env=env)

# Wait for server to start
try:
async with aiohttp.ClientSession() as session:
for _ in range(100):
try:
r = await session.get(f"http://localhost:{testagent_port}")
except (ClientConnectorError, ClientOSError):
pass
else:
if r.status == 404:
break
await asyncio.sleep(0.05)
else:
assert 0
yield session
finally:
p.terminate()
32 changes: 32 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from ddtrace import Tracer
import pytest

from ddapm_test_agent.client import TestAgentClient


@pytest.fixture
async def client(testagent, testagent_url):
return TestAgentClient(testagent_url)


@pytest.fixture
async def tracer(testagent_url):
t = Tracer(testagent_url)
yield t
t.shutdown()


async def test_client_traces(client: TestAgentClient, tracer: Tracer) -> None:
assert client.traces() == []
with tracer.trace("test"):
pass
traces = client.wait_for_num_traces(1)
assert len(traces) == 1


async def test_client_requests(client: TestAgentClient, tracer: Tracer) -> None:
assert client.requests() == []
with tracer.trace("test"):
pass
tracer.flush()
assert len(client.requests()) == 1
Loading

0 comments on commit f575987

Please sign in to comment.