
Log and report kafka metrics after producing them #155

Merged
merged 1 commit into main from report_kafka_metrics_after_delivery on Apr 11, 2024

Conversation

agbpatro (Collaborator)

Description

Currently we report Kafka metrics on .Produce(). However, there are scenarios where Kafka delivery fails and we don't record those failures. This change reports metrics based on producer delivery events instead.
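In other words (a minimal sketch, assuming the confluent-kafka-go client; names and wiring here are illustrative, not the merged code): delivery failures only surface as events on the producer's Events() channel, so that is where both failures and confirmed deliveries can be counted.

```go
// Minimal sketch: consume delivery reports instead of assuming Produce() success.
// Illustrative only; the import path may be the v1 module depending on go.mod.
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func handleDeliveryReports(p *kafka.Producer) {
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				// Delivery failed after Produce() returned nil: counting metrics
				// at Produce() time would have missed this.
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
				continue
			}
			// Delivery confirmed by the broker: safe to report "ack" metrics here.
		case kafka.Error:
			// Client/producer-level error (e.g. all brokers down).
			log.Printf("producer error: %v", ev)
		}
	}
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()
	go handleDeliveryReports(p)
	// ... Produce() calls would go here ...
}
```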

Type of change

Please select all options that apply to this change:

  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Bug fix (non-breaking change which fixes an issue)
  • Documentation update

Checklist:

Confirm you have completed the following steps:

  • My code follows the style of this project.
  • I have performed a self-review of my code.
  • I have made corresponding updates to the documentation.
  • I have added/updated unit tests to cover my changes.
  • I have added/updated integration tests to cover my changes.

@agbpatro agbpatro force-pushed the report_kafka_metrics_after_delivery branch 3 times, most recently from ad01a81 to 6eeab9c Compare April 10, 2024 22:47
@@ -74,21 +72,17 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Key: []byte(entry.Vin),
Headers: headersFromRecord(entry),
Timestamp: time.Now(),
Opaque: entry,
}

// Note: confluent kafka supports the concept of one channel per connection, so we could add those here and get rid of reliableAckWorkers
// ex.: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_custom_channel_example/producer_custom_channel_example.go#L79
Collaborator:

I wonder if there's a better place for this comment now?

Collaborator (author):

Hmm, I would prefer to keep it at this location since we will update how p.kafkaProducer.Produce(msg, nil) works; eventually it will be powered through eventDeliveryChannel.
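For context on that: in confluent-kafka-go, the second argument to Produce is an optional per-message delivery channel, so the eventual shape could look roughly like this (eventDeliveryChannel is the name used in the comment above, not code in this PR):

```go
// Today: nil delivery channel, so delivery reports arrive on the shared
// p.kafkaProducer.Events() channel that handleProducerEvents reads.
_ = p.kafkaProducer.Produce(msg, nil)

// Eventually (sketch): a dedicated channel read by the ack/metrics worker,
// which is what the note about "one channel per connection" refers to.
eventDeliveryChannel := make(chan kafka.Event, 1000)
_ = p.kafkaProducer.Produce(msg, eventDeliveryChannel)
```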

entry.ProduceTime = time.Now()
if err := p.kafkaProducer.Produce(msg, nil); err != nil {
p.logError(err)
return
}

metricsRegistry.produceCount.Inc(map[string]string{"record_type": entry.TxType})
metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
Collaborator:

How about we keep these metrics and add the new ones in the producer-events handler (with different names), so we can distinguish between things we believe we produced and things that got confirmed.

Collaborator (author):

updated
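To make the distinction concrete, a sketch using the counter names from this diff (the helper functions are hypothetical, for illustration only):

```go
// Sketch: "produced" metrics are bumped when the record is handed to the Kafka
// client; "ack" metrics are bumped only when the delivery report confirms it.
// reportProduced/reportAcked are hypothetical helpers, not code in this PR.
func (p *Producer) reportProduced(entry *telemetry.Record) {
	labels := map[string]string{"record_type": entry.TxType}
	metricsRegistry.produceCount.Inc(labels)
	metricsRegistry.byteTotal.Add(int64(entry.Length()), labels)
}

func (p *Producer) reportAcked(entry *telemetry.Record) {
	labels := map[string]string{"record_type": entry.TxType}
	metricsRegistry.produceAckCount.Inc(labels)
	metricsRegistry.bytesAckTotal.Add(int64(entry.Length()), labels)
}
```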

@@ -113,9 +107,14 @@ func (p *Producer) handleProducerEvents(ackChan chan (*telemetry.Record)) {
case kafka.Error:
p.logError(fmt.Errorf("producer_error %v", ev))
Collaborator:

We should probably add a metric here to count the Kafka errors as seen by the producer. Will help see problems at a glance.

Collaborator (author):

logError already increments the error metrics:

metricsRegistry.errorCount.Inc(map[string]string{})
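That is, the helper is assumed to look roughly like this (only the errorCount line appears in this PR; the logging call is a placeholder):

```go
// Assumed shape of the existing helper: logging a producer error also bumps
// the error counter, so the kafka.Error branch needs no separate metric.
func (p *Producer) logError(err error) {
	log.Printf("kafka_err err=%v", err) // placeholder for the real logging call
	metricsRegistry.errorCount.Inc(map[string]string{})
}
```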

@agbpatro agbpatro force-pushed the report_kafka_metrics_after_delivery branch from 6eeab9c to 3a9954a Compare April 10, 2024 22:56
@vmallet vmallet (Collaborator) left a comment:

Hopefully we address the reliableAck situation sooner rather than later, because as it is the code feels a bit contorted: reliableAck is not really used, except a little for the metrics / logging part.

Name: "kafka_produce_total_bytes",
Help: "The number of bytes produced to Kafka.",
Labels: []string{"record_type"},
})

metricsRegistry.produceAckCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "kafka_produce_ack_total",
Help: "The number of records produced to Kafka.",
Collaborator:

... for which we got an ACK?


metricsRegistry.bytesAckTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
Name: "kafka_produce_ack_total_bytes",
Help: "The number of bytes produced to Kafka.",
Collaborator:

... for which we got an ACK?
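For concreteness, the two ack counters with the Help wording the reviewer is asking for could read roughly as follows (suggested wording only; the Labels on these counters are assumed to match the non-ack ones):

```go
// Suggested (not merged) Help wording distinguishing the acked metrics from
// the plain produce metrics registered above.
metricsRegistry.produceAckCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{
	Name:   "kafka_produce_ack_total",
	Help:   "The number of records produced to Kafka for which an ACK was received.",
	Labels: []string{"record_type"},
})

metricsRegistry.bytesAckTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{
	Name:   "kafka_produce_ack_total_bytes",
	Help:   "The number of bytes produced to Kafka for which an ACK was received.",
	Labels: []string{"record_type"},
})
```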

@agbpatro agbpatro force-pushed the report_kafka_metrics_after_delivery branch from 3a9954a to 0575a62 Compare April 11, 2024 00:13
Currently we report on kafka metrics on .Produce().
However, there are scenarios when kafka delivery fails and
we don't record those failures.
Handling reporting of metrics based on producer events instead
@agbpatro agbpatro force-pushed the report_kafka_metrics_after_delivery branch from 0575a62 to cb6afce Compare April 11, 2024 00:13
@agbpatro agbpatro merged commit 86315ff into main Apr 11, 2024
5 checks passed
@agbpatro agbpatro deleted the report_kafka_metrics_after_delivery branch April 11, 2024 00:27