Skip to content

Commit

Permalink
support Zipkin v2 Span format, resolve #11
Browse files Browse the repository at this point in the history
  • Loading branch information
flier committed Aug 11, 2017
1 parent dfa8ef5 commit 52c8f78
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 12 deletions.
19 changes: 16 additions & 3 deletions src/Collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace zipkin
std::shared_ptr<BinaryCodec> MessageCodec::binary(new BinaryCodec());
std::shared_ptr<JsonCodec> MessageCodec::json(new JsonCodec());
std::shared_ptr<PrettyJsonCodec> MessageCodec::pretty_json(new PrettyJsonCodec());
std::shared_ptr<JsonCodec> MessageCodec::json_v2(new JsonCodec(2));
std::shared_ptr<PrettyJsonCodec> MessageCodec::pretty_json_v2(new PrettyJsonCodec(2));

CompressionCodec parse_compression_codec(const std::string &codec)
{
Expand Down Expand Up @@ -57,7 +59,10 @@ std::shared_ptr<MessageCodec> MessageCodec::parse(const std::string &codec)
return json;
if (codec == "pretty_json")
return pretty_json;

if (codec == "json_v2")
return json_v2;
if (codec == "pretty_json_v2")
return pretty_json_v2;
return nullptr;
}

Expand Down Expand Up @@ -85,7 +90,11 @@ size_t JsonCodec::encode(boost::shared_ptr<apache::thrift::transport::TMemoryBuf

for (auto &span : spans)
{
span->serialize_json(writer);
if (m_format_version == 2) {
span->serialize_json_v2(writer);
} else {
span->serialize_json(writer);
}
}

writer.EndArray();
Expand All @@ -105,7 +114,11 @@ size_t PrettyJsonCodec::encode(boost::shared_ptr<apache::thrift::transport::TMem

for (auto &span : spans)
{
span->serialize_json(writer);
if (m_format_version == 2) {
span->serialize_json_v2(writer);
} else {
span->serialize_json(writer);
}
}

writer.EndArray();
Expand Down
9 changes: 9 additions & 0 deletions src/Collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <atomic>
#include <thread>
#include <mutex>

#include <condition_variable>

#include <boost/lockfree/queue.hpp>
Expand Down Expand Up @@ -54,6 +55,8 @@ class MessageCodec
static std::shared_ptr<BinaryCodec> binary;
static std::shared_ptr<JsonCodec> json;
static std::shared_ptr<PrettyJsonCodec> pretty_json;
static std::shared_ptr<JsonCodec> json_v2;
static std::shared_ptr<PrettyJsonCodec> pretty_json_v2;
};

/**
Expand All @@ -74,7 +77,10 @@ class BinaryCodec : public MessageCodec
*/
class JsonCodec : public MessageCodec
{
int m_format_version;
public:
JsonCodec(int format_version = 1) : m_format_version(format_version) {}

virtual const std::string name(void) const override { return "json"; }

virtual const std::string mime_type(void) const override { return "application/json"; }
Expand All @@ -87,7 +93,10 @@ class JsonCodec : public MessageCodec
*/
class PrettyJsonCodec : public MessageCodec
{
int m_format_version;
public:
PrettyJsonCodec(int format_version = 1) : m_format_version(format_version) {}

virtual const std::string name(void) const override { return "pretty_json"; }

virtual const std::string mime_type(void) const override { return "application/json"; }
Expand Down
7 changes: 5 additions & 2 deletions src/Span.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ const char *to_string(AnnotationType type)
}
}

void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled)
void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userdata, bool sampled, bool shared)
{
m_span.debug = false;
m_span.duration = 0;
Expand All @@ -178,6 +178,9 @@ void Span::reset(const std::string &name, span_id_t parent_id, userdata_t userda

m_userdata = userdata;
m_sampled = sampled;
m_shared = shared;
m_local_endpoint.reset();
m_remote_endpoint.reset();
}

void Span::submit(void)
Expand Down Expand Up @@ -340,4 +343,4 @@ void CachedSpan::release(void)
}
}

} // namespace zipkin
} // namespace zipkin
224 changes: 220 additions & 4 deletions src/Span.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <locale>
#include <memory>
#include <chrono>
#include <sstream>

#include <boost/locale/encoding_utf.hpp>
#include <boost/asio.hpp>
Expand Down Expand Up @@ -543,23 +544,25 @@ class Span
::Span m_span;
userdata_t m_userdata;
bool m_sampled;
bool m_shared;
std::shared_ptr<Endpoint> m_local_endpoint, m_remote_endpoint;

static const ::Endpoint host(const Endpoint *endpoint);

public:
/**
* \brief Construct a span
*/
Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true)
Span(Tracer *tracer, const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false)
: m_tracer(tracer)
{
reset(name, parent_id, userdata, sampled);
reset(name, parent_id, userdata, sampled, shared);
}

/**
* \brief Reset a span
*/
void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true);
void reset(const std::string &name, span_id_t parent_id = 0, userdata_t userdata = nullptr, bool sampled = true, bool shared = false);

/**
* \brief Submit a Span to Tracer
Expand Down Expand Up @@ -730,6 +733,41 @@ class Span
return *this;
}

/**
* \brief Report the span is sharing between the client and the server.
*/
inline bool shared(void) const { return m_shared; }

/** \sa Span#shared */
inline Span &with_shared(bool shared = true)
{
m_shared = shared;
return *this;
}

/**
* \brief Local endpoint
*/
inline std::shared_ptr<Endpoint> local_endpoint(void) const { return m_local_endpoint; }

/** \sa Span#local_endpoint */
inline Span &with_local_endpoint(std::shared_ptr<Endpoint> local_endpoint)
{
m_local_endpoint = local_endpoint;
return *this;
}
/**
* \brief Remote endpoint
*/
inline std::shared_ptr<Endpoint> remote_endpoint(void) const { return m_remote_endpoint; }

/** \sa Span#remote_endpoint */
inline Span &with_remote_endpoint(std::shared_ptr<Endpoint> remote_endpoint)
{
remote_endpoint = remote_endpoint;
return *this;
}

virtual inline Span *span(const std::string &name, userdata_t userdata = nullptr) const
{
Span *span = new Span(m_tracer, name, id(), userdata);
Expand Down Expand Up @@ -916,6 +954,9 @@ class Span
template <class RapidJsonWriter>
void serialize_json(RapidJsonWriter &writer) const;

template <class RapidJsonWriter>
void serialize_json_v2(RapidJsonWriter &writer) const;

class Scope
{
Span &m_span;
Expand Down Expand Up @@ -1375,4 +1416,179 @@ void Span::serialize_json(RapidJsonWriter &writer) const
writer.EndObject();
}

} // namespace zipkin
template <class RapidJsonWriter>
void Span::serialize_json_v2(RapidJsonWriter &writer) const
{
auto serialize_endpoint = [&writer](const Endpoint *host) {
writer.StartObject();

writer.Key("serviceName");
writer.String(host->service_name());

writer.Key("ipv4");
writer.String(host->addr().to_v4().to_string());

writer.Key("port");
writer.Int(host->port());

writer.EndObject();
};

auto serialize_value = [&writer](const std::string &data, AnnotationType type) {
std::ostringstream oss;

switch (type)
{
case AnnotationType::BOOL:
oss << *reinterpret_cast<const bool *>(data.c_str());
break;

case AnnotationType::I16:
oss << __impl::big_to_native(*reinterpret_cast<const uint16_t *>(data.c_str()));
break;

case AnnotationType::I32:
oss << __impl::big_to_native(*reinterpret_cast<const uint32_t *>(data.c_str()));
break;

case AnnotationType::I64:
oss << __impl::big_to_native(*reinterpret_cast<const uint64_t *>(data.c_str()));
break;

case AnnotationType::DOUBLE:
oss << *reinterpret_cast<const double *>(data.c_str());
break;

case AnnotationType::BYTES:
oss << base64::encode(data);
break;

case AnnotationType::STRING:
oss << data;
break;
}

writer.String(oss.str());
};

char str[64];

writer.StartObject();

writer.Key("traceId");
if (m_span.trace_id_high)
{
writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT SPAN_ID_FMT, m_span.trace_id_high, m_span.trace_id));
}
else
{
writer.String(str, snprintf(str, sizeof(str), "0000000000000000" SPAN_ID_FMT, m_span.trace_id));
}

writer.Key("name");
writer.String(m_span.name);

writer.Key("id");
writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.id));

if (m_span.__isset.parent_id)
{
writer.Key("parentId");
writer.String(str, snprintf(str, sizeof(str), SPAN_ID_FMT, m_span.parent_id));
}

for (auto &annotation : m_span.annotations)
{
if (annotation.value == TraceKeys::CLIENT_SEND || annotation.value == TraceKeys::CLIENT_RECV) {
writer.Key("kind");
writer.String("CLIENT");
break;
}
if (annotation.value == TraceKeys::SERVER_SEND || annotation.value == TraceKeys::SERVER_RECV) {
writer.Key("kind");
writer.String("SERVER");
break;
}
}

if (m_span.__isset.timestamp)
{
writer.Key("timestamp");
writer.Int64(m_span.timestamp);
}

if (m_span.__isset.duration)
{
writer.Key("duration");
writer.Int64(m_span.duration);
}

auto local_endpoint = m_local_endpoint;
auto remote_endpoint = m_remote_endpoint;

for (auto &annotation : m_span.binary_annotations)
{
if (!local_endpoint && annotation.value == TraceKeys::CLIENT_ADDR &&
(annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) {
local_endpoint.reset(new Endpoint(annotation.host));
}
if (!remote_endpoint && annotation.value == TraceKeys::SERVER_ADDR &&
(annotation.host.__isset.ipv4 || annotation.host.__isset.ipv6)) {
remote_endpoint.reset(new Endpoint(annotation.host));
}
}

if (local_endpoint) {
writer.Key("localEndpoint");
serialize_endpoint(m_local_endpoint.get());
}

if (remote_endpoint) {
writer.Key("remoteEndpoint");
serialize_endpoint(m_remote_endpoint.get());
}

writer.Key("annotations");
writer.StartArray();

for (auto &annotation : m_span.annotations)
{
writer.StartObject();

writer.Key("timestamp");
writer.Int64(annotation.timestamp);

writer.Key("value");
writer.String(annotation.value);

writer.EndObject();
}

writer.EndArray(m_span.annotations.size());

writer.Key("tags");
writer.StartObject();

for (auto &annotation : m_span.binary_annotations)
{
writer.Key(annotation.key.c_str());
serialize_value(annotation.value, annotation.annotation_type);
}

writer.EndObject();

if (m_span.debug)
{
writer.Key("debug");
writer.Bool(m_span.debug);
}

if (m_shared) {
writer.Key("shared");
writer.Bool(m_shared);
}

writer.EndObject();
}

} // namespace zipkin
Loading

0 comments on commit 52c8f78

Please sign in to comment.