From 102d6cf28af462b43c27a7e082fadff831eb665e Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 10 Nov 2017 14:21:06 +0100 Subject: [PATCH] remote endpoint, refactor Signed-off-by: Pavol Loffay --- Makefile | 4 +- cmd/collector/app/zipkin/json.go | 10 ++- cmd/collector/app/zipkin/json_test.go | 8 +- cmd/collector/app/zipkin/jsonv2.go | 104 +++++++++++++++++--------- 4 files changed, 82 insertions(+), 44 deletions(-) diff --git a/Makefile b/Makefile index d7f50336a9e..758bde73ffb 100644 --- a/Makefile +++ b/Makefile @@ -34,8 +34,8 @@ THRIFT_GO_ARGS=thrift_import="github.com/apache/thrift/lib/go/thrift" THRIFT_GEN=$(shell which thrift-gen) THRIFT_GEN_DIR=thrift-gen -SWAGGER_IMG_VER=0.12.0 -SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_IMG_VER) +SWAGGER_VER=0.12.0 +SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_VER) SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/${PROJECT_ROOT}" -w /go/src/${PROJECT_ROOT} $(SWAGGER_IMAGE) SWAGGER_GEN_DIR=swagger-gen diff --git a/cmd/collector/app/zipkin/json.go b/cmd/collector/app/zipkin/json.go index d9eac5bade2..d54747b05b5 100644 --- a/cmd/collector/app/zipkin/json.go +++ b/cmd/collector/app/zipkin/json.go @@ -152,7 +152,11 @@ func cutLongID(id string) string { return id } -func endpointToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) { +func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) { + return eToThrift(e.IPv4, e.IPv6, e.Port, e.ServiceName) +} + +func eToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) { ipv4, err := parseIpv4(ip4) if err != nil { return nil, err @@ -179,7 +183,7 @@ func port(p int32) int32 { } func annoToThrift(a annotation) (*zipkincore.Annotation, error) { - endpoint, err := endpointToThrift(a.Endpoint.IPv4, a.Endpoint.IPv6, a.Endpoint.Port, a.Endpoint.ServiceName) + endpoint, err := endpointToThrift(a.Endpoint) if err != nil { return nil, err } @@ -192,7 +196,7 @@ func annoToThrift(a annotation) (*zipkincore.Annotation, error) { } func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error) { - endpoint, err := endpointToThrift(ba.Endpoint.IPv4, ba.Endpoint.IPv6, ba.Endpoint.Port, ba.Endpoint.ServiceName) + endpoint, err := endpointToThrift(ba.Endpoint) if err != nil { return nil, err } diff --git a/cmd/collector/app/zipkin/json_test.go b/cmd/collector/app/zipkin/json_test.go index 1445b2fd09b..7b99acdfafb 100644 --- a/cmd/collector/app/zipkin/json_test.go +++ b/cmd/collector/app/zipkin/json_test.go @@ -217,7 +217,7 @@ func TestEndpointToThrift(t *testing.T) { Port: 80, IPv4: "127.0.0.1", } - tEndpoint, err := endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName) + tEndpoint, err := endpointToThrift(endp) require.NoError(t, err) assert.Equal(t, "foo", tEndpoint.ServiceName) assert.Equal(t, int16(80), tEndpoint.Port) @@ -228,7 +228,7 @@ func TestEndpointToThrift(t *testing.T) { Port: 80, IPv4: "", } - tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName) + tEndpoint, err = endpointToThrift(endp) require.NoError(t, err) assert.Equal(t, "foo", tEndpoint.ServiceName) assert.Equal(t, int16(80), tEndpoint.Port) @@ -239,7 +239,7 @@ func TestEndpointToThrift(t *testing.T) { Port: 80, IPv4: "127.0.0.A", } - tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName) + tEndpoint, err = endpointToThrift(endp) require.Error(t, err) assert.Equal(t, errWrongIpv4, err) assert.Nil(t, tEndpoint) @@ -249,7 +249,7 @@ func TestEndpointToThrift(t *testing.T) { Port: 80, IPv6: "::R", } - tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName) + tEndpoint, err = endpointToThrift(endp) require.Error(t, err) assert.Equal(t, errWrongIpv6, err) assert.Nil(t, tEndpoint) diff --git a/cmd/collector/app/zipkin/jsonv2.go b/cmd/collector/app/zipkin/jsonv2.go index 24d011a8b01..8d0062e669b 100644 --- a/cmd/collector/app/zipkin/jsonv2.go +++ b/cmd/collector/app/zipkin/jsonv2.go @@ -15,10 +15,10 @@ package zipkin import ( - "fmt" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/swagger-gen/models" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + "github.com/opentracing/opentracing-go/ext" ) func spansV2ToThrift(spans *models.ListOfSpans) ([]*zipkincore.Span, error) { @@ -42,7 +42,6 @@ func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) { if err != nil { return nil, err } - tSpan := &zipkincore.Span{ ID: int64(id), TraceID: int64(traceID.Low), @@ -61,64 +60,85 @@ func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) { tSpan.ParentID = &signed } - localE, err := endpointToThrift(string(s.LocalEndpoint.IPV4), - string(s.LocalEndpoint.IPV6), - int32(s.LocalEndpoint.Port), - s.LocalEndpoint.ServiceName) - if err != nil { - return nil, err - } + localE, err := endpointV2ToThrift(s.LocalEndpoint) for _, a := range s.Annotations { - tA := annToThrift(a, *localE) + tA := annoV2ToThrift(a, *localE) tSpan.Annotations = append(tSpan.Annotations, tA) } - for k, v := range s.Tags { - ba := &zipkincore.BinaryAnnotation{ - Key: k, - Value: []byte(v), - AnnotationType: zipkincore.AnnotationType_STRING, - Host: localE, + tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, tagsToThrift(s.Tags, *localE)...) + tSpan.Annotations = append(tSpan.Annotations, kindToThrift(s, localE)...) + + if s.RemoteEndpoint != nil { + if bAnno, err := rEndpToThrift(s.RemoteEndpoint, s.Kind); err != nil { + tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, bAnno) + } else { + return nil, err } - tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, ba) } + return tSpan, nil +} - if s.Kind == models.SpanKindCLIENT { - tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{ - Value: zipkincore.CLIENT_SEND, +func rEndpToThrift(e *models.Endpoint, kind string) (*zipkincore.BinaryAnnotation, error) { + rEndp, err := endpointV2ToThrift(e) + if err != nil { + return nil, err + } + + var key string + switch kind { + case models.SpanKindCLIENT: + key = zipkincore.SERVER_ADDR + case models.SpanKindSERVER: + key = zipkincore.CLIENT_ADDR + default: + key = string(ext.PeerAddress) + } + + return &zipkincore.BinaryAnnotation{ + Key: key, + Host: rEndp, + AnnotationType: zipkincore.AnnotationType_BOOL, + }, nil +} + +func kindToThrift(s models.Span, localE *zipkincore.Endpoint) []*zipkincore.Annotation { + var annos []*zipkincore.Annotation + switch s.Kind { + case models.SpanKindCLIENT: + annos = append(annos, &zipkincore.Annotation{ + Value: zipkincore.SERVER_RECV, Host: localE, Timestamp: s.Timestamp, }) - tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{ - Value: zipkincore.CLIENT_RECV, + annos = append(annos, &zipkincore.Annotation{ + Value: zipkincore.SERVER_SEND, Host: localE, Timestamp: s.Timestamp + s.Duration, }) - } else if s.Kind == models.SpanKindSERVER { - tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{ - Value: zipkincore.SERVER_RECV, + case models.SpanKindSERVER: + annos = append(annos, &zipkincore.Annotation{ + Value: zipkincore.CLIENT_SEND, Host: localE, Timestamp: s.Timestamp, }) - tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{ - Value: zipkincore.SERVER_SEND, + annos = append(annos, &zipkincore.Annotation{ + Value: zipkincore.CLIENT_RECV, Host: localE, Timestamp: s.Timestamp + s.Duration, }) + // TODO support for producer/consumer once idl supports it } - // TODO consumer producer - // TODO - //s.RemoteEndpoint - - fmt.Println("V2 span") - fmt.Println(tSpan) + return annos +} - return tSpan, nil +func endpointV2ToThrift(e *models.Endpoint) (*zipkincore.Endpoint, error) { + return eToThrift(string(e.IPV4), string(e.IPV6), int32(e.Port), e.ServiceName) } -func annToThrift(a *models.Annotation, e zipkincore.Endpoint) *zipkincore.Annotation { +func annoV2ToThrift(a *models.Annotation, e zipkincore.Endpoint) *zipkincore.Annotation { ta := &zipkincore.Annotation{ Value: a.Value, Timestamp: a.Timestamp, @@ -126,3 +146,17 @@ func annToThrift(a *models.Annotation, e zipkincore.Endpoint) *zipkincore.Annota } return ta } + +func tagsToThrift(tags models.Tags, localE zipkincore.Endpoint) []*zipkincore.BinaryAnnotation { + var bAnnos []*zipkincore.BinaryAnnotation + for k, v := range tags { + ba := &zipkincore.BinaryAnnotation{ + Key: k, + Value: []byte(v), + AnnotationType: zipkincore.AnnotationType_STRING, + Host: &localE, + } + bAnnos = append(bAnnos, ba) + } + return bAnnos +}