Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

KafkaChannel Tracing #1155

Merged
merged 11 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/xanzy/go-gitlab v0.28.0
gitlab.com/flimzy/testy v0.2.1 // indirect
go.opencensus.io v0.22.4
go.opencensus.io v0.22.5-0.20200716030834-3456e1d174b2
go.opentelemetry.io/otel v0.4.2 // indirect
go.uber.org/zap v1.14.1
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,8 @@ go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5-0.20200716030834-3456e1d174b2 h1:p7zhKkxd+cS4tMYPy/HkKVczBa28t8TU+sUh7GJ/Ge4=
go.opencensus.io v0.22.5-0.20200716030834-3456e1d174b2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v0.2.3/go.mod h1:OgNpQOjrlt33Ew6Ds0mGjmcTQg/rhUctsbkRdk/g1fw=
go.opentelemetry.io/otel v0.4.2 h1:nT+GOqqRR1cIY92xmo1DeiXLHtIlXH1KLRgnsnhuNrs=
go.opentelemetry.io/otel v0.4.2/go.mod h1:OgNpQOjrlt33Ew6Ds0mGjmcTQg/rhUctsbkRdk/g1fw=
Expand Down
19 changes: 18 additions & 1 deletion kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
Expand Down Expand Up @@ -91,6 +92,8 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
return err
}

kafkaProducerMessage.Headers = append(kafkaProducerMessage.Headers, serializeTrace(trace.FromContext(ctx).SpanContext())...)

dispatcher.kafkaAsyncProducer.Input() <- &kafkaProducerMessage
return nil
},
Expand Down Expand Up @@ -151,7 +154,11 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
zap.String("topic", consumerMessage.Topic),
zap.String("sub", string(c.sub.UID)),
)
err := c.dispatcher.DispatchMessage(context.Background(), message, nil, destination, reply, deadLetter)

ctx, span := startTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

err := c.dispatcher.DispatchMessage(ctx, message, nil, destination, reply, deadLetter)
// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
}
Expand Down Expand Up @@ -365,3 +372,13 @@ func newSubscription(spec eventingduck.SubscriberSpec, name string, namespace st
Namespace: namespace,
}
}

func startTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *protocolkafka.Message, topic string) (context.Context, *trace.Span) {
sc, ok := parseSpanContext(message.Headers)
if !ok {
logger.Warn("Cannot parse the spancontext, creating a new span")
return trace.StartSpan(inCtx, "kafkachannel-"+topic)
}

return trace.StartSpanWithRemoteParent(inCtx, "kafkachannel-"+topic, sc)
}
9 changes: 8 additions & 1 deletion kafka/channel/pkg/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

tracingconfig "knative.dev/pkg/tracing/config"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
)

Expand All @@ -58,7 +60,12 @@ func TestDispatcher(t *testing.T) {
t.Fatal(err)
}

tracing.SetupStaticPublishing(logger.Sugar(), "localhost", tracing.AlwaysSample)
tracing.SetupStaticPublishing(logger.Sugar(), "localhost", &tracingconfig.Config{
Backend: tracingconfig.Zipkin,
Debug: true,
SampleRate: 1.0,
ZipkinEndpoint: "http://localhost:9411/api/v2/spans",
})

dispatcherArgs := KafkaDispatcherArgs{
KnCEConnectionArgs: nil,
Expand Down
64 changes: 64 additions & 0 deletions kafka/channel/pkg/dispatcher/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatcher

import (
"github.com/Shopify/sarama"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"
)

const (
traceParentHeader = "traceparent"
traceStateHeader = "tracestate"
)

var format = &tracecontext.HTTPFormat{}

func serializeTrace(spanContext trace.SpanContext) []sarama.RecordHeader {
tp, ts := format.SpanContextToHeaders(spanContext)

if ts != "" {
return []sarama.RecordHeader{{
Key: []byte(traceParentHeader),
Value: []byte(tp),
}, {
Key: []byte(traceStateHeader),
Value: []byte(ts),
}}
}

return []sarama.RecordHeader{{
Key: []byte(traceParentHeader),
Value: []byte(tp),
}}
}

func parseSpanContext(headers map[string][]byte) (sc trace.SpanContext, ok bool) {
tpBytes, ok := headers[traceParentHeader]
if !ok {
return trace.SpanContext{}, false
}
tp := string(tpBytes)

ts := ""
if tsBytes, ok := headers[traceStateHeader]; ok {
ts = string(tsBytes)
}

return format.SpanContextFromHeaders(tp, ts)
}
73 changes: 73 additions & 0 deletions kafka/channel/pkg/dispatcher/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatcher

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/trace"
"go.opencensus.io/trace/tracestate"
)

var (
traceID = trace.TraceID{75, 249, 47, 53, 119, 179, 77, 166, 163, 206, 146, 157, 14, 14, 71, 54}
spanID = trace.SpanID{0, 240, 103, 170, 11, 169, 2, 183}
traceOpt = trace.TraceOptions(1)
entry1 = tracestate.Entry{Key: "foo", Value: "bar"}
entry2 = tracestate.Entry{Key: "hello", Value: "world example"}
sampleTracestate, _ = tracestate.New(nil, entry1, entry2)
sampleSpanContext = trace.SpanContext{
TraceID: traceID,
SpanID: spanID,
TraceOptions: traceOpt,
Tracestate: sampleTracestate,
}
)

func TestRoundtripWithNewSpan(t *testing.T) {
_, span := trace.StartSpan(context.TODO(), "aaa", trace.WithSpanKind(trace.SpanKindClient))
span.AddAttributes(trace.BoolAttribute("hello", true))
inSpanContext := span.SpanContext()

serializedHeaders := serializeTrace(inSpanContext)

// Translate back to headers
headers := make(map[string][]byte)
for _, h := range serializedHeaders {
headers[string(h.Key)] = h.Value
}

outSpanContext, ok := parseSpanContext(headers)
require.True(t, ok)
require.Equal(t, inSpanContext, outSpanContext)
}

func TestRoundtrip(t *testing.T) {
serializedHeaders := serializeTrace(sampleSpanContext)

// Translate back to headers
headers := make(map[string][]byte)
for _, h := range serializedHeaders {
headers[string(h.Key)] = h.Value
}

outSpanContext, ok := parseSpanContext(headers)
require.True(t, ok)
require.Equal(t, sampleSpanContext, outSpanContext)
}
9 changes: 7 additions & 2 deletions kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"

"go.uber.org/zap"
"knative.dev/eventing/pkg/tracing"

corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -93,10 +94,14 @@ var _ controller.Reconciler = (*Reconciler)(nil)

// NewController initializes the controller and is called by the generated code.
// Registers event handlers to enqueue events.
func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl {

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)

err := tracing.SetupDynamicPublishing(logger.Sugar(), cmw.(*configmap.InformedWatcher), "kafka-ch-dispatcher", "config-tracing")
if err != nil {
logger.Fatal("unable to setup tracing", zap.Error(err))
}

configMap, err := configmap.Load("/etc/config-kafka")
if err != nil {
logger.Fatal("error loading configuration", zap.Error(err))
Expand Down
24 changes: 24 additions & 0 deletions test/config/config-tracing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-tracing
namespace: knative-eventing
data:
backend: "zipkin"
zipkin-endpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"
debug: "true"
sample-rate: "1.0"
52 changes: 52 additions & 0 deletions test/conformance/channel_tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// +build e2e

/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package conformance

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/test/conformance/helpers"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"

contribtest "knative.dev/eventing-contrib/test"
)

func TestChannelTracingWithReply(t *testing.T) {
// Enable this test only for Kafka
helpers.ChannelTracingTestHelperWithChannelTestRunner(t, testlib.ComponentsTestRunner{
ComponentFeatureMap: map[metav1.TypeMeta][]testlib.Feature{
{
APIVersion: resources.MessagingAPIVersion,
Kind: contribtest.KafkaChannelKind,
}: {
testlib.FeatureBasic,
testlib.FeatureRedelivery,
testlib.FeaturePersistence,
},
},
ComponentsToTest: []metav1.TypeMeta{
{
APIVersion: resources.MessagingAPIVersion,
Kind: contribtest.KafkaChannelKind,
},
},
}, testlib.SetupClientOptionNoop)
}
9 changes: 9 additions & 0 deletions test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ readonly VENDOR_EVENTING_TEST_IMAGES="vendor/knative.dev/eventing/test/test_imag
# HEAD eventing test images.
readonly HEAD_EVENTING_TEST_IMAGES="${GOPATH}/src/knative.dev/eventing/test/test_images/"

# Config tracing config.
readonly CONFIG_TRACING_CONFIG="test/config/config-tracing.yaml"

# NATS Streaming installation config.
readonly NATSS_INSTALLATION_CONFIG="natss/config/broker/natss.yaml"
# NATSS channel CRD config directory.
Expand Down Expand Up @@ -90,6 +93,9 @@ function knative_setup() {
fi
wait_until_pods_running knative-eventing || fail_test "Knative Eventing did not come up"

# Setup config tracing for tracing tests
kubectl replace -f $CONFIG_TRACING_CONFIG

# TODO install head if !is_release_branch
echo "Installing Knative Monitoring"
# Hack hack hack. Why is this namespace not created as part of monitoring release.
Expand Down Expand Up @@ -152,6 +158,9 @@ function test_setup() {
sed -i 's@knative.dev/eventing/test/test_images@knative.dev/eventing-contrib/vendor/knative.dev/eventing/test/test_images@g' "${VENDOR_EVENTING_TEST_IMAGES}"*/*.yaml
$(dirname $0)/upload-test-images.sh ${VENDOR_EVENTING_TEST_IMAGES} e2e || fail_test "Error uploading test images"
$(dirname $0)/upload-test-images.sh "test/test_images" e2e || fail_test "Error uploading test images"

# Setup config tracing for tracing tests
kubectl replace -f $CONFIG_TRACING_CONFIG
}

function test_teardown() {
Expand Down
2 changes: 2 additions & 0 deletions vendor/go.opencensus.io/plugin/ocgrpc/client_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions vendor/go.opencensus.io/plugin/ocgrpc/server_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading