Skip to content

Commit

Permalink
add metrics to kafkareceiver (#3452)
Browse files Browse the repository at this point in the history
  • Loading branch information
Frefreak authored Jun 16, 2021
1 parent 9d5a51e commit 46ed756
Show file tree
Hide file tree
Showing 8 changed files with 468 additions and 6 deletions.
34 changes: 30 additions & 4 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ func WithTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOpti
}
}

// WithMetricsUnmarshalers adds MetricsUnmarshalers.
func WithMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for _, unmarshaler := range metricsUnmarshalers {
factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
}
}
}

// WithLogsUnmarshalers adds LogsUnmarshalers.
func WithLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
return func(factory *kafkaReceiverFactory) {
Expand All @@ -66,8 +75,9 @@ func WithLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
// NewFactory creates Kafka receiver factory.
func NewFactory(options ...FactoryOption) component.ReceiverFactory {
f := &kafkaReceiverFactory{
tracesUnmarshalers: defaultTracesUnmarshalers(),
logsUnmarshalers: defaultLogsUnmarshalers(),
tracesUnmarshalers: defaultTracesUnmarshalers(),
metricsUnmarshalers: defaultMetricsUnmarshalers(),
logsUnmarshalers: defaultLogsUnmarshalers(),
}
for _, o := range options {
o(f)
Expand All @@ -76,6 +86,7 @@ func NewFactory(options ...FactoryOption) component.ReceiverFactory {
typeStr,
createDefaultConfig,
receiverhelper.WithTraces(f.createTracesReceiver),
receiverhelper.WithMetrics(f.createMetricsReceiver),
receiverhelper.WithLogs(f.createLogsReceiver),
)
}
Expand All @@ -99,8 +110,9 @@ func createDefaultConfig() config.Receiver {
}

type kafkaReceiverFactory struct {
tracesUnmarshalers map[string]TracesUnmarshaler
logsUnmarshalers map[string]LogsUnmarshaler
tracesUnmarshalers map[string]TracesUnmarshaler
metricsUnmarshalers map[string]MetricsUnmarshaler
logsUnmarshalers map[string]LogsUnmarshaler
}

func (f *kafkaReceiverFactory) createTracesReceiver(
Expand All @@ -117,6 +129,20 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
return r, nil
}

func (f *kafkaReceiverFactory) createMetricsReceiver(
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
nextConsumer consumer.Metrics,
) (component.MetricsReceiver, error) {
c := cfg.(*Config)
r, err := newMetricsReceiver(*c, set, f.metricsUnmarshalers, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}

func (f *kafkaReceiverFactory) createLogsReceiver(
_ context.Context,
set component.ReceiverCreateSettings,
Expand Down
55 changes: 55 additions & 0 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,50 @@ func TestWithTracesUnmarshalers(t *testing.T) {
})
}

func TestCreateMetricsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()}
r, err := f.createMetricsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsReceiver_error(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.ProtocolVersion = "2.0.0"
// disable contacting broker at startup
cfg.Metadata.Full = false
f := kafkaReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()}
r, err := f.createMetricsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestWithMetricsUnmarshalers(t *testing.T) {
unmarshaler := &customMetricsUnmarshaler{}
f := NewFactory(WithMetricsUnmarshalers(unmarshaler))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
cfg.ProtocolVersion = "2.0.0"

t.Run("custom_encoding", func(t *testing.T) {
cfg.Encoding = unmarshaler.Encoding()
receiver, err := f.CreateMetricsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil)
require.NoError(t, err)
require.NotNil(t, receiver)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = defaultEncoding
receiver, err := f.CreateMetricsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, nil)
require.NoError(t, err)
assert.NotNil(t, receiver)
})
}

func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
Expand Down Expand Up @@ -127,6 +171,9 @@ func TestWithLogsUnmarshalers(t *testing.T) {
type customTracesUnmarshaler struct {
}

type customMetricsUnmarshaler struct {
}

type customLogsUnmarshaler struct {
}

Expand All @@ -140,6 +187,14 @@ func (c customTracesUnmarshaler) Encoding() string {
return "custom"
}

func (c customMetricsUnmarshaler) Unmarshal([]byte) (pdata.Metrics, error) {
panic("implement me")
}

func (c customMetricsUnmarshaler) Encoding() string {
return "custom"
}

func (c customLogsUnmarshaler) Unmarshal([]byte) (pdata.Logs, error) {
panic("implement me")
}
Expand Down
145 changes: 145 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ type kafkaTracesConsumer struct {
logger *zap.Logger
}

// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
type kafkaMetricsConsumer struct {
id config.ComponentID
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Metrics
topics []string
cancelConsumeLoop context.CancelFunc
unmarshaler MetricsUnmarshaler

logger *zap.Logger
}

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
type kafkaLogsConsumer struct {
id config.ComponentID
Expand All @@ -62,6 +74,7 @@ type kafkaLogsConsumer struct {
}

var _ component.Receiver = (*kafkaTracesConsumer)(nil)
var _ component.Receiver = (*kafkaMetricsConsumer)(nil)
var _ component.Receiver = (*kafkaLogsConsumer)(nil)

func newTracesReceiver(config Config, set component.ReceiverCreateSettings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
Expand Down Expand Up @@ -136,6 +149,77 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
return c.consumerGroup.Close()
}

func newMetricsReceiver(config Config, set component.ReceiverCreateSettings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) {
unmarshaler := unmarshalers[config.Encoding]
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
if err != nil {
return nil, err
}
return &kafkaMetricsConsumer{
id: config.ID(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
logger: set.Logger,
}, nil
}

func (c *kafkaMetricsConsumer) Start(context.Context, component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
metricsConsumerGroup := &metricsConsumerGroupHandler{
id: c.id,
logger: c.logger,
unmarshaler: c.unmarshaler,
nextConsumer: c.nextConsumer,
ready: make(chan bool),
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: c.id, Transport: transport}),
}
go c.consumeLoop(ctx, metricsConsumerGroup)
<-metricsConsumerGroup.ready
return nil
}

func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
c.logger.Info("Consumer stopped", zap.Error(ctx.Err()))
return ctx.Err()
}
}
}
func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
c.cancelConsumeLoop()
return c.consumerGroup.Close()
}

func newLogsReceiver(config Config, set component.ReceiverCreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
unmarshaler := unmarshalers[config.Encoding]
if unmarshaler == nil {
Expand Down Expand Up @@ -220,6 +304,18 @@ type tracesConsumerGroupHandler struct {
obsrecv *obsreport.Receiver
}

type metricsConsumerGroupHandler struct {
id config.ComponentID
unmarshaler MetricsUnmarshaler
nextConsumer consumer.Metrics
ready chan bool
readyCloser sync.Once

logger *zap.Logger

obsrecv *obsreport.Receiver
}

type logsConsumerGroupHandler struct {
id config.ComponentID
unmarshaler LogsUnmarshaler
Expand All @@ -233,6 +329,7 @@ type logsConsumerGroupHandler struct {
}

var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*metricsConsumerGroupHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*logsConsumerGroupHandler)(nil)

func (c *tracesConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
Expand Down Expand Up @@ -283,6 +380,54 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
return nil
}

func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
})
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.Name())}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionStart.M(1))
return nil
}

func (c *metricsConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.Name())}
_ = stats.RecordWithTags(session.Context(), statsTags, statPartitionClose.M(1))
return nil
}

func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
for message := range claim.Messages() {
c.logger.Debug("Kafka message claimed",
zap.String("value", string(message.Value)),
zap.Time("timestamp", message.Timestamp),
zap.String("topic", message.Topic))
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = c.obsrecv.StartMetricsOp(ctx)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))

metrics, err := c.unmarshaler.Unmarshal(message.Value)
if err != nil {
c.logger.Error("failed to unmarshal message", zap.Error(err))
return err
}

metricCount := metrics.MetricCount()
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), metricCount, err)
if err != nil {
return err
}
}
return nil
}

func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
Expand Down
Loading

0 comments on commit 46ed756

Please sign in to comment.