Skip to content

Commit

Permalink
remote endpoint, refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Nov 13, 2017
1 parent 509145a commit 102d6cf
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 44 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ THRIFT_GO_ARGS=thrift_import="github.com/apache/thrift/lib/go/thrift"
THRIFT_GEN=$(shell which thrift-gen)
THRIFT_GEN_DIR=thrift-gen

SWAGGER_IMG_VER=0.12.0
SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_IMG_VER)
SWAGGER_VER=0.12.0
SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_VER)
SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/${PROJECT_ROOT}" -w /go/src/${PROJECT_ROOT} $(SWAGGER_IMAGE)
SWAGGER_GEN_DIR=swagger-gen

Expand Down
10 changes: 7 additions & 3 deletions cmd/collector/app/zipkin/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ func cutLongID(id string) string {
return id
}

func endpointToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) {
func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) {
return eToThrift(e.IPv4, e.IPv6, e.Port, e.ServiceName)
}

func eToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) {
ipv4, err := parseIpv4(ip4)
if err != nil {
return nil, err
Expand All @@ -179,7 +183,7 @@ func port(p int32) int32 {
}

func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
endpoint, err := endpointToThrift(a.Endpoint.IPv4, a.Endpoint.IPv6, a.Endpoint.Port, a.Endpoint.ServiceName)
endpoint, err := endpointToThrift(a.Endpoint)
if err != nil {
return nil, err
}
Expand All @@ -192,7 +196,7 @@ func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
}

func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error) {
endpoint, err := endpointToThrift(ba.Endpoint.IPv4, ba.Endpoint.IPv6, ba.Endpoint.Port, ba.Endpoint.ServiceName)
endpoint, err := endpointToThrift(ba.Endpoint)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/zipkin/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestEndpointToThrift(t *testing.T) {
Port: 80,
IPv4: "127.0.0.1",
}
tEndpoint, err := endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
tEndpoint, err := endpointToThrift(endp)
require.NoError(t, err)
assert.Equal(t, "foo", tEndpoint.ServiceName)
assert.Equal(t, int16(80), tEndpoint.Port)
Expand All @@ -228,7 +228,7 @@ func TestEndpointToThrift(t *testing.T) {
Port: 80,
IPv4: "",
}
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
tEndpoint, err = endpointToThrift(endp)
require.NoError(t, err)
assert.Equal(t, "foo", tEndpoint.ServiceName)
assert.Equal(t, int16(80), tEndpoint.Port)
Expand All @@ -239,7 +239,7 @@ func TestEndpointToThrift(t *testing.T) {
Port: 80,
IPv4: "127.0.0.A",
}
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
tEndpoint, err = endpointToThrift(endp)
require.Error(t, err)
assert.Equal(t, errWrongIpv4, err)
assert.Nil(t, tEndpoint)
Expand All @@ -249,7 +249,7 @@ func TestEndpointToThrift(t *testing.T) {
Port: 80,
IPv6: "::R",
}
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
tEndpoint, err = endpointToThrift(endp)
require.Error(t, err)
assert.Equal(t, errWrongIpv6, err)
assert.Nil(t, tEndpoint)
Expand Down
104 changes: 69 additions & 35 deletions cmd/collector/app/zipkin/jsonv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package zipkin

import (
"fmt"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/swagger-gen/models"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
"github.com/opentracing/opentracing-go/ext"
)

func spansV2ToThrift(spans *models.ListOfSpans) ([]*zipkincore.Span, error) {
Expand All @@ -42,7 +42,6 @@ func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) {
if err != nil {
return nil, err
}

tSpan := &zipkincore.Span{
ID: int64(id),
TraceID: int64(traceID.Low),
Expand All @@ -61,68 +60,103 @@ func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) {
tSpan.ParentID = &signed
}

localE, err := endpointToThrift(string(s.LocalEndpoint.IPV4),
string(s.LocalEndpoint.IPV6),
int32(s.LocalEndpoint.Port),
s.LocalEndpoint.ServiceName)
if err != nil {
return nil, err
}
localE, err := endpointV2ToThrift(s.LocalEndpoint)

for _, a := range s.Annotations {
tA := annToThrift(a, *localE)
tA := annoV2ToThrift(a, *localE)
tSpan.Annotations = append(tSpan.Annotations, tA)
}

for k, v := range s.Tags {
ba := &zipkincore.BinaryAnnotation{
Key: k,
Value: []byte(v),
AnnotationType: zipkincore.AnnotationType_STRING,
Host: localE,
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, tagsToThrift(s.Tags, *localE)...)
tSpan.Annotations = append(tSpan.Annotations, kindToThrift(s, localE)...)

if s.RemoteEndpoint != nil {
if bAnno, err := rEndpToThrift(s.RemoteEndpoint, s.Kind); err != nil {
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, bAnno)
} else {
return nil, err
}
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, ba)
}
return tSpan, nil
}

if s.Kind == models.SpanKindCLIENT {
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
Value: zipkincore.CLIENT_SEND,
func rEndpToThrift(e *models.Endpoint, kind string) (*zipkincore.BinaryAnnotation, error) {
rEndp, err := endpointV2ToThrift(e)
if err != nil {
return nil, err
}

var key string
switch kind {
case models.SpanKindCLIENT:
key = zipkincore.SERVER_ADDR
case models.SpanKindSERVER:
key = zipkincore.CLIENT_ADDR
default:
key = string(ext.PeerAddress)
}

return &zipkincore.BinaryAnnotation{
Key: key,
Host: rEndp,
AnnotationType: zipkincore.AnnotationType_BOOL,
}, nil
}

func kindToThrift(s models.Span, localE *zipkincore.Endpoint) []*zipkincore.Annotation {
var annos []*zipkincore.Annotation
switch s.Kind {
case models.SpanKindCLIENT:
annos = append(annos, &zipkincore.Annotation{
Value: zipkincore.SERVER_RECV,
Host: localE,
Timestamp: s.Timestamp,
})
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
Value: zipkincore.CLIENT_RECV,
annos = append(annos, &zipkincore.Annotation{
Value: zipkincore.SERVER_SEND,
Host: localE,
Timestamp: s.Timestamp + s.Duration,
})
} else if s.Kind == models.SpanKindSERVER {
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
Value: zipkincore.SERVER_RECV,
case models.SpanKindSERVER:
annos = append(annos, &zipkincore.Annotation{
Value: zipkincore.CLIENT_SEND,
Host: localE,
Timestamp: s.Timestamp,
})
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
Value: zipkincore.SERVER_SEND,
annos = append(annos, &zipkincore.Annotation{
Value: zipkincore.CLIENT_RECV,
Host: localE,
Timestamp: s.Timestamp + s.Duration,
})
// TODO support for producer/consumer once idl supports it
}
// TODO consumer producer

// TODO
//s.RemoteEndpoint

fmt.Println("V2 span")
fmt.Println(tSpan)
return annos
}

return tSpan, nil
func endpointV2ToThrift(e *models.Endpoint) (*zipkincore.Endpoint, error) {
return eToThrift(string(e.IPV4), string(e.IPV6), int32(e.Port), e.ServiceName)
}

func annToThrift(a *models.Annotation, e zipkincore.Endpoint) *zipkincore.Annotation {
func annoV2ToThrift(a *models.Annotation, e zipkincore.Endpoint) *zipkincore.Annotation {
ta := &zipkincore.Annotation{
Value: a.Value,
Timestamp: a.Timestamp,
Host: &e,
}
return ta
}

func tagsToThrift(tags models.Tags, localE zipkincore.Endpoint) []*zipkincore.BinaryAnnotation {
var bAnnos []*zipkincore.BinaryAnnotation
for k, v := range tags {
ba := &zipkincore.BinaryAnnotation{
Key: k,
Value: []byte(v),
AnnotationType: zipkincore.AnnotationType_STRING,
Host: &localE,
}
bAnnos = append(bAnnos, ba)
}
return bAnnos
}

0 comments on commit 102d6cf

Please sign in to comment.