Skip to content

Commit

Permalink
Add JSON input support to zipkin plugin (#3150)
Browse files Browse the repository at this point in the history
(cherry picked from commit 13a6b91)
  • Loading branch information
goller authored and danielnelson committed Aug 22, 2017
1 parent 7254111 commit d4cd1b7
Show file tree
Hide file tree
Showing 16 changed files with 3,103 additions and 517 deletions.
3 changes: 3 additions & 0 deletions plugins/inputs/zipkin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ based on its main usage cases and the evolution of the OpenTracing standard.*
port = 9411 # Port on which Telegraf listens
```

The plugin accepts spans in `JSON` or `thrift` if the `Content-Type` is `application/json` or `application/x-thrift`, respectively.
If `Content-Type` is not set, then the plugin assumes it is `JSON` format.

## Tracing:

This plugin uses Annotations tags and fields to track data from spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ func main() {
if err != nil {
log.Fatalf("%v\n", err)
}
ioutil.WriteFile(outFileName, raw, 0644)
if err := ioutil.WriteFile(outFileName, raw, 0644); err != nil {
log.Fatalf("%v", err)
}
case "thrift":
raw, err := thriftToJSONSpans(contents)
if err != nil {
log.Fatalf("%v\n", err)
}
ioutil.WriteFile(outFileName, raw, 0644)
if err := ioutil.WriteFile(outFileName, raw, 0644); err != nil {
log.Fatalf("%v", err)
}
default:
log.Fatalf("Unsupported input type")
}
Expand Down
210 changes: 210 additions & 0 deletions plugins/inputs/zipkin/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package codec

import (
"time"

"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
)

//now is a mockable time for now
var now = time.Now

// DefaultServiceName when the span does not have any serviceName
const DefaultServiceName = "unknown"

// Decoder decodes the bytes and returns a trace
type Decoder interface {
Decode(octets []byte) ([]Span, error)
}

// Span are created by instrumentation in RPC clients or servers
type Span interface {
Trace() (string, error)
SpanID() (string, error)
Parent() (string, error)
Name() string
Annotations() []Annotation
BinaryAnnotations() ([]BinaryAnnotation, error)
Timestamp() time.Time
Duration() time.Duration
}

// Annotation represents an event that explains latency with a timestamp.
type Annotation interface {
Timestamp() time.Time
Value() string
Host() Endpoint
}

// BinaryAnnotation represent tags applied to a Span to give it context
type BinaryAnnotation interface {
Key() string
Value() string
Host() Endpoint
}

// Endpoint represents the network context of a service recording an annotation
type Endpoint interface {
Host() string
Name() string
}

// DefaultEndpoint is used if the annotations have no endpoints
type DefaultEndpoint struct{}

// Host returns 0.0.0.0; used when the host is unknown
func (d *DefaultEndpoint) Host() string { return "0.0.0.0" }

// Name returns "unknown" when an endpoint doesn't exist
func (d *DefaultEndpoint) Name() string { return DefaultServiceName }

// MicroToTime converts zipkin's native time of microseconds into time.Time
func MicroToTime(micro int64) time.Time {
return time.Unix(0, micro*int64(time.Microsecond)).UTC()
}

// NewTrace converts a slice of []Span into a new Trace
func NewTrace(spans []Span) (trace.Trace, error) {
tr := make(trace.Trace, len(spans))
for i, span := range spans {
bin, err := span.BinaryAnnotations()
if err != nil {
return nil, err
}
endpoint := serviceEndpoint(span.Annotations(), bin)
id, err := span.SpanID()
if err != nil {
return nil, err
}

tid, err := span.Trace()
if err != nil {
return nil, err
}

pid, err := parentID(span)
if err != nil {
return nil, err
}

tr[i] = trace.Span{
ID: id,
TraceID: tid,
Name: span.Name(),
Timestamp: guessTimestamp(span),
Duration: convertDuration(span),
ParentID: pid,
ServiceName: endpoint.Name(),
Annotations: NewAnnotations(span.Annotations(), endpoint),
BinaryAnnotations: NewBinaryAnnotations(bin, endpoint),
}
}
return tr, nil
}

// NewAnnotations converts a slice of Annotation into a slice of new Annotations
func NewAnnotations(annotations []Annotation, endpoint Endpoint) []trace.Annotation {
formatted := make([]trace.Annotation, len(annotations))
for i, annotation := range annotations {
formatted[i] = trace.Annotation{
Host: endpoint.Host(),
ServiceName: endpoint.Name(),
Timestamp: annotation.Timestamp(),
Value: annotation.Value(),
}
}

return formatted
}

// NewBinaryAnnotations is very similar to NewAnnotations, but it
// converts BinaryAnnotations instead of the normal Annotation
func NewBinaryAnnotations(annotations []BinaryAnnotation, endpoint Endpoint) []trace.BinaryAnnotation {
formatted := make([]trace.BinaryAnnotation, len(annotations))
for i, annotation := range annotations {
formatted[i] = trace.BinaryAnnotation{
Host: endpoint.Host(),
ServiceName: endpoint.Name(),
Key: annotation.Key(),
Value: annotation.Value(),
}
}
return formatted
}

func minMax(span Span) (time.Time, time.Time) {
min := now().UTC()
max := time.Time{}.UTC()
for _, annotation := range span.Annotations() {
ts := annotation.Timestamp()
if !ts.IsZero() && ts.Before(min) {
min = ts
}
if !ts.IsZero() && ts.After(max) {
max = ts
}
}
if max.IsZero() {
max = min
}
return min, max
}

func guessTimestamp(span Span) time.Time {
ts := span.Timestamp()
if !ts.IsZero() {
return ts
}

min, _ := minMax(span)
return min
}

func convertDuration(span Span) time.Duration {
duration := span.Duration()
if duration != 0 {
return duration
}
min, max := minMax(span)
return max.Sub(min)
}

func parentID(span Span) (string, error) {
// A parent ID of "" means that this is a parent span. In this case,
// we set the parent ID of the span to be its own id, so it points to
// itself.
id, err := span.Parent()
if err != nil {
return "", err
}

if id != "" {
return id, nil
}
return span.SpanID()
}

func serviceEndpoint(ann []Annotation, bann []BinaryAnnotation) Endpoint {
for _, a := range ann {
switch a.Value() {
case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
if a.Host() != nil && a.Host().Name() != "" {
return a.Host()
}
}
}

for _, a := range bann {
if a.Key() == zipkincore.LOCAL_COMPONENT && a.Host() != nil && a.Host().Name() != "" {
return a.Host()
}
}
// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
for _, a := range ann {
if a.Host() != nil && a.Host().Name() != "" {
return a.Host()
}
}
return &DefaultEndpoint{}
}
Loading

0 comments on commit d4cd1b7

Please sign in to comment.