From 80a59a51531fec5a0ebd156a984232fba0b76a1a Mon Sep 17 00:00:00 2001 From: Ryan Fitzpatrick Date: Thu, 28 Jun 2018 18:21:26 -0400 Subject: [PATCH] Add Zipkin API V2 Reporter Expands span reporter selection by breaking out Reporter into base QueueReporter class and using reporting method-specific ThriftReporter and ZipkinV2Reporter. Also breaks LocalAgentSender into two classes including LocalAgentReader for ThriftReporter only usage of TBufferedTransport. These changes allow users to continue reporting spans oob to the local jaeger-agent but with the option of reporting to Zipkin backend. In the spirit of https://github.com/jaegertracing/jaeger-client-java/pull/399 and https://github.com/jaegertracing/jaeger-client-go/pull/310 Signed-off-by: Ryan Fitzpatrick --- crossdock/server/endtoend.py | 6 +- jaeger_client/config.py | 72 +++++++++++--- jaeger_client/local_agent_net.py | 46 +++++---- jaeger_client/reporter.py | 112 +++++++++++++++++---- jaeger_client/zipkin_v2.py | 161 +++++++++++++++++++++++++++++++ tests/test_config.py | 19 ++++ tests/test_reporter.py | 18 ++-- tests/test_zipkin_v2.py | 160 ++++++++++++++++++++++++++++++ 8 files changed, 533 insertions(+), 61 deletions(-) create mode 100644 jaeger_client/zipkin_v2.py create mode 100644 tests/test_zipkin_v2.py diff --git a/crossdock/server/endtoend.py b/crossdock/server/endtoend.py index 29058708..d5b317a6 100644 --- a/crossdock/server/endtoend.py +++ b/crossdock/server/endtoend.py @@ -28,7 +28,7 @@ SAMPLER_TYPE_REMOTE, ) from jaeger_client.sampler import RemoteControlledSampler, ConstSampler -from jaeger_client.reporter import Reporter +from jaeger_client.reporter import ThriftReporter from jaeger_client.throttler import RemoteThrottler from jaeger_client.tracer import Tracer @@ -67,8 +67,8 @@ def __init__(self): init_sampler = cfg.sampler channel = self.local_agent_sender - reporter = Reporter(channel=channel, - flush_interval=cfg.reporter_flush_interval) + reporter = ThriftReporter(channel=channel, + flush_interval=cfg.reporter_flush_interval) remote_sampler = RemoteControlledSampler( channel=channel, diff --git a/jaeger_client/config.py b/jaeger_client/config.py index c8aaf249..33cf06fb 100644 --- a/jaeger_client/config.py +++ b/jaeger_client/config.py @@ -21,12 +21,13 @@ import opentracing from opentracing.propagation import Format from . import Tracer -from .local_agent_net import LocalAgentSender +from .local_agent_net import LocalAgentReader, LocalAgentSender from .throttler import RemoteThrottler from .reporter import ( - Reporter, + ThriftReporter, CompositeReporter, LoggingReporter, + ZipkinV2Reporter, ) from .sampler import ( ConstSampler, @@ -115,6 +116,7 @@ def _validate_config(self, config): 'sampler', 'tags', 'enabled', + 'reporter_type', 'reporter_batch_size', 'propagation', 'max_tag_value_length', @@ -123,7 +125,9 @@ def _validate_config(self, config): 'trace_id_header', 'baggage_header_prefix', 'service_name', - 'throttler'] + 'throttler', + 'headers', + 'zipkin_spans_url'] config_keys = config.keys() unexpected_config_keys = [k for k in config_keys if k not in allowed_keys] if unexpected_config_keys: @@ -146,6 +150,13 @@ def error_reporter(self): def enabled(self): return get_boolean(self.config.get('enabled', True), True) + @property + def reporter_type(self): + rt = self.config.get('reporter_type', 'jaeger').lower() + if rt not in ('jaeger', 'zipkin_v2'): + raise ValueError('config reporter_type must be "jaeger" or "zipkin_v2"') + return rt + @property def reporter_batch_size(self): return int(self.config.get('reporter_batch_size', 10)) @@ -312,6 +323,14 @@ def throttler_refresh_interval(self): except: return DEFAULT_THROTTLER_REFRESH_INTERVAL + @property + def headers(self): + return self.config.get('headers', {}) + + @property + def zipkin_spans_url(self): + return self.config.get('zipkin_spans_url', 'http://localhost:9411/api/v2/spans') + @staticmethod def initialized(): with Config._initialized_lock: @@ -353,14 +372,32 @@ def new_tracer(self, io_loop=None): max_operations=self.max_operations) logger.info('Using sampler %s', sampler) - reporter = Reporter( - channel=channel, - queue_capacity=self.reporter_queue_size, - batch_size=self.reporter_batch_size, - flush_interval=self.reporter_flush_interval, - logger=logger, - metrics_factory=self._metrics_factory, - error_reporter=self.error_reporter) + if self.reporter_type == 'jaeger': + reporter = ThriftReporter( + channel=channel, + queue_capacity=self.reporter_queue_size, + batch_size=self.reporter_batch_size, + flush_interval=self.reporter_flush_interval, + logger=logger, + metrics_factory=self._metrics_factory, + error_reporter=self.error_reporter + ) + elif self.reporter_type == 'zipkin_v2': + kwargs = {} + if self.headers: + kwargs['headers'] = self.headers + + reporter = ZipkinV2Reporter( + channel=channel, + spans_url=self.zipkin_spans_url, + queue_capacity=self.reporter_queue_size, + batch_size=self.reporter_batch_size, + flush_interval=self.reporter_flush_interval, + logger=logger, + metrics_factory=self._metrics_factory, + error_reporter=self.error_reporter, + **kwargs + ) if self.logging: reporter = CompositeReporter(reporter, LoggingReporter(logger)) @@ -405,11 +442,20 @@ def _initialize_global_tracer(self, tracer): def _create_local_agent_channel(self, io_loop): """ Create an out-of-process channel communicating to local jaeger-agent. - Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled - via JSON HTTP. + If using Jaeger backend, spans are submitted as SOCK_DGRAM Thrift, sampling + strategy is polled via JSON HTTP. :param self: instance of Config """ + if self.reporter_type == 'zipkin_v2': + logger.info('Initializing Jaeger Tracer with Zipkin V2 reporter') + return LocalAgentReader( + host=self.local_agent_reporting_host, + sampling_port=self.local_agent_sampling_port, + reporting_port=self.local_agent_reporting_port, + throttling_port=self.throttler_port, + io_loop=io_loop + ) logger.info('Initializing Jaeger Tracer with UDP reporter') return LocalAgentSender( host=self.local_agent_reporting_host, diff --git a/jaeger_client/local_agent_net.py b/jaeger_client/local_agent_net.py index 5266fc51..93f312b7 100644 --- a/jaeger_client/local_agent_net.py +++ b/jaeger_client/local_agent_net.py @@ -53,16 +53,12 @@ def request_throttling_credits(self, ] + [('operations', op) for op in operations]) -class LocalAgentSender(TBufferedTransport): +class LocalAgentReader(object): """ - LocalAgentSender implements everything necessary to communicate with - local jaeger-agent. This class is designed to work in tornado and - non-tornado environments. If in torndado, pass in the ioloop, if not - then LocalAgentSender will create one for itself. - - NOTE: LocalAgentSender derives from TBufferedTransport. This will buffer - up all written data until flush() is called. Flush gets called at the - end of the batch span submission call. + LocalAgentReader implements what is necessary to obtain sampling strategies + and throttling credits from the local jaeger-agent. This class is designed + to work in tornado and non-tornado environments. If in torndado, pass in the + ioloop, if not then LocalAgentSender will create one for itself. """ def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None): @@ -77,11 +73,6 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling if throttling_port: self.throttling_http = LocalAgentHTTP(host, throttling_port) - # UDP reporting - this will only get written to after our flush() call. - # We are buffering things up because we are a TBufferedTransport. - udp = TUDPTransport(host, reporting_port) - TBufferedTransport.__init__(self, udp) - def _create_new_thread_loop(self): """ Create a daemonized thread that will run Tornado IOLoop. @@ -92,10 +83,6 @@ def _create_new_thread_loop(self): self._thread_loop.start() return self._thread_loop._io_loop - def readFrame(self): - """Empty read frame that is never ready""" - return Future() - # Pass-through for HTTP sampling strategies request. def request_sampling_strategy(self, *args, **kwargs): return self.local_agent_http.request_sampling_strategy(*args, **kwargs) @@ -103,3 +90,26 @@ def request_sampling_strategy(self, *args, **kwargs): # Pass-through for HTTP throttling credit request. def request_throttling_credits(self, *args, **kwargs): return self.throttling_http.request_throttling_credits(*args, **kwargs) + + +class LocalAgentSender(LocalAgentReader, TBufferedTransport): + """ + LocalAgentSender implements everything necessary to report spans to + the local jaeger-agent. + + NOTE: LocalAgentSender derives from TBufferedTransport. This will buffer + up all written data until flush() is called. Flush gets called at the + end of the batch span submission call. + """ + + def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None): + LocalAgentReader.__init__(self, host, sampling_port, reporting_port, + io_loop, throttling_port) + # UDP reporting - this will only get written to after our flush() call. + # We are buffering things up because we are a TBufferedTransport. + udp = TUDPTransport(host, reporting_port) + TBufferedTransport.__init__(self, udp) + + def readFrame(self): + """Empty read frame that is never ready""" + return Future() diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py index 1fedb995..5d4949ed 100644 --- a/jaeger_client/reporter.py +++ b/jaeger_client/reporter.py @@ -14,10 +14,12 @@ from __future__ import absolute_import +import json import logging import threading import tornado.gen +import tornado.httpclient import tornado.ioloop import tornado.queues import socket @@ -25,6 +27,7 @@ from .constants import DEFAULT_FLUSH_INTERVAL from . import thrift from . import ioloop_util +from . import zipkin_v2 from .metrics import Metrics, LegacyMetricsFactory from .utils import ErrorReporter @@ -73,8 +76,12 @@ def report_span(self, span): self.logger.info('Reporting span %s', span) -class Reporter(NullReporter): - """Receives completed spans from Tracer and submits them out of process.""" +class QueueReporter(NullReporter): + """ + Base Reporter class for handling, batching, and queuing Tracer-provided spans + for transmission to a backend service. Should not be used directly and instead + should be subclassed with appropriate create_agent(), _send(), and make_batch() methods. + """ def __init__(self, channel, queue_capacity=100, batch_size=10, flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, error_reporter=None, metrics=None, metrics_factory=None, @@ -97,14 +104,13 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, """ from threading import Lock - self._channel = channel self.queue_capacity = queue_capacity self.batch_size = batch_size self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics()) self.metrics = ReporterMetrics(self.metrics_factory) self.error_reporter = error_reporter or ErrorReporter(Metrics()) self.logger = kwargs.get('logger', default_logger) - self.agent = Agent.Client(self._channel, self) + self.agent = self.create_agent(channel) if queue_capacity < batch_size: raise ValueError('Queue capacity cannot be less than batch size') @@ -123,6 +129,9 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, self._process_lock = Lock() self._process = None + def create_agent(self, channel): + raise NotImplementedError() + def set_process(self, service_name, tags, max_length): with self._process_lock: self._process = thrift.make_process( @@ -177,14 +186,8 @@ def _consume_queue(self): self.metrics.reporter_queue_length(self.queue.qsize()) self.logger.info('Span publisher exited') - # method for protocol factory - def getProtocol(self, transport): - """ - Implements Thrift ProtocolFactory interface - :param: transport: - :return: Thrift compact protocol - """ - return TCompactProtocol.TCompactProtocol(transport) + def make_batch(self, spans, process): + raise NotImplementedError() @tornado.gen.coroutine def _submit(self, spans): @@ -195,7 +198,7 @@ def _submit(self, spans): if not process: return try: - batch = thrift.make_jaeger_batch(spans=spans, process=process) + batch = self.make_batch(spans, process) yield self._send(batch) self.metrics.reporter_success(len(spans)) except socket.error as e: @@ -209,11 +212,7 @@ def _submit(self, spans): @tornado.gen.coroutine def _send(self, batch): - """ - Send batch of spans out via thrift transport. Any exceptions thrown - will be caught above in the exception handler of _submit(). - """ - return self.agent.emitBatch(batch) + raise NotImplementedError() def close(self): """ @@ -275,3 +274,80 @@ def on_close(_): f.add_done_callback(on_close) return future + + +class ThriftReporter(QueueReporter): + """ + Receives completed spans from Tracer and submits them out of process to local + jaeger-agent. + """ + + def __init__(self, channel, queue_capacity=100, batch_size=10, + flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, + error_reporter=None, metrics=None, metrics_factory=None, + **kwargs): + QueueReporter.__init__(self, channel, queue_capacity, batch_size, flush_interval, + io_loop, error_reporter, metrics, metrics_factory, **kwargs) + + def create_agent(self, channel): + self.agent = Agent.Client(channel, self) + + def make_batch(self, spans, process): + return thrift.make_jaeger_batch(spans=spans, process=process) + + # method for protocol factory + def getProtocol(self, transport): + """ + Implements Thrift ProtocolFactory interface + :param: transport: + :return: Thrift compact protocol + """ + return TCompactProtocol.TCompactProtocol(transport) + + @tornado.gen.coroutine + def _send(self, batch): + """ + Send batch of spans out via thrift transport. Any exceptions thrown + will be caught above in the exception handler of _submit(). + """ + return self.agent.emitBatch(batch) + + +Reporter = ThriftReporter # backward compatibility + + +class ZipkinV2Reporter(QueueReporter): + """Receives completed spans from Tracer and submits to Zipkin's /api/v2/spans endpoint""" + + def __init__(self, channel, spans_url, queue_capacity=100, batch_size=10, + flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, error_reporter=None, + metrics=None, metrics_factory=None, **kwargs): + self.spans_url = spans_url + self.headers = kwargs.get('headers', {}) + QueueReporter.__init__(self, channel, queue_capacity, batch_size, flush_interval, + io_loop, error_reporter, metrics, metrics_factory, **kwargs) + + def create_agent(self, channel): + pass + + def make_batch(self, spans, process): + return zipkin_v2.make_zipkin_v2_batch(spans=spans, process=process) + + @tornado.gen.coroutine + def _send(self, batch): + """ + Send batch of spans out via AsyncHTTPClient. Any exceptions thrown + will be caught above in the exception handler of _submit(). + """ + client = tornado.httpclient.AsyncHTTPClient() + headers = {'content-type': 'application/json'} + if self.headers: + headers.update(self.headers) + + request = tornado.httpclient.HTTPRequest( + method='POST', + url=self.spans_url, + headers=headers, + body=json.dumps(batch) + ) + client.fetch(request) diff --git a/jaeger_client/zipkin_v2.py b/jaeger_client/zipkin_v2.py new file mode 100644 index 00000000..2322cb46 --- /dev/null +++ b/jaeger_client/zipkin_v2.py @@ -0,0 +1,161 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from binascii import hexlify +from struct import pack +import json + +from opentracing.ext import tags as ext_tags + +from jaeger_client.thrift_gen.jaeger.ttypes import TagType +from .thrift import timestamp_micros +from . import constants + + +def make_zipkin_v2_batch(spans=None, process=None): + batched_spans = [] + for span in spans: + with span.update_lock: + v2_span = to_zipkin_v2_span(span, process) + batched_spans.append(v2_span) + return batched_spans + + +def to_zipkin_v2_span(span, process): + """Converts jaeger thrift span and process information to Zipkin's API v2 span format""" + v2_span = dict( + traceId=to_encoded_id(span.trace_id), + id=to_encoded_id(span.span_id), + timestamp=timestamp_micros(span.start_time), + debug=span.is_debug(), + ) + + if span.end_time: + v2_span['duration'] = timestamp_micros(span.end_time - span.start_time) + + name = getattr(span, 'operation_name', '') + if name: + v2_span['name'] = name + + parent_id = getattr(span, 'parent_id', 0) + if parent_id: + v2_span['parentId'] = to_encoded_id(parent_id) + + kind = get_span_kind(span) + if kind: + v2_span['kind'] = kind + + local_endpoint = get_local_endpoint(process) + if local_endpoint: + v2_span['localEndpoint'] = local_endpoint + + remote_endpoint = get_remote_endpoint(process) + if remote_endpoint: + v2_span['remoteEndpoint'] = remote_endpoint + + tags = format_span_tags(span) + if tags: + v2_span['tags'] = tags + + annotations = [log_to_annotation(l) for l in span.logs] + if annotations: + v2_span['annotations'] = annotations + + return v2_span + + +def to_encoded_id(jaeger_id): + return hexlify(pack('>Q', jaeger_id)) + + +def get_span_kind(span): + kind = None + for tag in span.tags: + if tag.key == ext_tags.SPAN_KIND: + kind = {ext_tags.SPAN_KIND_RPC_CLIENT: 'CLIENT', + ext_tags.SPAN_KIND_RPC_SERVER: 'SERVER'}[tag.vStr] + return kind + + +ttype_attr_map = {TagType.BINARY: 'vBinary', TagType.BOOL: 'vBool', TagType.DOUBLE: 'vDouble', + TagType.LONG: 'vLong', TagType.STRING: 'vStr'} + + +def get_tag_value(tag): + attr = ttype_attr_map[tag.vType] + return getattr(tag, attr, None) + + +def get_local_endpoint(process): + local_endpoint = {} + service_name = getattr(process, 'serviceName', '') + if service_name: + local_endpoint['serviceName'] = service_name + if process.tags: + for tag in process.tags: + if tag.key == constants.JAEGER_IP_TAG_KEY: + ipv4 = get_tag_value(tag) + if ipv4: + local_endpoint['ipv4'] = ipv4 + break + return local_endpoint + + +def get_remote_endpoint(process): + tag_key_map = [(ext_tags.PEER_SERVICE, 'serviceName'), (ext_tags.PEER_HOST_IPV4, 'ipv4'), + (ext_tags.PEER_HOST_IPV6, 'ipv6'), (ext_tags.PEER_PORT, 'port')] + + remote_endpoint = {} + if process.tags: + for tag in process.tags: + for tag_key, endpoint_key in list(tag_key_map): + if tag.key == tag_key: + tag_key_map.remove((tag_key, endpoint_key)) + value = get_tag_value(tag) + if value: + if tag.key == ext_tags.PEER_PORT: + value = int(value) + remote_endpoint[endpoint_key] = value + continue + if not tag_key_map: + break + + return remote_endpoint + + +redundant_tags = (ext_tags.PEER_HOST_IPV4, ext_tags.PEER_HOST_IPV6, ext_tags.PEER_PORT, + ext_tags.PEER_SERVICE, ext_tags.SPAN_KIND, ext_tags.SPAN_KIND_RPC_CLIENT, + ext_tags.SPAN_KIND_RPC_SERVER) + + +def format_span_tags(span): + formatted = {} + for tag in span.tags: + if tag.key not in redundant_tags: + k, v = format_span_tag(tag) + if v: + formatted[k] = v + return formatted + + +def format_span_tag(tag): + key = tag.key + value = get_tag_value(tag) + return key, value + + +def log_to_annotation(log): + annotation = dict(timestamp=log.timestamp) + tags = dict([format_span_tag(t) for t in log.fields]) + annotation['value'] = json.dumps(tags) + return annotation diff --git a/tests/test_config.py b/tests/test_config.py index fa75cca9..7c7fd380 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -144,3 +144,22 @@ def test_initialize_tracer(self): tracer = c.initialize_tracer() assert opentracing.tracer == tracer + + def test_reporter_type(self): + c = Config({'reporter_type': 'JaEGer'}, service_name='x') + assert c.reporter_type == 'jaeger' + c = Config({'reporter_type': 'ZiPKIn_V2'}, service_name='x') + assert c.reporter_type == 'zipkin_v2' + + def test_headers(self): + c = Config({}, service_name='x') + assert c.headers == {} + cfg_dict = {'headers': {'HeaderOne': 'OneVal', 'HeaderTwo': 'TwoVal'}} + c = Config(cfg_dict, service_name='x') + assert c.headers == cfg_dict['headers'] + + def test_zipkin_spans_url(self): + c = Config({}, service_name='x') + assert c.zipkin_spans_url == 'http://localhost:9411/api/v2/spans' + c = Config({'zipkin_spans_url': 'someurl'}, service_name='x') + assert c.zipkin_spans_url == 'someurl' diff --git a/tests/test_reporter.py b/tests/test_reporter.py index b7e8e69e..d166985a 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -30,7 +30,7 @@ from jaeger_client.utils import ErrorReporter from tornado.ioloop import IOLoop from tornado.testing import AsyncTestCase, gen_test -from jaeger_client.reporter import Reporter +from jaeger_client.reporter import ThriftReporter from jaeger_client.ioloop_util import future_result @@ -132,7 +132,7 @@ def _incr_count(self, key, value): self.counters[key] = value + self.counters.get(key, 0) -class ReporterTest(AsyncTestCase): +class ThriftReporterTest(AsyncTestCase): @pytest.yield_fixture def thread_loop(self): yield @@ -149,13 +149,13 @@ def _new_span(name): @staticmethod def _new_reporter(batch_size, flush=None, queue_cap=100): - reporter = Reporter(channel=mock.MagicMock(), - io_loop=IOLoop.current(), - batch_size=batch_size, - flush_interval=flush, - metrics_factory=FakeMetricsFactory(), - error_reporter=HardErrorReporter(), - queue_capacity=queue_cap) + reporter = ThriftReporter(channel=mock.MagicMock(), + io_loop=IOLoop.current(), + batch_size=batch_size, + flush_interval=flush, + metrics_factory=FakeMetricsFactory(), + error_reporter=HardErrorReporter(), + queue_capacity=queue_cap) reporter.set_process('service', {}, max_length=0) sender = FakeSender() reporter._send = sender diff --git a/tests/test_zipkin_v2.py b/tests/test_zipkin_v2.py new file mode 100644 index 00000000..b54e355e --- /dev/null +++ b/tests/test_zipkin_v2.py @@ -0,0 +1,160 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +from time import time +import json + +import mock + +from jaeger_client.zipkin_v2 import make_zipkin_v2_batch, to_zipkin_v2_span +from jaeger_client import Span, SpanContext, ConstSampler +import jaeger_client.thrift_gen.jaeger.ttypes as ttypes +from jaeger_client.constants import JAEGER_IP_TAG_KEY +from jaeger_client.thrift import timestamp_micros +from opentracing.ext import tags as ext_tags +from jaeger_client import thrift + + +def to_tag(key, value): + return thrift.make_string_tag(key=key, value=value, max_length=100) + + +def test_to_v2_span_no_parent_nor_duration_nor_tags_nor_annotations(tracer): + start_time = time() + ctx = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1) + span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer) + process = ttypes.Process(serviceName='x') + zip_span = to_zipkin_v2_span(span, process) + expected = dict(debug=False, id='0000000000000001', + traceId='0000000000000001', + localEndpoint=dict(serviceName='x'), + name='operation', timestamp=timestamp_micros(start_time)) + assert zip_span == expected + + +def test_to_v2_span_with_parent_no_duration_nor_tags_nor_annotations(tracer): + start_time = time() + ctx = SpanContext(trace_id=1, span_id=2, parent_id=1, flags=1) + span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer) + process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')]) + zip_span = to_zipkin_v2_span(span, process) + expected = dict(debug=False, id='0000000000000002', + parentId='0000000000000001', traceId='0000000000000001', + localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'), + name='operation', timestamp=timestamp_micros(start_time)) + assert zip_span == expected + + +def test_to_v2_span_with_parent_and_duration_and_tags_no_annotations(tracer): + start_time = time() + ctx = SpanContext(trace_id=1, span_id=2, parent_id=1, flags=1) + span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer) + span.set_tag('SomeKey', '123123') + span.set_tag(ext_tags.SPAN_KIND, ext_tags.SPAN_KIND_RPC_CLIENT) + end_time = start_time + 20 + span.end_time = end_time + process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')]) + zip_span = to_zipkin_v2_span(span, process) + expected = dict(kind='CLIENT', debug=False, id='0000000000000002', + parentId='0000000000000001', traceId='0000000000000001', + localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'), + name='operation', timestamp=timestamp_micros(start_time), + duration=20000000, tags=dict(SomeKey='123123')) + assert zip_span == expected + + +def test_to_v2_span_with_parent_and_duration_and_tags_and_annotations(tracer): + start_time = time() + ctx = SpanContext(trace_id=18446744073709551614, span_id=18446744073709551615, + parent_id=18446744073709551614, flags=1) + span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer) + span.set_tag('SomeKey', '123123') + span.set_tag(ext_tags.SPAN_KIND, ext_tags.SPAN_KIND_RPC_SERVER) + event_time = time() + 3 + span.log_kv({'SomeTagKey': 'SomeTagValue'}, timestamp=event_time) + end_time = start_time + 20 + span.end_time = end_time + process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')]) + zip_span = to_zipkin_v2_span(span, process) + expected = dict(kind='SERVER', debug=False, id='ffffffffffffffff', + parentId='fffffffffffffffe', traceId='fffffffffffffffe', + localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'), + name='operation', timestamp=timestamp_micros(start_time), + duration=20000000, tags=dict(SomeKey='123123'), + annotations=[dict(timestamp=timestamp_micros(event_time), + value=json.dumps(dict(SomeTagKey='SomeTagValue')))]) + assert zip_span == expected + + +def test_to_v2_span_remote_with_parent_and_duration_and_tags_and_annotations(tracer): + start_time = time() + ctx = SpanContext(trace_id=9205364235243340812, span_id=9830726033745040073, + parent_id=9205364235243340812, flags=1) + span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer) + span.set_tag('SomeKey', '123123') + event_time = time() + 3 + span.log_kv({'SomeTagKey': 'SomeTagValue1'}, timestamp=event_time) + span.log_kv({'SomeOtherTagKey': 'SomeTagValue2'}, timestamp=event_time + 1) + span.log_kv({'SomeAdditionalTagKey': 'SomeTagValue3'}, timestamp=event_time + 2) + end_time = start_time + 20 + span.end_time = end_time + process = ttypes.Process(serviceName='x', tags=[ + to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1'), + to_tag(ext_tags.PEER_HOST_IPV4, '192.168.34.1'), + to_tag(ext_tags.PEER_HOST_IPV6, '::1'), + to_tag(ext_tags.PEER_SERVICE, 'remote_operation'), + to_tag(ext_tags.PEER_PORT, 99776), + to_tag('SomeOtherTag', 'TagValue') + ]) + + def is_debug(*args): # simultaneously sampled and debug + return True + + span.is_debug = is_debug + zip_span = to_zipkin_v2_span(span, process) + + expected = dict(debug=True, id='886dc13e058ed2c9', + parentId='7fc005fff5c3c40c', traceId='7fc005fff5c3c40c', + localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'), + remoteEndpoint=dict(serviceName='remote_operation', + ipv4='192.168.34.1', ipv6='::1', + port=99776), + name='operation', timestamp=timestamp_micros(start_time), + duration=20000000, tags=dict(SomeKey='123123'), + annotations=[ + dict(timestamp=timestamp_micros(event_time), + value=json.dumps(dict(SomeTagKey='SomeTagValue1'))), + dict(timestamp=timestamp_micros(event_time + 1), + value=json.dumps(dict(SomeOtherTagKey='SomeTagValue2'))), + dict(timestamp=timestamp_micros(event_time + 2), + value=json.dumps(dict(SomeAdditionalTagKey='SomeTagValue3'))), + ]) + assert zip_span == expected + + +def test_make_zipkin_v2_batch(tracer): + ctx_one = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1) + span_one = Span(context=ctx_one, operation_name='operation', start_time=time(), tracer=tracer) + ctx_two = SpanContext(trace_id=1, span_id=2, parent_id=1, flags=1) + span_two = Span(context=ctx_two, operation_name='operation', start_time=time(), tracer=tracer) + ctx_three = SpanContext(trace_id=1, span_id=3, parent_id=2, flags=1) + span_three = Span(context=ctx_three, operation_name='operation', start_time=time(), tracer=tracer) + process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')]) + batch = make_zipkin_v2_batch([span_one, span_two, span_three], process) + assert len(batch) == 3 + assert batch[0]['id'] == '0000000000000001' + assert batch[1]['id'] == '0000000000000002' + assert batch[2]['id'] == '0000000000000003'