Skip to content

Commit

Permalink
docs: replace TODOs with comments (#9)
Browse files Browse the repository at this point in the history
Replace some TODOs with comments about what could
be done in the future if and when it is needed. Also add
some additional comments.
  • Loading branch information
robrap authored Jul 20, 2022
1 parent f0f7386 commit 8e4bf26
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict:
Returns:
Schema for some field
TODO: Move to openedx_events.event_bus.avro.serializer?
Note: Avro helpers could be moved to openedx_events.event_bus.avro.serializer to be
used for other event bus implementations other than Kafka.
"""
subschema = serializer_schema
for field_name in field_path:
Expand Down Expand Up @@ -114,9 +115,6 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
return AvroSignalSerializer(signal)


# TODO: Cache this, but in a way that still allows changes to settings
# via remote-config (and in particular does not result in mixed
# cache/uncached configuration).
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
"""
Create the producer for a signal and a key field path.
Expand All @@ -127,6 +125,11 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -
signal: The OpenEdxPublicSignal to make a producer for
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
Performance note:
This could be cached, but requires care such that it allows changes to settings via
remote-config (and in particular does not result in mixed cache/uncached configuration).
This complexity is being deferred until this becomes a performance issue.
"""
if schema_registry_url := getattr(settings, 'SCHEMA_REGISTRY_URL', None):
schema_registry_config = {
Expand Down Expand Up @@ -188,6 +191,9 @@ def on_event_deliver(err, evt):
Arguments:
err: Error if event production failed
evt: Event that was delivered (or failed to be delivered)
Note: This is meant to be temporary until we implement
more rigorous error handling.
"""
if err is not None:
logger.warning(f"Event delivery failed: {err!r}")
Expand Down Expand Up @@ -218,4 +224,6 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field:
producer.produce(topic, key=event_key, value=event_data,
on_delivery=on_event_deliver,
headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
# TODO (EventBus): Investigate poll() vs. flush(), and other related settings:
# See https://github.com/openedx/event-bus-kafka/issues/10
producer.poll() # wait indefinitely for the above event to either be delivered or fail

0 comments on commit 8e4bf26

Please sign in to comment.