Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON input to zipkin plugin #3150

Merged
merged 12 commits into from
Aug 22, 2017
3 changes: 3 additions & 0 deletions plugins/inputs/zipkin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ based on its main usage cases and the evolution of the OpenTracing standard.*
port = 9411 # Port on which Telegraf listens
```

The plugin accepts spans in `JSON` or `thrift` if the `Content-Type` is `application/json` or `application/x-thrift`, respectively.
If `Content-Type` is not set, then the plugin assumes it is `JSON` format.

## Tracing:

This plugin uses Annotations tags and fields to track data from spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ func main() {
if err != nil {
log.Fatalf("%v\n", err)
}
ioutil.WriteFile(outFileName, raw, 0644)
if err := ioutil.WriteFile(outFileName, raw, 0644); err != nil {
log.Fatalf("%v", err)
}
case "thrift":
raw, err := thriftToJSONSpans(contents)
if err != nil {
log.Fatalf("%v\n", err)
}
ioutil.WriteFile(outFileName, raw, 0644)
if err := ioutil.WriteFile(outFileName, raw, 0644); err != nil {
log.Fatalf("%v", err)
}
default:
log.Fatalf("Unsupported input type")
}
Expand Down
210 changes: 210 additions & 0 deletions plugins/inputs/zipkin/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package codec

import (
"time"

"github.com/influxdata/telegraf/plugins/inputs/zipkin/trace"
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
)

//now is a mockable time for now
var now = time.Now

// DefaultServiceName when the span does not have any serviceName
const DefaultServiceName = "unknown"

// Decoder decodes the bytes and returns a trace
type Decoder interface {
Decode(octets []byte) ([]Span, error)
}

// Span are created by instrumentation in RPC clients or servers
type Span interface {
Trace() (string, error)
SpanID() (string, error)
Parent() (string, error)
Name() string
Annotations() []Annotation
BinaryAnnotations() ([]BinaryAnnotation, error)
Timestamp() time.Time
Duration() time.Duration
}

// Annotation represents an event that explains latency with a timestamp.
type Annotation interface {
Timestamp() time.Time
Value() string
Host() Endpoint
}

// BinaryAnnotation represent tags applied to a Span to give it context
type BinaryAnnotation interface {
Key() string
Value() string
Host() Endpoint
}

// Endpoint represents the network context of a service recording an annotation
type Endpoint interface {
Host() string
Name() string
}

// DefaultEndpoint is used if the annotations have no endpoints
type DefaultEndpoint struct{}

// Host returns 0.0.0.0; used when the host is unknown
func (d *DefaultEndpoint) Host() string { return "0.0.0.0" }

// Name returns "unknown" when an endpoint doesn't exist
func (d *DefaultEndpoint) Name() string { return DefaultServiceName }

// MicroToTime converts zipkin's native time of microseconds into time.Time
func MicroToTime(micro int64) time.Time {
return time.Unix(0, micro*int64(time.Microsecond)).UTC()
}

// NewTrace converts a slice of []Span into a new Trace
func NewTrace(spans []Span) (trace.Trace, error) {
tr := make(trace.Trace, len(spans))
for i, span := range spans {
bin, err := span.BinaryAnnotations()
if err != nil {
return nil, err
}
endpoint := serviceEndpoint(span.Annotations(), bin)
id, err := span.SpanID()
if err != nil {
return nil, err
}

tid, err := span.Trace()
if err != nil {
return nil, err
}

pid, err := parentID(span)
if err != nil {
return nil, err
}

tr[i] = trace.Span{
ID: id,
TraceID: tid,
Name: span.Name(),
Timestamp: guessTimestamp(span),
Duration: convertDuration(span),
ParentID: pid,
ServiceName: endpoint.Name(),
Annotations: NewAnnotations(span.Annotations(), endpoint),
BinaryAnnotations: NewBinaryAnnotations(bin, endpoint),
}
}
return tr, nil
}

// NewAnnotations converts a slice of Annotation into a slice of new Annotations
func NewAnnotations(annotations []Annotation, endpoint Endpoint) []trace.Annotation {
formatted := make([]trace.Annotation, len(annotations))
for i, annotation := range annotations {
formatted[i] = trace.Annotation{
Host: endpoint.Host(),
ServiceName: endpoint.Name(),
Timestamp: annotation.Timestamp(),
Value: annotation.Value(),
}
}

return formatted
}

// NewBinaryAnnotations is very similar to NewAnnotations, but it
// converts BinaryAnnotations instead of the normal Annotation
func NewBinaryAnnotations(annotations []BinaryAnnotation, endpoint Endpoint) []trace.BinaryAnnotation {
formatted := make([]trace.BinaryAnnotation, len(annotations))
for i, annotation := range annotations {
formatted[i] = trace.BinaryAnnotation{
Host: endpoint.Host(),
ServiceName: endpoint.Name(),
Key: annotation.Key(),
Value: annotation.Value(),
}
}
return formatted
}

func minMax(span Span) (time.Time, time.Time) {
min := now().UTC()
max := time.Time{}.UTC()
for _, annotation := range span.Annotations() {
ts := annotation.Timestamp()
if !ts.IsZero() && ts.Before(min) {
min = ts
}
if !ts.IsZero() && ts.After(max) {
max = ts
}
}
if max.IsZero() {
max = min
}
return min, max
}

func guessTimestamp(span Span) time.Time {
ts := span.Timestamp()
if !ts.IsZero() {
return ts
}

min, _ := minMax(span)
return min
}

func convertDuration(span Span) time.Duration {
duration := span.Duration()
if duration != 0 {
return duration
}
min, max := minMax(span)
return max.Sub(min)
}

func parentID(span Span) (string, error) {
// A parent ID of "" means that this is a parent span. In this case,
// we set the parent ID of the span to be its own id, so it points to
// itself.
id, err := span.Parent()
if err != nil {
return "", err
}

if id != "" {
return id, nil
}
return span.SpanID()
}

func serviceEndpoint(ann []Annotation, bann []BinaryAnnotation) Endpoint {
for _, a := range ann {
switch a.Value() {
case zipkincore.SERVER_RECV, zipkincore.SERVER_SEND, zipkincore.CLIENT_RECV, zipkincore.CLIENT_SEND:
if a.Host() != nil && a.Host().Name() != "" {
return a.Host()
}
}
}

for _, a := range bann {
if a.Key() == zipkincore.LOCAL_COMPONENT && a.Host() != nil && a.Host().Name() != "" {
return a.Host()
}
}
// Unable to find any "standard" endpoint host, so, use any that exist in the regular annotations
for _, a := range ann {
if a.Host() != nil && a.Host().Name() != "" {
return a.Host()
}
}
return &DefaultEndpoint{}
}
Loading