From 52c8f7872ebf14e697501756addc366f6d4079d7 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 11 Aug 2017 13:49:54 +0800 Subject: [PATCH] support Zipkin v2 Span format, resolve #11 --- src/Collector.cpp | 19 +++- src/Collector.h | 9 ++ src/Span.cpp | 7 +- src/Span.h | 224 +++++++++++++++++++++++++++++++++++++++++++++- test/TestSpan.cpp | 62 ++++++++++++- 5 files changed, 309 insertions(+), 12 deletions(-) diff --git a/src/Collector.cpp b/src/Collector.cpp index b887764..888fa49 100644 --- a/src/Collector.cpp +++ b/src/Collector.cpp @@ -21,6 +21,8 @@ namespace zipkin std::shared_ptr MessageCodec::binary(new BinaryCodec()); std::shared_ptr MessageCodec::json(new JsonCodec()); std::shared_ptr MessageCodec::pretty_json(new PrettyJsonCodec()); +std::shared_ptr MessageCodec::json_v2(new JsonCodec(2)); +std::shared_ptr MessageCodec::pretty_json_v2(new PrettyJsonCodec(2)); CompressionCodec parse_compression_codec(const std::string &codec) { @@ -57,7 +59,10 @@ std::shared_ptr MessageCodec::parse(const std::string &codec) return json; if (codec == "pretty_json") return pretty_json; - + if (codec == "json_v2") + return json_v2; + if (codec == "pretty_json_v2") + return pretty_json_v2; return nullptr; } @@ -85,7 +90,11 @@ size_t JsonCodec::encode(boost::shared_ptrserialize_json(writer); + if (m_format_version == 2) { + span->serialize_json_v2(writer); + } else { + span->serialize_json(writer); + } } writer.EndArray(); @@ -105,7 +114,11 @@ size_t PrettyJsonCodec::encode(boost::shared_ptrserialize_json(writer); + if (m_format_version == 2) { + span->serialize_json_v2(writer); + } else { + span->serialize_json(writer); + } } writer.EndArray(); diff --git a/src/Collector.h b/src/Collector.h index 83938c3..79f76df 100644 --- a/src/Collector.h +++ b/src/Collector.h @@ -4,6 +4,7 @@ #include #include #include + #include #include @@ -54,6 +55,8 @@ class MessageCodec static std::shared_ptr binary; static std::shared_ptr json; static std::shared_ptr pretty_json; + static std::shared_ptr json_v2; + static std::shared_ptr pretty_json_v2; }; /** @@ -74,7 +77,10 @@ class BinaryCodec : public MessageCodec */ class JsonCodec : public MessageCodec { + int m_format_version; public: + JsonCodec(int format_version = 1) : m_format_version(format_version) {} + virtual const std::string name(void) const override { return "json"; } virtual const std::string mime_type(void) const override { return "application/json"; } @@ -87,7 +93,10 @@ class JsonCodec : public MessageCodec */ class PrettyJsonCodec : public MessageCodec { + int m_format_version; public: + PrettyJsonCodec(int format_version = 1) : m_format_version(format_version) {} + virtual const std::string name(void) const override { return "pretty_json"; } virtual const std::string mime_type(void) const override { return "application/json"; } diff --git a/src/Span.cpp b/src/Span.cpp index 771e6c5..b6d3ab5 100644 --- a/src/Span.cpp +++ b/src/Span.cpp @@ -152,7 +152,7 @@ const char *to_string(AnnotationType type) } } -void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled) +void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled, bool shared) { m_span.debug = false; m_span.duration = 0; @@ -178,6 +178,9 @@ void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userda m_userdata = userdata; m_sampled = sampled; + m_shared = shared; + m_local_endpoint.reset(); + m_remote_endpoint.reset(); } void Span::submit(void) @@ -340,4 +343,4 @@ void CachedSpan::release(void) } } -} // namespace zipkin \ No newline at end of file +} // namespace zipkin diff --git a/src/Span.h b/src/Span.h index fe6d076..cbd4a94 100644 --- a/src/Span.h +++ b/src/Span.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -543,6 +544,8 @@ class Span ::Span m_span; userdata_t m_userdata; bool m_sampled; + bool m_shared; + std::shared_ptr m_local_endpoint, m_remote_endpoint; static const ::Endpoint host(const Endpoint *endpoint); @@ -550,16 +553,16 @@ class Span /** * \brief Construct a span */ - Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true) + Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false) : m_tracer(tracer) { - reset(name, parent_id, userdata, sampled); + reset(name, parent_id, userdata, sampled, shared); } /** * \brief Reset a span */ - void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true); + void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false); /** * \brief Submit a Span to Tracer @@ -730,6 +733,41 @@ class Span return *this; } + /** + * \brief Report the span is sharing between the client and the server. + */ + inline bool shared(void) const { return m_shared; } + + /** \sa Span#shared */ + inline Span &with_shared(bool shared = true) + { + m_shared = shared; + return *this; + } + + /** + * \brief Local endpoint + */ + inline std::shared_ptr local_endpoint(void) const { return m_local_endpoint; } + + /** \sa Span#local_endpoint */ + inline Span &with_local_endpoint(std::shared_ptr local_endpoint) + { + m_local_endpoint = local_endpoint; + return *this; + } + /** + * \brief Remote endpoint + */ + inline std::shared_ptr remote_endpoint(void) const { return m_remote_endpoint; } + + /** \sa Span#remote_endpoint */ + inline Span &with_remote_endpoint(std::shared_ptr remote_endpoint) + { + remote_endpoint = remote_endpoint; + return *this; + } + virtual inline Span *span(const std::string &name, userdata_t userdata = nullptr) const { Span *span = new Span(m_tracer, name, id(), userdata); @@ -916,6 +954,9 @@ class Span template void serialize_json(RapidJsonWriter &writer) const; + template + void serialize_json_v2(RapidJsonWriter &writer) const; + class Scope { Span &m_span; @@ -1375,4 +1416,179 @@ void Span::serialize_json(RapidJsonWriter &writer) const writer.EndObject(); } -} // namespace zipkin \ No newline at end of file +template +void Span::serialize_json_v2(RapidJsonWriter &writer) const +{ + auto serialize_endpoint = [&writer](const Endpoint *host) { + writer.StartObject(); + + writer.Key("serviceName"); + writer.String(host->service_name()); + + writer.Key("ipv4"); + writer.String(host->addr().to_v4().to_string()); + + writer.Key("port"); + writer.Int(host->port()); + + writer.EndObject(); + }; + + auto serialize_value = [&writer](const std::string &data, AnnotationType type) { + std::ostringstream oss; + + switch (type) + { + case AnnotationType::BOOL: + oss << *reinterpret_cast(data.c_str()); + break; + + case AnnotationType::I16: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::I32: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::I64: + oss << __impl::big_to_native(*reinterpret_cast(data.c_str())); + break; + + case AnnotationType::DOUBLE: + oss << *reinterpret_cast(data.c_str()); + break; + + case AnnotationType::BYTES: + oss << base64::encode(data); + break; + + case AnnotationType::STRING: + oss << data; + break; + } + + writer.String(oss.str()); + }; + + char str[64]; + + writer.StartObject(); + + writer.Key("traceId"); + if (m_span.trace_id_high) + { + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT SPAN_ID_FMT, m_span.trace_id_high, m_span.trace_id)); + } + else + { + writer.String(str, snprintf(str, sizeof(str), "0000000000000000" SPAN_ID_FMT, m_span.trace_id)); + } + + writer.Key("name"); + writer.String(m_span.name); + + writer.Key("id"); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.id)); + + if (m_span.__isset.parent_id) + { + writer.Key("parentId"); + writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.parent_id)); + } + + for (auto &annotation : m_span.annotations) + { + if (annotation.value == TraceKeys::CLIENT_SEND || annotation.value == TraceKeys::CLIENT_RECV) { + writer.Key("kind"); + writer.String("CLIENT"); + break; + } + if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) { + writer.Key("kind"); + writer.String("SERVER"); + break; + } + } + + if (m_span.__isset.timestamp) + { + writer.Key("timestamp"); + writer.Int64(m_span.timestamp); + } + + if (m_span.__isset.duration) + { + writer.Key("duration"); + writer.Int64(m_span.duration); + } + + auto local_endpoint = m_local_endpoint; + auto remote_endpoint = m_remote_endpoint; + + for (auto &annotation : m_span.binary_annotations) + { + if (!local_endpoint && annotation.value == TraceKeys::CLIENT_ADDR && + (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { + local_endpoint.reset(new Endpoint(annotation.host)); + } + if (!remote_endpoint && annotation.value == TraceKeys::SERVER_ADDR && + (annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) { + remote_endpoint.reset(new Endpoint(annotation.host)); + } + } + + if (local_endpoint) { + writer.Key("localEndpoint"); + serialize_endpoint(m_local_endpoint.get()); + } + + if (remote_endpoint) { + writer.Key("remoteEndpoint"); + serialize_endpoint(m_remote_endpoint.get()); + } + + writer.Key("annotations"); + writer.StartArray(); + + for (auto &annotation : m_span.annotations) + { + writer.StartObject(); + + writer.Key("timestamp"); + writer.Int64(annotation.timestamp); + + writer.Key("value"); + writer.String(annotation.value); + + writer.EndObject(); + } + + writer.EndArray(m_span.annotations.size()); + + writer.Key("tags"); + writer.StartObject(); + + for (auto &annotation : m_span.binary_annotations) + { + writer.Key(annotation.key.c_str()); + serialize_value(annotation.value, annotation.annotation_type); + } + + writer.EndObject(); + + if (m_span.debug) + { + writer.Key("debug"); + writer.Bool(m_span.debug); + } + + if (m_shared) { + writer.Key("shared"); + writer.Bool(m_shared); + } + + writer.EndObject(); +} + +} // namespace zipkin diff --git a/test/TestSpan.cpp b/test/TestSpan.cpp index 2c44df7..03d9a8a 100644 --- a/test/TestSpan.cpp +++ b/test/TestSpan.cpp @@ -245,6 +245,31 @@ static const char *json_template = R"###({ "timestamp": %lld })###"; +static const char *json_v2_template = R"###({ + "traceId": "%016llx%016llx", + "name": "test", + "id": "%016llx", + "parentId": "%016llx", + "kind": "CLIENT", + "timestamp": %lld, + "annotations": [ + { + "timestamp": %lld, + "value": "cs" + } + ], + "tags": { + "sa": "8.8.8.8", + "bool": "1", + "i16": "123", + "i32": "123", + "i64": "123", + "double": "12.3", + "string": "测试", + "bytes": "AQID" + } +})###"; + TEST(span, serialize_json) { MockTracer tracer; @@ -274,8 +299,39 @@ TEST(span, serialize_json) span.serialize_json(writer); char str[2048] = {0}; - int str_len = snprintf(str, sizeof(str), json_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), - span.message().annotations[0].timestamp, span.message().timestamp); + int str_len = snprintf(str, sizeof(str), json_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().annotations[0].timestamp, span.message().timestamp); + + ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); +} + +TEST(span, serialize_json_v2) +{ + MockTracer tracer; + + zipkin::Span span(&tracer, "test", zipkin::Span::next_id()); + zipkin::Endpoint host("host", "127.0.0.1", 80); + zipkin::Endpoint remote("remote", "8.8.8.8"); + + span.client_send(&host); + span.server_addr("8.8.8.8", &remote); + span.annotate("bool", true, &host); + span.annotate("i16", (int16_t)123); + span.annotate("i32", (int32_t)123); + span.annotate("i64", (int64_t)123); + span.annotate("double", 12.3); + span.annotate("string", std::wstring(L"测试")); + + uint8_t bytes[] = {1, 2, 3}; + + span.annotate("bytes", bytes); + + rapidjson::StringBuffer buffer; + rapidjson::PrettyWriter writer(buffer); + + span.serialize_json_v2(writer); + + char str[2048] = {0}; + int str_len = snprintf(str, sizeof(str), json_v2_template, span.trace_id_high(), span.trace_id(), span.id(), span.parent_id(), span.message().timestamp, span.message().annotations[0].timestamp); ASSERT_EQ(std::string(buffer.GetString(), buffer.GetSize()), std::string(str, str_len)); } @@ -327,4 +383,4 @@ TEST(span, annotate_stream) ASSERT_EQ(span.message().annotations.size(), 2); ASSERT_EQ(span.message().binary_annotations.size(), 8); -} \ No newline at end of file +}