Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zipkin exporter v2 api support for protobuf format #1318

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a756db0
Zipkin exporter v2 api support for protobuf format
robwknox Nov 2, 2020
0197866
adding env var OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT and tidying up T…
robwknox Nov 2, 2020
131cbea
lint corrections
robwknox Nov 2, 2020
bd7eef7
moving auto-gen'd files to separate dir in order to add exclusion ent…
robwknox Nov 2, 2020
a8a2407
flake8 config update to exclude new auto-gen files
robwknox Nov 3, 2020
f9c4ff8
pylint hints to help with auto-gen protobuf objects
robwknox Nov 3, 2020
d74518d
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 3, 2020
8ce0dc9
documentation update
robwknox Nov 3, 2020
b6d7e6b
switching exporter to use OTEL Configuration class instead of direct …
robwknox Nov 3, 2020
17bf5f7
refactor of SPAN_KIND map structure and loosening of allowed protobuf…
robwknox Nov 3, 2020
980b028
pylint disable directive
robwknox Nov 3, 2020
c6ce2a4
docstring format correction
robwknox Nov 3, 2020
8cb1c60
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 3, 2020
1bcc834
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 6, 2020
bb2fec4
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 9, 2020
4c50f2c
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 13, 2020
d440819
changelog
robwknox Nov 16, 2020
9b0f8a3
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 17, 2020
b1c95f6
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 18, 2020
d825b64
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 19, 2020
05ef05b
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
robwknox Nov 19, 2020
5fa2da0
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 23, 2020
78d6163
Merge branch 'master' into 1175_zipkin_exporter_v2_protobuf_support
lzchen Nov 23, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ exclude =
__pycache__
exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/gen/
exporter/opentelemetry-exporter-jaeger/build/*
exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen
docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/
docs/examples/opentelemetry-example-app/build/*
opentelemetry-proto/build/*
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
generated-members=zipkin_pb2.*

# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
Expand Down
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-zipkin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Support for v2 api protobuf format ([#1318](https://github.com/open-telemetry/opentelemetry-python/pull/1318))

## Version 0.14b0

Released 2020-10-13
Expand Down
1 change: 1 addition & 0 deletions exporter/opentelemetry-exporter-zipkin/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ package_dir=
=src
packages=find_namespace:
install_requires =
protobuf >= 3.12
requests ~= 2.7
opentelemetry-api == 0.16.dev0
opentelemetry-sdk == 0.16.dev0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#zipkin-exporter

.. envvar:: OTEL_EXPORTER_ZIPKIN_ENDPOINT
.. envvar:: OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT

.. code:: python

from opentelemetry import trace
Expand Down Expand Up @@ -55,36 +58,54 @@
with tracer.start_as_current_span("foo"):
print("Hello world!")

The exporter supports endpoint configuration via the OTEL_EXPORTER_ZIPKIN_ENDPOINT environment variables as defined in the `Specification`_
The exporter supports the following environment variables for configuration:

:envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`: target to which the exporter will
send data. This may include a path (e.g. http://example.com:9411/api/v2/spans).

:envvar:`OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT`: transport interchange format
to use when sending data. Currently only Zipkin's v2 json and protobuf formats
are supported, with v2 json being the default.

API
---
"""

import json
import logging
import os
from typing import Optional, Sequence
from typing import Optional, Sequence, Union
from urllib.parse import urlparse

import requests

from opentelemetry.configuration import Configuration
from opentelemetry.exporter.zipkin.gen import zipkin_pb2
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanContext, SpanKind

TRANSPORT_FORMAT_JSON = "json"
TRANSPORT_FORMAT_PROTOBUF = "protobuf"

DEFAULT_RETRY = False
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
ZIPKIN_HEADERS = {"Content-Type": "application/json"}

SPAN_KIND_MAP = {
SPAN_KIND_MAP_JSON = {
SpanKind.INTERNAL: None,
SpanKind.SERVER: "SERVER",
SpanKind.CLIENT: "CLIENT",
SpanKind.PRODUCER: "PRODUCER",
SpanKind.CONSUMER: "CONSUMER",
}

SPAN_KIND_MAP_PROTOBUF = {
SpanKind.INTERNAL: zipkin_pb2.Span.Kind.SPAN_KIND_UNSPECIFIED,
SpanKind.SERVER: zipkin_pb2.Span.Kind.SERVER,
SpanKind.CLIENT: zipkin_pb2.Span.Kind.CLIENT,
SpanKind.PRODUCER: zipkin_pb2.Span.Kind.PRODUCER,
SpanKind.CONSUMER: zipkin_pb2.Span.Kind.CONSUMER,
}

SUCCESS_STATUS_CODES = (200, 202)

logger = logging.getLogger(__name__)
Expand All @@ -100,6 +121,7 @@ class ZipkinSpanExporter(SpanExporter):
ipv4: Primary IPv4 address associated with this connection.
ipv6: Primary IPv6 address associated with this connection.
retry: Set to True to configure the exporter to retry on failure.
transport_format: transport interchange format to use
"""

def __init__(
Expand All @@ -110,12 +132,13 @@ def __init__(
ipv6: Optional[str] = None,
retry: Optional[str] = DEFAULT_RETRY,
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
transport_format: Union[
TRANSPORT_FORMAT_JSON, TRANSPORT_FORMAT_PROTOBUF, None
] = None,
):
self.service_name = service_name
if url is None:
self.url = os.environ.get(
"OTEL_EXPORTER_ZIPKIN_ENDPOINT", DEFAULT_URL
)
self.url = Configuration().EXPORTER_ZIPKIN_ENDPOINT or DEFAULT_URL
else:
self.url = url

Expand All @@ -126,10 +149,27 @@ def __init__(
self.retry = retry
self.max_tag_value_length = max_tag_value_length

if transport_format is None:
self.transport_format = (
Configuration().EXPORTER_ZIPKIN_TRANSPORT_FORMAT
or TRANSPORT_FORMAT_JSON
)
else:
self.transport_format = transport_format

def export(self, spans: Sequence[Span]) -> SpanExportResult:
zipkin_spans = self._translate_to_zipkin(spans)
if self.transport_format == TRANSPORT_FORMAT_JSON:
content_type = "application/json"
elif self.transport_format == TRANSPORT_FORMAT_PROTOBUF:
content_type = "application/x-protobuf"
else:
logger.error("Invalid transport format %s", self.transport_format)
return SpanExportResult.FAILURE

result = requests.post(
url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS
url=self.url,
data=self._translate_to_transport_format(spans),
headers={"Content-Type": content_type},
)

if result.status_code not in SUCCESS_STATUS_CODES:
Expand All @@ -147,8 +187,14 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
def shutdown(self) -> None:
pass

def _translate_to_zipkin(self, spans: Sequence[Span]):
def _translate_to_transport_format(self, spans: Sequence[Span]):
return (
self._translate_to_json(spans)
if self.transport_format == TRANSPORT_FORMAT_JSON
else self._translate_to_protobuf(spans)
)

def _translate_to_json(self, spans: Sequence[Span]):
local_endpoint = {"serviceName": self.service_name, "port": self.port}

if self.ipv4 is not None:
Expand All @@ -165,8 +211,8 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):

# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = _nsec_to_usec_round(span.start_time)
duration_mus = _nsec_to_usec_round(span.end_time - span.start_time)
start_timestamp_mus = nsec_to_usec_round(span.start_time)
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)

zipkin_span = {
# Ensure left-zero-padding of traceId, spanId, parentId
Expand All @@ -176,7 +222,7 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
"timestamp": start_timestamp_mus,
"duration": duration_mus,
"localEndpoint": local_endpoint,
"kind": SPAN_KIND_MAP[span.kind],
"kind": SPAN_KIND_MAP_JSON[span.kind],
"tags": self._extract_tags_from_span(span),
"annotations": self._extract_annotations_from_events(
span.events
Expand Down Expand Up @@ -211,7 +257,94 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
zipkin_span["parentId"] = format(span.parent.span_id, "016x")

zipkin_spans.append(zipkin_span)
return zipkin_spans

return json.dumps(zipkin_spans)
robwknox marked this conversation as resolved.
Show resolved Hide resolved

def _translate_to_protobuf(self, spans: Sequence[Span]):

local_endpoint = zipkin_pb2.Endpoint(
service_name=self.service_name, port=self.port
)

if self.ipv4 is not None:
local_endpoint.ipv4 = self.ipv4

if self.ipv6 is not None:
local_endpoint.ipv6 = self.ipv6

pbuf_spans = zipkin_pb2.ListOfSpans()

for span in spans:
context = span.get_span_context()
trace_id = context.trace_id.to_bytes(
length=16, byteorder="big", signed=False,
)
span_id = self.format_pbuf_span_id(context.span_id)

# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = nsec_to_usec_round(span.start_time)
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)

# pylint: disable=no-member
pbuf_span = zipkin_pb2.Span(
trace_id=trace_id,
id=span_id,
name=span.name,
timestamp=start_timestamp_mus,
duration=duration_mus,
local_endpoint=local_endpoint,
kind=SPAN_KIND_MAP_PROTOBUF[span.kind],
tags=self._extract_tags_from_span(span),
)

annotations = self._extract_annotations_from_events(span.events)

if annotations is not None:
for annotation in annotations:
pbuf_span.annotations.append(
zipkin_pb2.Annotation(
timestamp=annotation["timestamp"],
value=annotation["value"],
)
)

if span.instrumentation_info is not None:
pbuf_span.tags.update(
{
"otel.instrumentation_library.name": span.instrumentation_info.name,
"otel.instrumentation_library.version": span.instrumentation_info.version,
}
)

if span.status is not None:
pbuf_span.tags.update(
{"otel.status_code": str(span.status.status_code.value)}
)
if span.status.description is not None:
pbuf_span.tags.update(
{"otel.status_description": span.status.description}
)

if context.trace_flags.sampled:
pbuf_span.debug = True

if isinstance(span.parent, Span):
pbuf_span.parent_id = self.format_pbuf_span_id(
span.parent.get_span_context().span_id
)
elif isinstance(span.parent, SpanContext):
pbuf_span.parent_id = self.format_pbuf_span_id(
span.parent.span_id
)

pbuf_spans.spans.append(pbuf_span)

return pbuf_spans.SerializeToString()

@staticmethod
def format_pbuf_span_id(span_id: int):
return span_id.to_bytes(length=8, byteorder="big", signed=False)

def _extract_tags_from_dict(self, tags_dict):
tags = {}
Expand Down Expand Up @@ -251,13 +384,13 @@ def _extract_annotations_from_events(self, events):

annotations.append(
{
"timestamp": _nsec_to_usec_round(event.timestamp),
"timestamp": nsec_to_usec_round(event.timestamp),
"value": json.dumps({event.name: attrs}),
}
)
return annotations


def _nsec_to_usec_round(nsec):
def nsec_to_usec_round(nsec):
"""Round nanoseconds to microseconds"""
return (nsec + 500) // 10 ** 3
Loading