Skip to content

Commit

Permalink
Improve connection state logging for Jaeger exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling committed Jan 21, 2021
1 parent 96b226a commit 745f47c
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 12 deletions.
111 changes: 102 additions & 9 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ package jaegerexporter
import (
"context"
"fmt"
"sync"
"time"

jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
Expand All @@ -40,21 +45,23 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
return nil, err
}

client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
if err != nil {
return nil, err
}

collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
s := &protoGRPCSender{
logger: logger,
client: collectorServiceClient,
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
waitForReady: cfg.WaitForReady,
}

collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
s := newProtoGRPCSender(logger,
cfg.NameVal,
collectorServiceClient,
metadata.New(cfg.GRPCClientSettings.Headers),
cfg.WaitForReady,
conn,
)
exp, err := exporterhelper.NewTraceExporter(
cfg, logger, s.pushTraceData,
exporterhelper.WithStart(s.start),
exporterhelper.WithShutdown(s.shutdown),
exporterhelper.WithTimeout(cfg.TimeoutSettings),
exporterhelper.WithRetry(cfg.RetrySettings),
exporterhelper.WithQueue(cfg.QueueSettings),
Expand All @@ -66,10 +73,40 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
// protoGRPCSender pushes spans, encoded in the Jaeger proto format, to a
// gRPC server. It also polls the state of the underlying connection and
// hands every reported state to the registered callbacks.
type protoGRPCSender struct {
	// name is attached as the "exporter_name" tag when the connection
	// state is recorded.
	name         string
	logger       *zap.Logger
	client       jaegerproto.CollectorServiceClient
	metadata     metadata.MD
	waitForReady bool

	// conn is polled every connStateReporterInterval for its state.
	conn                      stateReporter
	connStateReporterInterval time.Duration
	// stateChangeCallbacks are invoked, in registration order, with each
	// reported state.
	stateChangeCallbacks []func(connectivity.State)

	// stopCh is closed by shutdown to end the connection status reporter.
	stopCh chan struct{}
	// stopped is set by shutdown and checked by the reporter on each tick;
	// both sides hold stopLock while touching it.
	stopped  bool
	stopLock sync.Mutex
}

// newProtoGRPCSender builds a protoGRPCSender around the given client and
// connection. The connection state is polled once per second, and the
// sender's own onStateChange handler is registered as the first state
// change callback.
func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender {
	sender := &protoGRPCSender{
		// Connection state reporting.
		conn:                      conn,
		connStateReporterInterval: time.Second,
		stopCh:                    make(chan struct{}),

		// Span forwarding.
		name:         name,
		logger:       logger,
		client:       cl,
		metadata:     md,
		waitForReady: waitForReady,
	}

	sender.AddStateChangeCallback(sender.onStateChange)
	return sender
}

// stateReporter exposes the current connectivity state of a connection.
// The sender depends only on this single method, so both the connection
// returned by grpc.Dial and a test mock can be supplied.
type stateReporter interface {
// GetState returns the current connectivity state.
GetState() connectivity.State
}

func (s *protoGRPCSender) pushTraceData(
Expand Down Expand Up @@ -100,3 +137,59 @@ func (s *protoGRPCSender) pushTraceData(

return 0, nil
}

// shutdown marks the sender as stopped and closes stopCh, which ends the
// connection status reporter. It is safe to call more than once: only the
// first call closes the channel, because closing an already closed channel
// panics. It always returns nil.
func (s *protoGRPCSender) shutdown(context.Context) error {
	s.stopLock.Lock()
	defer s.stopLock.Unlock()

	if s.stopped {
		// Already shut down; stopCh must not be closed a second time.
		return nil
	}

	s.stopped = true
	close(s.stopCh)
	return nil
}

// start launches the connection status reporter in its own goroutine and
// returns immediately. Neither the context nor the host is used, and the
// returned error is always nil.
func (s *protoGRPCSender) start(_ context.Context, _ component.Host) error {
	go s.startConnectionStatusReporter()
	return nil
}

// startConnectionStatusReporter reports the current connection state once,
// then polls it every connStateReporterInterval and propagates it to the
// registered callbacks whenever it differs from the last reported state.
// It blocks until the sender is marked as stopped or stopCh is closed, so
// it is meant to run in its own goroutine.
func (s *protoGRPCSender) startConnectionStatusReporter() {
	connState := s.conn.GetState()
	s.propagateStateChange(connState)

	ticker := time.NewTicker(s.connStateReporterInterval)
	// Stop the ticker on every return path so its resources are released.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.stopLock.Lock()
			if s.stopped {
				s.stopLock.Unlock()
				return
			}

			st := s.conn.GetState()
			if connState != st {
				// State has changed, report it. The callbacks run while
				// stopLock is held, so they must not call shutdown.
				connState = st
				s.propagateStateChange(st)
			}
			s.stopLock.Unlock()
		case <-s.stopCh:
			return
		}
	}
}

// propagateStateChange calls every registered state change callback with
// st, in the order the callbacks were added.
func (s *protoGRPCSender) propagateStateChange(st connectivity.State) {
	callbacks := s.stateChangeCallbacks
	for i := 0; i < len(callbacks); i++ {
		notify := callbacks[i]
		notify(st)
	}
}

// onStateChange is the callback registered by newProtoGRPCSender. It
// records the numeric value of st as the last connection state, tagged with
// the exporter name, and logs the state at info level.
func (s *protoGRPCSender) onStateChange(st connectivity.State) {
	nameKey := tag.MustNewKey("exporter_name")

	// NOTE(review): the error returned by tag.New is discarded here; confirm
	// that recording the measurement regardless is the intended behavior.
	mCtx, _ := tag.New(context.Background(), tag.Upsert(nameKey, s.name))

	measurement := mLastConnectionState.M(int64(st))
	stats.Record(mCtx, measurement)

	s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st))
}

// AddStateChangeCallback registers f to be called with each connection
// state the sender reports. Callbacks run in the order they were added.
//
// NOTE(review): the callback list is appended to without any locking, so
// callbacks presumably have to be added before the reporter is started.
func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) {
	s.stateChangeCallbacks = append(s.stateChangeCallbacks, f)
}
77 changes: 77 additions & 0 deletions exporter/jaegerexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ import (
"path"
"sync"
"testing"
"time"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -248,6 +251,80 @@ func TestMutualTLS(t *testing.T) {
assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID)
}

// TestConnectionStateChange checks that a change in the state returned by
// the connection is delivered to a registered state change callback.
func TestConnectionStateChange(t *testing.T) {
	var (
		observed connectivity.State
		pending  sync.WaitGroup
	)

	reporter := &mockStateReporter{state: connectivity.Connecting}
	sender := &protoGRPCSender{
		logger:                    zap.NewNop(),
		stopCh:                    make(chan struct{}),
		conn:                      reporter,
		connStateReporterInterval: 10 * time.Millisecond,
	}

	// The callback stores the reported state and releases the waiter.
	pending.Add(1)
	sender.AddStateChangeCallback(func(st connectivity.State) {
		observed = st
		pending.Done()
	})

	sender.start(context.Background(), componenttest.NewNopHost())
	defer sender.shutdown(context.Background())

	// The reporter always propagates the initial state first.
	pending.Wait()

	// Flip the state and wait for the reporter to notice the change.
	pending.Add(1)
	reporter.SetState(connectivity.Ready)
	pending.Wait()

	assert.Equal(t, connectivity.Ready, observed)
}

// TestConnectionReporterEndsOnStopped checks that the connection status
// reporter returns once the sender is marked as stopped, even though stopCh
// is never closed.
func TestConnectionReporterEndsOnStopped(t *testing.T) {
	sr := &mockStateReporter{
		state: connectivity.Connecting,
	}

	sender := &protoGRPCSender{
		logger:                    zap.NewNop(),
		stopCh:                    make(chan struct{}),
		conn:                      sr,
		connStateReporterInterval: 10 * time.Millisecond,
	}

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		sender.startConnectionStatusReporter()
		wg.Done()
	}()

	// The reporter goroutine reads stopped while holding stopLock, so the
	// flag has to be written under the same lock to avoid a data race.
	sender.stopLock.Lock()
	sender.stopped = true
	sender.stopLock.Unlock()

	// if the test finishes, we are good... if it gets blocked, the conn status reporter didn't return when the sender was marked as stopped
	wg.Wait()
}

// mockStateReporter is a stateReporter for tests: GetState returns whatever
// state was last stored, and SetState replaces it.
type mockStateReporter struct {
// state is the value returned by GetState; it is accessed under mu.
state connectivity.State
// mu is read-locked by GetState and write-locked by SetState.
mu sync.RWMutex
}

// GetState returns the currently stored state, read under the read lock.
func (m *mockStateReporter) GetState() connectivity.State {
	m.mu.RLock()
	defer m.mu.RUnlock()

	return m.state
}
// SetState replaces the stored state, written under the write lock.
func (m *mockStateReporter) SetState(st connectivity.State) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.state = st
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
server := grpc.NewServer(opts...)
lis, err := net.Listen("tcp", "localhost:0")
Expand Down
39 changes: 39 additions & 0 deletions exporter/jaegerexporter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaegerexporter

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
	// mLastConnectionState is the measure that carries the numeric value of
	// the connection state.
	mLastConnectionState = stats.Int64(
		"jaegerexporter_conn_state",
		"Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown",
		stats.UnitDimensionless,
	)

	// vLastConnectionState is a last-value view over mLastConnectionState,
	// tagged by exporter name. It reuses the measure's name and description.
	vLastConnectionState = &view.View{
		Name:        mLastConnectionState.Name(),
		Description: mLastConnectionState.Description(),
		Measure:     mLastConnectionState,
		TagKeys:     []tag.Key{tag.MustNewKey("exporter_name")},
		Aggregation: view.LastValue(),
	}
)

// MetricViews returns the metric views defined by this package, which is
// currently only the last connection state view.
func MetricViews() []*view.View {
	return []*view.View{
		vLastConnectionState,
	}
}
32 changes: 32 additions & 0 deletions exporter/jaegerexporter/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaegerexporter

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestProcessorMetrics checks that MetricViews returns the expected views,
// in the expected order, by comparing view names.
func TestProcessorMetrics(t *testing.T) {
	expectedViewNames := []string{
		"jaegerexporter_conn_state",
	}

	views := MetricViews()
	// Fail with a clear message instead of panicking on an out-of-range
	// index when fewer views than expected are returned.
	if len(views) < len(expectedViewNames) {
		t.Fatalf("expected at least %d views, got %d", len(expectedViewNames), len(views))
	}

	for i, viewName := range expectedViewNames {
		assert.Equal(t, viewName, views[i].Name)
	}
}
12 changes: 9 additions & 3 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/jaegerexporter"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -62,12 +63,17 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
}

var views []*view.View
views = append(views, obsreport.Configure(level)...)
views = append(views, processor.MetricViews()...)
views = append(views, batchprocessor.MetricViews()...)
views = append(views, fluentobserv.MetricViews()...)
views = append(views, jaegerexporter.MetricViews()...)
views = append(views, kafkareceiver.MetricViews()...)
views = append(views, obsreport.Configure(level)...)
views = append(views, obsreport.Configure(level)...)
views = append(views, processMetricsViews.Views()...)
views = append(views, fluentobserv.MetricViews()...)
views = append(views, processor.MetricViews()...)
views = append(views, processor.MetricViews()...)
views = append(views, queuedprocessor.MetricViews()...)

tel.views = views
if err = view.Register(views...); err != nil {
return err
Expand Down

0 comments on commit 745f47c

Please sign in to comment.