Skip to content

Commit

Permalink
fix kafka admin error (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
JKL98ISR authored Apr 17, 2023
1 parent 267d3c2 commit b1efd86
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def run(self, task: 'Task', session: AsyncSession, resources_provider: Res
if self.kafka_admin is None:
with self.lock:
if self.kafka_admin is None:
self.kafka_admin = AIOKafkaAdminClient(**resources_provider.kafka_settings.kafka_params)
self.kafka_admin = AIOKafkaAdminClient(**resources_provider.kafka_settings.kafka_admin_params)
await self.kafka_admin.start()

# Backward compatibility, remove in next release and replace with:
Expand Down
10 changes: 10 additions & 0 deletions backend/deepchecks_monitoring/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ def kafka_params(self):
'metadata_max_age_ms': self.kafka_max_metadata_age
}

@property
def kafka_admin_params(self):
"""Get connection parameters for kafka admin."""
return {
'bootstrap_servers': self.kafka_host,
'security_protocol': self.kafka_security_protocol,
'ssl_context': create_ssl_context(),
'metadata_max_age_ms': self.kafka_max_metadata_age
}


class DatabaseSettings(BaseDeepchecksSettings):
"""Database settings."""
Expand Down

0 comments on commit b1efd86

Please sign in to comment.