Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Expand Reporter and Config tests for ZipkinV2
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Fitzpatrick <rmfitzpatrick@signalfx.com>
  • Loading branch information
Ryan Fitzpatrick committed Jul 9, 2018
1 parent cdbd4e1 commit b3b5226
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 43 deletions.
23 changes: 21 additions & 2 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
from jaeger_client import Config, ConstSampler, ProbabilisticSampler, RateLimitingSampler
from jaeger_client.config import DEFAULT_THROTTLER_PORT
from jaeger_client.metrics import MetricsFactory
from jaeger_client.reporter import NullReporter
from jaeger_client.reporter import NullReporter, ZipkinV2Reporter
from jaeger_client import constants


class ConfigTests(unittest.TestCase):

def setUp(self):
if Config.initialized():
Config._initialized = False

def test_enabled(self):
c = Config({'enabled': True}, service_name='x')
assert c.enabled
Expand Down Expand Up @@ -142,14 +146,23 @@ def test_disable_metrics(self):
def test_initialize_tracer(self):
c = Config({}, service_name='x')
tracer = c.initialize_tracer()

assert opentracing.tracer == tracer

def test_global_tracer_initializaion(self):
c = Config({}, service_name='x')
tracer = c.initialize_tracer()
assert tracer
attempt = c.initialize_tracer()
assert attempt is None

def test_reporter_type(self):
c = Config({'reporter_type': 'JaEGer'}, service_name='x')
assert c.reporter_type == 'jaeger'
c = Config({'reporter_type': 'ZiPKIn_V2'}, service_name='x')
assert c.reporter_type == 'zipkin_v2'
c = Config({'reporter_type': 'NotAReporterType'}, service_name='x')
with self.assertRaises(ValueError):
c.reporter_type

def test_headers(self):
c = Config({}, service_name='x')
Expand All @@ -163,3 +176,9 @@ def test_zipkin_spans_url(self):
assert c.zipkin_spans_url == 'http://localhost:9411/api/v2/spans'
c = Config({'zipkin_spans_url': 'someurl'}, service_name='x')
assert c.zipkin_spans_url == 'someurl'

def test_initialize_trace_with_zipkin_reporter(self):
c = Config({'reporter_type': 'zipkin_v2',
'headers': {'Header': 'Value'}}, service_name='x')
tracer = c.initialize_tracer()
assert isinstance(tracer.reporter, ZipkinV2Reporter)
173 changes: 132 additions & 41 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import time
import collections
import json

import mock
import pytest
Expand All @@ -30,7 +31,7 @@
from jaeger_client.utils import ErrorReporter
from tornado.ioloop import IOLoop
from tornado.testing import AsyncTestCase, gen_test
from jaeger_client.reporter import ThriftReporter
from jaeger_client.reporter import QueueReporter, ThriftReporter, ZipkinV2Reporter
from jaeger_client.ioloop_util import future_result


Expand Down Expand Up @@ -58,6 +59,22 @@ def test_logging_reporter():
reporter.close().result()


def test_queue_reporter():
with pytest.raises(NotImplementedError):
QueueReporter(mock.MagicMock())

with mock.patch.object(QueueReporter, 'create_agent') as create_agent:
channel = mock.MagicMock()
reporter = QueueReporter(channel)
create_agent.assert_called_with(channel)

with pytest.raises(NotImplementedError):
reporter._send('batch').result()

with pytest.raises(NotImplementedError):
reporter.make_batch('spans', 'process')


def test_composite_reporter():
reporter = jaeger_client.reporter.CompositeReporter(
jaeger_client.reporter.NullReporter(),
Expand Down Expand Up @@ -132,7 +149,7 @@ def _incr_count(self, key, value):
self.counters[key] = value + self.counters.get(key, 0)


class ThriftReporterTest(AsyncTestCase):
class QueueReporterTest(object):
@pytest.yield_fixture
def thread_loop(self):
yield
Expand All @@ -147,20 +164,6 @@ def _new_span(name):
span.end_time = span.start_time + 0.001 # 1ms
return span

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = ThriftReporter(channel=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=queue_cap)
reporter.set_process('service', {}, max_length=0)
sender = FakeSender()
reporter._send = sender
return reporter, sender

@tornado.gen.coroutine
def _wait_for(self, fn):
"""Wait until fn() returns truth, but not longer than 1 second."""
Expand Down Expand Up @@ -233,31 +236,6 @@ def test_submit_queue_full_batch_size_1(self):
sender.futures[1].set_result(1)
yield reporter.close()

@gen_test
def test_submit_batch_size_2(self):
reporter, sender = self._new_reporter(batch_size=2, flush=0.01)
reporter.report_span(self._new_span('1'))
yield tornado.gen.sleep(0.001)
assert 0 == len(sender.futures)

reporter.report_span(self._new_span('2'))
yield self._wait_for(lambda: len(sender.futures) > 0)
assert 1 == len(sender.futures)
assert 2 == len(sender.requests[0].spans)
sender.futures[0].set_result(1)

# 3rd span will not be submitted right away, but after `flush` interval
reporter.report_span(self._new_span('3'))
yield tornado.gen.sleep(0.001)
assert 1 == len(sender.futures)
yield tornado.gen.sleep(0.001)
assert 1 == len(sender.futures)
yield tornado.gen.sleep(0.01)
assert 2 == len(sender.futures)
sender.futures[1].set_result(1)

yield reporter.close()

@gen_test
def test_close_drains_queue(self):
reporter, sender = self._new_reporter(batch_size=1, flush=0.050)
Expand Down Expand Up @@ -291,3 +269,116 @@ def send(_):
yield reporter.close()
assert reporter.queue.qsize() == 0, 'all spans drained'
assert count[0] == 4, 'last span submitted in one extrac batch'

@gen_test
def test_invalid_capacity_size(self):
with pytest.raises(ValueError):
self._new_reporter(batch_size=2, queue_cap=1)


class ThriftReporterTest(QueueReporterTest, AsyncTestCase):

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = ThriftReporter(channel=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=queue_cap)
reporter.set_process('service', {}, max_length=0)
sender = FakeSender()
reporter._send = sender
return reporter, sender

@gen_test
def test_submit_batch_size_2(self):
reporter, sender = self._new_reporter(batch_size=2, flush=0.01)
reporter.report_span(self._new_span('1'))
yield tornado.gen.sleep(0.001)
assert 0 == len(sender.futures)

reporter.report_span(self._new_span('2'))
yield self._wait_for(lambda: len(sender.futures) > 0)
assert 1 == len(sender.futures)
assert 2 == len(sender.requests[0].spans)
sender.futures[0].set_result(1)

# 3rd span will not be submitted right away, but after `flush` interval
reporter.report_span(self._new_span('3'))
yield tornado.gen.sleep(0.001)
assert 1 == len(sender.futures)
yield tornado.gen.sleep(0.001)
assert 1 == len(sender.futures)
yield tornado.gen.sleep(0.01)
assert 2 == len(sender.futures)
sender.futures[1].set_result(1)

yield reporter.close()


class ZipkinV2ReporterTest(QueueReporterTest, AsyncTestCase):

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = ZipkinV2Reporter(channel=mock.MagicMock(),
spans_url='api/v2/spans',
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=queue_cap)
reporter.set_process('service', {}, max_length=0)
sender = FakeSender()
reporter._send = sender
return reporter, sender

@gen_test
def test_submit_batch_size_2(self):
reporter, sender = self._new_reporter(batch_size=2, flush=0.01)
reporter.report_span(self._new_span('1'))
yield tornado.gen.sleep(0.001)
assert 0 == len(sender.futures)

reporter.report_span(self._new_span('2'))
yield self._wait_for(lambda: len(sender.futures) > 0)
assert 1 == len(sender.futures)
assert 2 == len(sender.requests[0])
sender.futures[0].set_result(1)

# 3rd span will not be submitted right away, but after `flush` interval
reporter.report_span(self._new_span('3'))
yield tornado.gen.sleep(0.001)
assert 1 == len(sender.futures)
yield tornado.gen.sleep(0.001)
assert 1 == len(sender.futures)
yield tornado.gen.sleep(0.01)
assert 2 == len(sender.futures)
sender.futures[1].set_result(1)

yield reporter.close()

@gen_test
def test_send_submits_request(self):
url = 'http://myzipkin/api/v2/spans'
headers = {'MyHeader': 'MyHeaderVal'}
reporter = ZipkinV2Reporter(channel=mock.MagicMock(),
spans_url=url,
headers=headers,
io_loop=IOLoop.current(),
batch_size=1,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter())
reporter.set_process('service', {}, max_length=0)
with mock.patch('tornado.httpclient.AsyncHTTPClient.fetch') as sender:
span = self._new_span('1')
batch = reporter.make_batch([span], reporter._process)
reporter.report_span(span)
yield tornado.gen.sleep(0.001)
request = sender.call_args[0][0]
assert json.loads(request.body.decode('utf8')) == batch
assert request.url == url
headers.update({'content-type': 'application/json'})
assert request.headers == headers

0 comments on commit b3b5226

Please sign in to comment.