Skip to content

Commit

Permalink
Do not set opaque object if reliable ack workers are not set. This fi…
Browse files Browse the repository at this point in the history
…xes a potential memory leak issue (#153)

Kafka allows you to send opaque entry which are used for post processing flows. we should only set that field if reliable ack workers are configured
  • Loading branch information
agbpatro committed Apr 9, 2024
1 parent 0431c0f commit 14e2c3d
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion datastore/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Producer struct {
metricsCollector metrics.MetricCollector
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
reliableAck bool
}

// Metrics stores metrics reported from this package
Expand Down Expand Up @@ -53,6 +54,7 @@ func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers i
prometheusEnabled: prometheusEnabled,
logger: logger,
airbrakeHandler: airbrakeHandler,
reliableAck: reliableAckWorkers > 0,
}

for i := 0; i < reliableAckWorkers; i++ {
Expand All @@ -72,11 +74,13 @@ func (p *Producer) Produce(entry *telemetry.Record) {
Key: []byte(entry.Vin),
Headers: headersFromRecord(entry),
Timestamp: time.Now(),
Opaque: entry,
}

// Note: confluent kafka supports the concept of one channel per connection, so we could add those here and get rid of reliableAckWorkers
// ex.: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_custom_channel_example/producer_custom_channel_example.go#L79
if p.reliableAck {
msg.Opaque = entry
}
entry.ProduceTime = time.Now()
if err := p.kafkaProducer.Produce(msg, nil); err != nil {
p.logError(err)
Expand Down

0 comments on commit 14e2c3d

Please sign in to comment.