Skip to content

Commit

Permalink
fix kafka: wait for metadata
Browse files Browse the repository at this point in the history
Kafka's instance metadata could be unavailable (because it's being filled asynchronously). extract_send_partition() is based on a metadata, so it may return `None` for partition and later cause all type of warning messages (e.g. `Invalid type NoneType for attribute value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types`).
The proposed fix makes sure metadata is pre-populated (based on https://github.com/dpkp/kafka-python/blob/4d598055dab7da99e41bfcceffa8462b32931cdd/kafka/producer/kafka.py#L579).
I'm just not sure if we should wrap `_wait_on_metadata` into try\except, maybe just passing Exception to the caller would be a better idea...
  • Loading branch information
rayrapetyan committed Sep 1, 2022
1 parent 8107ad4 commit c5a026f
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from logging import getLogger
from typing import Callable, Dict, List, Optional

from kafka.errors import errors as KafkaErrors
from kafka.record.abc import ABCRecord

from opentelemetry import context, propagate, trace
Expand Down Expand Up @@ -146,6 +147,10 @@ def _traced_send(func, instance, args, kwargs):
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
try:
instance._wait_on_metadata(topic, instance.config['max_block_ms'] / 1000.0)
except KafkaErrors.BrokerResponseError as kafka_exception:
_LOG.exception(kafka_exception)
bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(
instance
)
Expand Down

0 comments on commit c5a026f

Please sign in to comment.