Skip to content

Commit

Permalink
fixup: minor code style fixes and clearer variable definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
whuang1202 committed Aug 9, 2022
1 parent 92bb972 commit 38bfaa6
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 25 deletions.
Empty file removed default.db
Empty file.
20 changes: 7 additions & 13 deletions edx_event_bus_kafka/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@
Configuration loading and validation.
"""

import warnings
from typing import Optional

import warnings
import attr
from confluent_kafka.schema_registry import SchemaRegistryClient
from django.conf import settings


@attr.s
class CommonConfig:
settings = attr.ib(type=dict)


def create_schema_registry_client() -> Optional[SchemaRegistryClient]:
"""
Create a schema registry client from common settings.
Expand All @@ -32,7 +26,7 @@ def create_schema_registry_client() -> Optional[SchemaRegistryClient]:
'basic.auth.user.info': f"{key}:{secret}",
})

def load_common_settings(settings) -> Optional[dict]:
def load_common_settings() -> Optional[dict]:
"""
Load common settings, a base for either producer or consumer configuration.
"""
Expand All @@ -41,19 +35,19 @@ def load_common_settings(settings) -> Optional[dict]:
warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS")
return None

settings = {
base_settings = {
'bootstrap.servers': bootstrap_servers,
}

key = getattr(settings, 'EVENT_BUS_KAFKA_API_KEY', None)
secret = getattr(settings, 'EVENT_BUS_KAFKA_API_SECRET', None)
key = getattr(base_settings, 'EVENT_BUS_KAFKA_API_KEY', None)
secret = getattr(base_settings, 'EVENT_BUS_KAFKA_API_SECRET', None)

if key and secret:
settings.update({
base_settings.update({
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': key,
'sasl.password': secret,
})

return settings
return base_settings
21 changes: 10 additions & 11 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@

import json
import logging
import warnings
from functools import lru_cache
from typing import Any, List, Optional

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry.avro import AvroSerializer
from django.conf import settings
from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

# CloudEvent standard name for the event type header, see
Expand Down Expand Up @@ -135,10 +134,11 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -
if schema_registry_client is None:
return None

producer_settings = load_common_settings(settings)

signal_serializer = get_serializer(signal)
producer_settings = load_common_settings()
if producer_settings is None:
return None

signal_serializer = get_serializer(signal)
def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
"""Tells Avro how to turn objects into dictionaries."""
return signal_serializer.to_dict(event_data)
Expand All @@ -155,11 +155,10 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
to_dict=inner_to_dict,
)

if producer_settings is not None:
producer_settings.update({
'key.serializer': key_serializer,
'value.serializer': value_serializer,
})
producer_settings.update({
'key.serializer': key_serializer,
'value.serializer': value_serializer,
})

return SerializingProducer(producer_settings)

Expand Down
1 change: 0 additions & 1 deletion edx_event_bus_kafka/publishing/test_event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def test_get_producer_for_signal(self):
assert ep.get_producer_for_signal(signal, 'user.id') is None
assert len(caught_warnings) == 1
assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ")
print (caught_warnings[0].message)

# Creation succeeds when all settings are present
with override_settings(
Expand Down

0 comments on commit 38bfaa6

Please sign in to comment.