Skip to content

Commit

Permalink
Allow Kafka producer headers to be dict or list (#1655)
Browse files Browse the repository at this point in the history
* Allow Kafka producer headers to be dict or list

* modify kafka context getter helper methods to work on dict and list

---------

Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com>
Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
  • Loading branch information
3 people authored and tsloughter committed Mar 13, 2023
1 parent 7a5d3f9 commit 5ad31c1
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 12 deletions.
17 changes: 8 additions & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add metrics instrumentation for sqlalchemy
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))

- Fix exception in Urllib3 when dealing with filelike body.
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))

- Fix httpx resource warnings
([#1695](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1695))

### Added

- Add connection attributes to sqlalchemy connect span
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))
- Add support for enabling Redis sanitization from environment variable
([#1690](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1690))
- Add metrics instrumentation for sqlalchemy
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))

### Fixed

- Fix Flask instrumentation to only close the span if it was created by the same thread.
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
- `opentelemetry-instrumentation-system-metrics` Fix initialization of the instrumentation class when configuration is provided
([#1438](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1439))
- Fix exception in Urllib3 when dealing with filelike body.
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))
- Fix httpx resource warnings
([#1695](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1695))

## Version 1.16.0/0.37b0 (2023-02-17)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,26 @@ class KafkaContextGetter(textmap.Getter):
def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
if carrier is None:
return None
for item_key, value in carrier:

carrier_items = carrier
if isinstance(carrier, dict):
carrier_items = carrier.items()

for item_key, value in carrier_items:
if item_key == key:
if value is not None:
return [value.decode()]

return None

def keys(self, carrier: textmap.CarrierT) -> List[str]:
if carrier is None:
return []
return [key for (key, value) in carrier]

carrier_items = carrier
if isinstance(carrier, dict):
carrier_items = carrier.items()
return [key for (key, value) in carrier_items]


class KafkaContextSetter(textmap.Setter):
Expand All @@ -60,7 +70,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:

if value:
value = value.encode()
carrier.append((key, value))

if isinstance(carrier, list):
carrier.append((key, value))

if isinstance(carrier, dict):
carrier[key] = value


_kafka_getter = KafkaContextGetter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
ProxiedConsumer,
ProxiedProducer,
)
from opentelemetry.instrumentation.confluent_kafka.utils import (
KafkaContextGetter,
KafkaContextSetter,
)


class TestConfluentKafka(TestCase):
Expand Down Expand Up @@ -73,3 +77,30 @@ def test_consumer_commit_method_exists(self) -> None:
consumer = instrumentation.instrument_consumer(consumer)
self.assertEqual(consumer.__class__, ProxiedConsumer)
self.assertTrue(hasattr(consumer, "commit"))

def test_context_setter(self) -> None:
context_setter = KafkaContextSetter()

carrier_dict = {"key1": "val1"}
context_setter.set(carrier_dict, "key2", "val2")
self.assertGreaterEqual(
carrier_dict.items(), {"key2": "val2".encode()}.items()
)

carrier_list = [("key1", "val1")]
context_setter.set(carrier_list, "key2", "val2")
self.assertTrue(("key2", "val2".encode()) in carrier_list)

def test_context_getter(self) -> None:
context_setter = KafkaContextSetter()
context_getter = KafkaContextGetter()

carrier_dict = {}
context_setter.set(carrier_dict, "key1", "val1")
self.assertEqual(context_getter.get(carrier_dict, "key1"), ["val1"])
self.assertEqual(["key1"], context_getter.keys(carrier_dict))

carrier_list = []
context_setter.set(carrier_list, "key1", "val1")
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
self.assertEqual(["key1"], context_getter.keys(carrier_list))

0 comments on commit 5ad31c1

Please sign in to comment.