diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a883efe..949afc7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,15 @@ Unreleased * +[0.3.0] - 2022-08-10 +~~~~~~~~~~~~~~~~~~~~ + +Updated +_______ + +* Moved configuration onto separate file. +* Updated configuration settings to have EVENT_BUS_KAFKA prefix. + [0.2.0] - 2022-08-09 ~~~~~~~~~~~~~~~~~~~~ diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index 63623c6..69e9158 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -2,4 +2,4 @@ Kafka implementation for Open edX event bus. """ -__version__ = '0.2.0' +__version__ = '0.3.0' diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py new file mode 100644 index 0000000..c2fed42 --- /dev/null +++ b/edx_event_bus_kafka/config.py @@ -0,0 +1,54 @@ +""" +Configuration loading and validation. +""" + +import warnings +from typing import Optional + +from confluent_kafka.schema_registry import SchemaRegistryClient +from django.conf import settings + + +def create_schema_registry_client() -> Optional[SchemaRegistryClient]: + """ + Create a schema registry client from common settings. + """ + url = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL', None) + if url is None: + warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL") + return None + + key = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY', '') + secret = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET', '') + + return SchemaRegistryClient({ + 'url': url, + 'basic.auth.user.info': f"{key}:{secret}", + }) + + +def load_common_settings() -> Optional[dict]: + """ + Load common settings, a base for either producer or consumer configuration. + """ + bootstrap_servers = getattr(settings, 'EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS', None) + if bootstrap_servers is None: + warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS") + return None + + base_settings = { + 'bootstrap.servers': bootstrap_servers, + } + + key = getattr(base_settings, 'EVENT_BUS_KAFKA_API_KEY', None) + secret = getattr(base_settings, 'EVENT_BUS_KAFKA_API_SECRET', None) + + if key and secret: + base_settings.update({ + 'sasl.mechanism': 'PLAIN', + 'security.protocol': 'SASL_SSL', + 'sasl.username': key, + 'sasl.password': secret, + }) + + return base_settings diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index aa891e3..d9cfdaf 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -6,17 +6,16 @@ import json import logging -import warnings from functools import lru_cache from typing import Any, List, Optional from confluent_kafka import SerializingProducer -from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer -from django.conf import settings from openedx_events.event_bus.avro.serializer import AvroSignalSerializer from openedx_events.tooling import OpenEdxPublicSignal +from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings + logger = logging.getLogger(__name__) # CloudEvent standard name for the event type header, see @@ -136,33 +135,14 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) - remote-config (and in particular does not result in mixed cache/uncached configuration). This complexity is being deferred until this becomes a performance issue. """ - if schema_registry_url := getattr(settings, 'SCHEMA_REGISTRY_URL', None): - schema_registry_config = { - 'url': schema_registry_url, - 'basic.auth.user.info': f"{getattr(settings, 'SCHEMA_REGISTRY_API_KEY', '')}" - f":{getattr(settings, 'SCHEMA_REGISTRY_API_SECRET', '')}", - } - else: - warnings.warn("Cannot configure event-bus-kafka: Missing setting SCHEMA_REGISTRY_URL") + schema_registry_client = create_schema_registry_client() + if schema_registry_client is None: return None - if bootstrap_servers := getattr(settings, 'KAFKA_BOOTSTRAP_SERVERS', None): - producer_settings = { - 'bootstrap.servers': bootstrap_servers, - } - else: - warnings.warn("Cannot configure event-bus-kafka: Missing setting KAFKA_BOOTSTRAP_SERVERS") + producer_settings = load_common_settings() + if producer_settings is None: return None - if getattr(settings, 'KAFKA_API_KEY', None) and getattr(settings, 'KAFKA_API_SECRET', None): - producer_settings.update({ - 'sasl.mechanism': 'PLAIN', - 'security.protocol': 'SASL_SSL', - 'sasl.username': settings.KAFKA_API_KEY, - 'sasl.password': settings.KAFKA_API_SECRET, - }) - - schema_registry_client = SchemaRegistryClient(schema_registry_config) signal_serializer = get_serializer(signal) def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 2125980..f8cc5c1 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -74,10 +74,10 @@ def test_get_producer_for_signal_configured(self): """Creation succeeds when all settings are present.""" signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with override_settings( - SCHEMA_REGISTRY_URL='http://localhost:12345', - SCHEMA_REGISTRY_API_KEY='some_key', - SCHEMA_REGISTRY_API_SECRET='some_secret', - KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key', + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', # include these just to maximize code coverage KAFKA_API_KEY='some_other_key', KAFKA_API_SECRET='some_other_secret',