From b3b5226176323385118b02c3b0aee6659d51ac79 Mon Sep 17 00:00:00 2001
From: Ryan Fitzpatrick
Date: Mon, 9 Jul 2018 11:10:46 -0400
Subject: [PATCH] Expand Reporter and Config tests for ZipkinV2

Signed-off-by: Ryan Fitzpatrick
---
 tests/test_config.py   |  23 +++++-
 tests/test_reporter.py | 173 +++++++++++++++++++++++++++++++----------
 2 files changed, 153 insertions(+), 43 deletions(-)

diff --git a/tests/test_config.py b/tests/test_config.py
index 7c7fd380..275c8be0 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -19,12 +19,16 @@
 from jaeger_client import Config, ConstSampler, ProbabilisticSampler, RateLimitingSampler
 from jaeger_client.config import DEFAULT_THROTTLER_PORT
 from jaeger_client.metrics import MetricsFactory
-from jaeger_client.reporter import NullReporter
+from jaeger_client.reporter import NullReporter, ZipkinV2Reporter
 from jaeger_client import constants
 
 
 class ConfigTests(unittest.TestCase):
 
+    def setUp(self):
+        if Config.initialized():
+            Config._initialized = False
+
     def test_enabled(self):
         c = Config({'enabled': True}, service_name='x')
         assert c.enabled
@@ -142,14 +146,23 @@ def test_disable_metrics(self):
     def test_initialize_tracer(self):
         c = Config({}, service_name='x')
         tracer = c.initialize_tracer()
-        assert opentracing.tracer == tracer
 
+    def test_global_tracer_initialization(self):
+        c = Config({}, service_name='x')
+        tracer = c.initialize_tracer()
+        assert tracer
+        attempt = c.initialize_tracer()
+        assert attempt is None
+
     def test_reporter_type(self):
         c = Config({'reporter_type': 'JaEGer'}, service_name='x')
         assert c.reporter_type == 'jaeger'
 
         c = Config({'reporter_type': 'ZiPKIn_V2'}, service_name='x')
         assert c.reporter_type == 'zipkin_v2'
+        c = Config({'reporter_type': 'NotAReporterType'}, service_name='x')
+        with self.assertRaises(ValueError):
+            c.reporter_type
 
     def test_headers(self):
         c = Config({}, service_name='x')
@@ -163,3 +176,9 @@ def test_zipkin_spans_url(self):
         assert c.zipkin_spans_url == 'http://localhost:9411/api/v2/spans'
         c = Config({'zipkin_spans_url': 'someurl'}, service_name='x')
         assert c.zipkin_spans_url == 'someurl'
+
+    def test_initialize_tracer_with_zipkin_reporter(self):
+        c = Config({'reporter_type': 'zipkin_v2',
+                    'headers': {'Header': 'Value'}}, service_name='x')
+        tracer = c.initialize_tracer()
+        assert isinstance(tracer.reporter, ZipkinV2Reporter)
diff --git a/tests/test_reporter.py b/tests/test_reporter.py
index d166985a..b2dbc76b 100644
--- a/tests/test_reporter.py
+++ b/tests/test_reporter.py
@@ -18,6 +18,7 @@
 import logging
 import time
 import collections
+import json
 
 import mock
 import pytest
@@ -30,7 +31,7 @@
 from jaeger_client.utils import ErrorReporter
 from tornado.ioloop import IOLoop
 from tornado.testing import AsyncTestCase, gen_test
-from jaeger_client.reporter import ThriftReporter
+from jaeger_client.reporter import QueueReporter, ThriftReporter, ZipkinV2Reporter
 from jaeger_client.ioloop_util import future_result
 
 
@@ -58,6 +59,22 @@ def test_logging_reporter():
     reporter.close().result()
 
 
+def test_queue_reporter():
+    with pytest.raises(NotImplementedError):
+        QueueReporter(mock.MagicMock())
+
+    with mock.patch.object(QueueReporter, 'create_agent') as create_agent:
+        channel = mock.MagicMock()
+        reporter = QueueReporter(channel)
+        create_agent.assert_called_with(channel)
+
+    with pytest.raises(NotImplementedError):
+        reporter._send('batch').result()
+
+    with pytest.raises(NotImplementedError):
+        reporter.make_batch('spans', 'process')
+
+
 def test_composite_reporter():
     reporter = jaeger_client.reporter.CompositeReporter(
         jaeger_client.reporter.NullReporter(),
@@ -132,7 +149,7 @@ def _incr_count(self, key, value):
         self.counters[key] = value + self.counters.get(key, 0)
 
 
-class ThriftReporterTest(AsyncTestCase):
+class QueueReporterTest(object):
     @pytest.yield_fixture
     def thread_loop(self):
         yield
@@ -147,20 +164,6 @@ def _new_span(name):
         span.end_time = span.start_time + 0.001 # 1ms
         return span
 
-    @staticmethod
-    def _new_reporter(batch_size, flush=None, queue_cap=100):
-        reporter = ThriftReporter(channel=mock.MagicMock(),
-                                  io_loop=IOLoop.current(),
-                                  batch_size=batch_size,
-                                  flush_interval=flush,
-                                  metrics_factory=FakeMetricsFactory(),
-                                  error_reporter=HardErrorReporter(),
-                                  queue_capacity=queue_cap)
-        reporter.set_process('service', {}, max_length=0)
-        sender = FakeSender()
-        reporter._send = sender
-        return reporter, sender
-
     @tornado.gen.coroutine
     def _wait_for(self, fn):
         """Wait until fn() returns truth, but not longer than 1 second."""
@@ -233,31 +236,6 @@ def test_submit_queue_full_batch_size_1(self):
         sender.futures[1].set_result(1)
         yield reporter.close()
 
-    @gen_test
-    def test_submit_batch_size_2(self):
-        reporter, sender = self._new_reporter(batch_size=2, flush=0.01)
-        reporter.report_span(self._new_span('1'))
-        yield tornado.gen.sleep(0.001)
-        assert 0 == len(sender.futures)
-
-        reporter.report_span(self._new_span('2'))
-        yield self._wait_for(lambda: len(sender.futures) > 0)
-        assert 1 == len(sender.futures)
-        assert 2 == len(sender.requests[0].spans)
-        sender.futures[0].set_result(1)
-
-        # 3rd span will not be submitted right away, but after `flush` interval
-        reporter.report_span(self._new_span('3'))
-        yield tornado.gen.sleep(0.001)
-        assert 1 == len(sender.futures)
-        yield tornado.gen.sleep(0.001)
-        assert 1 == len(sender.futures)
-        yield tornado.gen.sleep(0.01)
-        assert 2 == len(sender.futures)
-        sender.futures[1].set_result(1)
-
-        yield reporter.close()
-
     @gen_test
     def test_close_drains_queue(self):
         reporter, sender = self._new_reporter(batch_size=1, flush=0.050)
@@ -291,3 +269,116 @@ def send(_):
         yield reporter.close()
         assert reporter.queue.qsize() == 0, 'all spans drained'
         assert count[0] == 4, 'last span submitted in one extra batch'
+
+    @gen_test
+    def test_invalid_capacity_size(self):
+        with pytest.raises(ValueError):
+            self._new_reporter(batch_size=2, queue_cap=1)
+
+
+class ThriftReporterTest(QueueReporterTest, AsyncTestCase):
+
+    @staticmethod
+    def _new_reporter(batch_size, flush=None, queue_cap=100):
+        reporter = ThriftReporter(channel=mock.MagicMock(),
+                                  io_loop=IOLoop.current(),
+                                  batch_size=batch_size,
+                                  flush_interval=flush,
+                                  metrics_factory=FakeMetricsFactory(),
+                                  error_reporter=HardErrorReporter(),
+                                  queue_capacity=queue_cap)
+        reporter.set_process('service', {}, max_length=0)
+        sender = FakeSender()
+        reporter._send = sender
+        return reporter, sender
+
+    @gen_test
+    def test_submit_batch_size_2(self):
+        reporter, sender = self._new_reporter(batch_size=2, flush=0.01)
+        reporter.report_span(self._new_span('1'))
+        yield tornado.gen.sleep(0.001)
+        assert 0 == len(sender.futures)
+
+        reporter.report_span(self._new_span('2'))
+        yield self._wait_for(lambda: len(sender.futures) > 0)
+        assert 1 == len(sender.futures)
+        assert 2 == len(sender.requests[0].spans)
+        sender.futures[0].set_result(1)
+
+        # 3rd span will not be submitted right away, but after `flush` interval
+        reporter.report_span(self._new_span('3'))
+        yield tornado.gen.sleep(0.001)
+        assert 1 == len(sender.futures)
+        yield tornado.gen.sleep(0.001)
+        assert 1 == len(sender.futures)
+        yield tornado.gen.sleep(0.01)
+        assert 2 == len(sender.futures)
+        sender.futures[1].set_result(1)
+
+        yield reporter.close()
+
+
+class ZipkinV2ReporterTest(QueueReporterTest, AsyncTestCase):
+
+    @staticmethod
+    def _new_reporter(batch_size, flush=None, queue_cap=100):
+        reporter = ZipkinV2Reporter(channel=mock.MagicMock(),
+                                    spans_url='api/v2/spans',
+                                    io_loop=IOLoop.current(),
+                                    batch_size=batch_size,
+                                    flush_interval=flush,
+                                    metrics_factory=FakeMetricsFactory(),
+                                    error_reporter=HardErrorReporter(),
+                                    queue_capacity=queue_cap)
+        reporter.set_process('service', {}, max_length=0)
+        sender = FakeSender()
+        reporter._send = sender
+        return reporter, sender
+
+    @gen_test
+    def test_submit_batch_size_2(self):
+        reporter, sender = self._new_reporter(batch_size=2, flush=0.01)
+        reporter.report_span(self._new_span('1'))
+        yield tornado.gen.sleep(0.001)
+        assert 0 == len(sender.futures)
+
+        reporter.report_span(self._new_span('2'))
+        yield self._wait_for(lambda: len(sender.futures) > 0)
+        assert 1 == len(sender.futures)
+        assert 2 == len(sender.requests[0])
+        sender.futures[0].set_result(1)
+
+        # 3rd span will not be submitted right away, but after `flush` interval
+        reporter.report_span(self._new_span('3'))
+        yield tornado.gen.sleep(0.001)
+        assert 1 == len(sender.futures)
+        yield tornado.gen.sleep(0.001)
+        assert 1 == len(sender.futures)
+        yield tornado.gen.sleep(0.01)
+        assert 2 == len(sender.futures)
+        sender.futures[1].set_result(1)
+
+        yield reporter.close()
+
+    @gen_test
+    def test_send_submits_request(self):
+        url = 'http://myzipkin/api/v2/spans'
+        headers = {'MyHeader': 'MyHeaderVal'}
+        reporter = ZipkinV2Reporter(channel=mock.MagicMock(),
+                                    spans_url=url,
+                                    headers=headers,
+                                    io_loop=IOLoop.current(),
+                                    batch_size=1,
+                                    metrics_factory=FakeMetricsFactory(),
+                                    error_reporter=HardErrorReporter())
+        reporter.set_process('service', {}, max_length=0)
+        with mock.patch('tornado.httpclient.AsyncHTTPClient.fetch') as sender:
+            span = self._new_span('1')
+            batch = reporter.make_batch([span], reporter._process)
+            reporter.report_span(span)
+            yield tornado.gen.sleep(0.001)
+            request = sender.call_args[0][0]
+            assert json.loads(request.body.decode('utf8')) == batch
+            assert request.url == url
+            headers.update({'content-type': 'application/json'})
+            assert request.headers == headers
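
For context, a minimal usage sketch (not part of the patch above) of how the zipkin_v2 reporter type exercised by these tests would be selected through Config. The option names ('reporter_type', 'zipkin_spans_url', 'headers') and the initialize_tracer() behavior are taken from the tests; the service name and header value are placeholders.

    from jaeger_client import Config

    config = Config(
        {
            'reporter_type': 'zipkin_v2',  # case-insensitive; normalized to 'zipkin_v2'
            'zipkin_spans_url': 'http://localhost:9411/api/v2/spans',
            'headers': {'MyHeader': 'MyHeaderVal'},  # extra headers sent with each span batch
        },
        service_name='my-service',  # placeholder service name
    )
    tracer = config.initialize_tracer()  # returns None if a tracer was already initialized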