Skip to content

Commit

Permalink
At stop_consumer function pass additional config to AdminClient (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bohatman authored Aug 1, 2024
1 parent 53fb139 commit 4356830
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/ConfluentKafkaLibrary/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
server=server,
port=port,
**kwargs)

self.kwargs = kwargs
if not isinstance(topics, list):
topics = [topics]
self.consumer.subscribe_topic(self.group_id, topics=topics)
Expand Down Expand Up @@ -65,7 +65,7 @@ def stop_consumer(self):
self.join()
self.consumer.unsubscribe(self.group_id)
self.consumer.close_consumer(self.group_id)
admin_client = AdminClient({'bootstrap.servers': f'{self.server}:{self.port}'})
admin_client = AdminClient({'bootstrap.servers': f'{self.server}:{self.port}', **self.kwargs})
response = admin_client.delete_consumer_groups([self.group_id], request_timeout=10)
try:
response[self.group_id].result()
Expand Down

0 comments on commit 4356830

Please sign in to comment.