Skip to content

Commit

Permalink
feat: pass along event headers as metadata when emitting
Browse files Browse the repository at this point in the history
  • Loading branch information
Rebecca Graber committed Jan 31, 2023
1 parent f997905 commit 6d806b0
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 65 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ Change Log
Unreleased
**********

[3.8.0] - 2023-01-31
********************
Added
=====
* Producer now sends every ``EventsMetadata`` field as a Kafka message header
* Consumer reconstructs the producer's original metadata from those headers and emits events with it

[3.7.0] - 2023-01-30
********************
Changed
Expand Down
9 changes: 6 additions & 3 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
"""
Core consumer and event-loop code.
"""

import logging
import time
import warnings
from datetime import datetime

from attrs import asdict, fields, filters
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import connection
from edx_django_utils.monitoring import record_exception, set_custom_attribute
from edx_toggles.toggles import SettingToggle
from openedx_events.data import EventsMetadata
from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_full_topic, get_schema_registry_client, load_common_settings
from .utils import _get_metadata_from_headers

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -348,8 +350,9 @@ def emit_signals_from_message(self, msg):
f"Signal types do not match. Expected {self.signal.event_type}. "
f"Received message of type {event_type_str}."
)

send_results = self.signal.send_event(**msg.value())
event_metadata = asdict(_get_metadata_from_headers(headers),
filter=filters.exclude(fields(EventsMetadata).event_type))
send_results = self.signal.send_event_with_custom_metadata(**event_metadata, **msg.value())
# Raise an exception if any receivers errored out. This allows logging of the receivers
# along with partition, offset, etc. in record_event_consuming_error. Hopefully the
# receiver code is idempotent and we can just replay any messages that were involved.
Expand Down
41 changes: 1 addition & 40 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_full_topic, get_schema_registry_client, load_common_settings
from .utils import _get_headers_from_metadata

logger = logging.getLogger(__name__)

Expand All @@ -35,18 +36,6 @@
except ImportError: # pragma: no cover
confluent_kafka = None

# CloudEvent standard names for the event headers, see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER_KEY = "ce_type"
ID_HEADER_KEY = "ce_id"
SOURCE_HEADER_KEY = "ce_source"
SOURCEHOST_HEADER_KEY = "sourcehost"
SPEC_VERSION_HEADER_KEY = "ce_specversion"
# The documentation is unclear as to which of the following two headers to use for content type, so for now
# use both
CONTENT_TYPE_HEADER_KEY = "content-type"
DATA_CONTENT_TYPE_HEADER_KEY = "ce_datacontenttype"


def record_producing_error(error, context):
"""
Expand Down Expand Up @@ -186,34 +175,6 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
return key_serializer, value_serializer


def _get_headers_from_metadata(event_metadata: EventsMetadata):
"""
Create a dictionary of CloudEvent-compliant Kafka headers from an EventsMetadata object.
This method assumes the EventMetadata object was the one sent with the event data to the original signal handler.
Arguments:
event_metadata: An EventsMetadata object sent by an OpenEdxPublicSignal
Returns:
A dictionary of headers
"""
# Dictionary (or list of key/value tuples) where keys are strings and values are binary.
# CloudEvents specifies using UTF-8; that should be the default, but let's make it explicit.
return {
# The way EventMetadata is initialized none of these should ever be null.
# If it is we want the error to be raised.
EVENT_TYPE_HEADER_KEY: event_metadata.event_type.encode("utf-8"),
ID_HEADER_KEY: str(event_metadata.id).encode("utf-8"),
SOURCE_HEADER_KEY: event_metadata.source.encode("utf-8"),
SOURCEHOST_HEADER_KEY: event_metadata.sourcehost.encode("utf-8"),
# Always 1.0. See "Fields" in OEP-41
SPEC_VERSION_HEADER_KEY: b'1.0',
CONTENT_TYPE_HEADER_KEY: b'application/avro',
DATA_CONTENT_TYPE_HEADER_KEY: b'application/avro',
}


@attr.s(kw_only=True, repr=False)
class ProducingContext:
"""
Expand Down
12 changes: 7 additions & 5 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import copy
from datetime import datetime
from typing import Optional
from unittest.mock import Mock, call, patch
from unittest.mock import ANY, Mock, call, patch
from uuid import uuid1

import ddt
import pytest
Expand Down Expand Up @@ -129,8 +130,9 @@ def setUp(self):
)
)
}
self.message_id_bytes = b'0000-0000'
self.message_id = self.message_id_bytes.decode('utf-8')
self.message_id = uuid1()
self.message_id_bytes = str(self.message_id).encode('utf-8')

self.signal_type_bytes = b'org.openedx.learning.auth.session.login.completed.v1'
self.signal_type = self.signal_type_bytes.decode('utf-8')
self.normal_message = FakeMessage(
Expand Down Expand Up @@ -258,15 +260,15 @@ def raise_exception():
assert "-- event details: " in exc_log_msg
assert "'partition': 2" in exc_log_msg
assert "'offset': 12345" in exc_log_msg
assert "'headers': [('ce_id', b'0000-0000'), " \
assert f"'headers': [('ce_id', b'{self.message_id}'), " \
"('ce_type', b'org.openedx.learning.auth.session.login.completed.v1')]" in exc_log_msg
assert "'key': b'\\x00\\x00\\x00\\x00\\x01\\x0cfoobob'" in exc_log_msg
assert "email='bob@foo.example'" in exc_log_msg

mock_set_custom_attribute.assert_has_calls(
[
call("kafka_topic", "prod-some-topic"),
call("kafka_message_id", "0000-0000"),
call("kafka_message_id", str(self.message_id)),
call("kafka_partition", 2),
call("kafka_offset", 12345),
call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"),
Expand Down
23 changes: 7 additions & 16 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Test the event producer code.
"""

import datetime
import gc
import time
import warnings
Expand Down Expand Up @@ -144,8 +145,9 @@ def test_send_to_event_bus(self, mock_get_serializers):
EVENT_BUS_TOPIC_PREFIX='prod',
SERVICE_VARIANT='test',
):
now = datetime.datetime.now(datetime.timezone.utc)
metadata = EventsMetadata(event_type=self.signal.event_type,
minorversion=0)
time=now, sourcelib=(1, 2, 3))
producer_api = ep.create_producer()
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
producer_api.send(
Expand All @@ -162,7 +164,10 @@ def test_send_to_event_bus(self, mock_get_serializers):
'sourcehost': metadata.sourcehost.encode("utf8"),
'ce_specversion': b'1.0',
'content-type': b'application/avro',
'ce_datacontenttype': b'application/avro'
'ce_datacontenttype': b'application/avro',
'ce_time': now.isoformat().encode("utf8"),
'ce_minorversion': b'0',
'sourcelib': b'(1, 2, 3)',
}

mock_producer.produce.assert_called_once_with(
Expand Down Expand Up @@ -320,17 +325,3 @@ def test_serialize_and_produce_to_same_topic(self, mock_context):
# headers are tested elsewhere, we just want to verify the topics
headers=ANY,
)

def test_headers_from_event_metadata(self):
with override_settings(SERVICE_VARIANT='test'):
metadata = EventsMetadata(event_type=self.signal.event_type, minorversion=0)
headers = ep._get_headers_from_metadata(event_metadata=metadata) # pylint: disable=protected-access
self.assertDictEqual(headers, {
'ce_type': b'org.openedx.learning.auth.session.login.completed.v1',
'ce_id': str(metadata.id).encode("utf8"),
'ce_source': b'openedx/test/web',
'ce_specversion': b'1.0',
'sourcehost': metadata.sourcehost.encode("utf8"),
'content-type': b'application/avro',
'ce_datacontenttype': b'application/avro',
})
58 changes: 58 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
Test header conversion utils
"""

from datetime import datetime
from uuid import uuid1

import attr
from django.test import TestCase, override_settings
from openedx_events.data import EventsMetadata

from edx_event_bus_kafka.internal.utils import _get_headers_from_metadata, _get_metadata_from_headers


class TestUtils(TestCase):
    """Tests for converting between Kafka message headers and EventsMetadata."""

    def test_headers_from_event_metadata(self):
        # Pin SERVICE_VARIANT so the generated source/sourcehost fields are predictable.
        with override_settings(SERVICE_VARIANT='test'):
            event_time = datetime.fromisoformat("2023-01-01T14:00:00+00:00")
            metadata = EventsMetadata(
                event_type="org.openedx.learning.auth.session.login.completed.v1",
                time=event_time,
            )
            actual_headers = _get_headers_from_metadata(event_metadata=metadata)
            expected_headers = {
                'ce_type': b'org.openedx.learning.auth.session.login.completed.v1',
                'ce_id': str(metadata.id).encode("utf8"),
                'ce_source': b'openedx/test/web',
                'ce_specversion': b'1.0',
                'sourcehost': metadata.sourcehost.encode("utf8"),
                'content-type': b'application/avro',
                'ce_datacontenttype': b'application/avro',
                'ce_time': b'2023-01-01T14:00:00+00:00',
                'sourcelib': str(metadata.sourcelib).encode("utf8"),
                'ce_minorversion': str(metadata.minorversion).encode("utf8"),
            }
            self.assertDictEqual(actual_headers, expected_headers)

    def test_metadata_from_headers(self):
        message_id = uuid1()
        incoming_headers = [
            ('ce_type', b'org.openedx.learning.auth.session.login.completed.v1'),
            ('ce_id', str(message_id).encode("utf8")),
            ('ce_source', b'openedx/test/web'),
            ('ce_specversion', b'1.0'),
            ('sourcehost', b'testsource'),
            ('content-type', b'application/avro'),
            ('ce_datacontenttype', b'application/avro'),
            ('ce_time', b'2023-01-01T14:00:00+00:00'),
            ('sourcelib', b'(1,2,3)'),
        ]
        generated_metadata = _get_metadata_from_headers(incoming_headers)
        expected_metadata = EventsMetadata(
            event_type="org.openedx.learning.auth.session.login.completed.v1",
            id=message_id,
            minorversion=0,
            source='openedx/test/web',
            sourcehost='testsource',
            time=datetime.fromisoformat("2023-01-01T14:00:00+00:00"),
            sourcelib=(1, 2, 3),
        )
        # Compare as plain dicts so a field mismatch produces a readable diff.
        self.assertDictEqual(attr.asdict(generated_metadata), attr.asdict(expected_metadata))
112 changes: 112 additions & 0 deletions edx_event_bus_kafka/internal/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Utilities for converting between message headers and EventsMetadata
"""

import logging
from collections import defaultdict
from datetime import datetime
from typing import List, Tuple
from uuid import UUID

from openedx_events.data import EventsMetadata

# CloudEvent-standard header names use the "ce_" prefix; see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER_KEY = "ce_type"
ID_HEADER_KEY = "ce_id"
SOURCE_HEADER_KEY = "ce_source"
SPEC_VERSION_HEADER_KEY = "ce_specversion"
TIME_HEADER_KEY = "ce_time"
MINORVERSION_HEADER_KEY = "ce_minorversion"

# not CloudEvent headers, so no "ce" prefix
SOURCEHOST_HEADER_KEY = "sourcehost"
SOURCELIB_HEADER_KEY = "sourcelib"

# The documentation is unclear as to which of the following two headers to use for content type, so for now
# use both
CONTENT_TYPE_HEADER_KEY = "content-type"
DATA_CONTENT_TYPE_HEADER_KEY = "ce_datacontenttype"

logger = logging.getLogger(__name__)

# Maps each header name to the EventsMetadata attribute it round-trips with.
# The spec-version and content-type headers are deliberately absent: they carry
# constant values and have no corresponding EventsMetadata field.
HEADER_KEY_TO_EVENTSMETADATA_FIELD = {
    ID_HEADER_KEY: 'id',
    EVENT_TYPE_HEADER_KEY: 'event_type',
    MINORVERSION_HEADER_KEY: 'minorversion',
    SOURCE_HEADER_KEY: 'source',
    SOURCEHOST_HEADER_KEY: 'sourcehost',
    TIME_HEADER_KEY: 'time',
    SOURCELIB_HEADER_KEY: 'sourcelib'
}


def _get_metadata_from_headers(headers: List[Tuple]):
    """
    Create an EventsMetadata object from the headers of a Kafka message.

    Arguments:
        headers: The list of (key, value) tuples returned from calling message.headers() on a
            consumed message. Values are expected to be UTF-8-encoded bytes.

    Returns:
        An instance of EventsMetadata with the parameters from the headers. Any fields missing
        from the headers are set to the defaults of the EventsMetadata class.
    """
    # Transform list of (header, value) tuples to a {header: [list of values]} dict. Necessary as an
    # intermediate step because there is no guarantee of unique headers in the list of tuples.
    headers_as_dict = defaultdict(list)
    metadata_kwargs = {}
    for key, value in headers:
        headers_as_dict[key].append(value)

    # go through all the headers we care about and set the appropriate field
    for header_key, metadata_field in HEADER_KEY_TO_EVENTSMETADATA_FIELD.items():
        header_values = headers_as_dict[header_key]
        if len(header_values) == 0:
            logger.warning(f"Missing required \"{header_key}\" header on message, will use EventMetadata default")
            continue
        if len(header_values) > 1:
            logger.warning(f"Multiple \"{header_key}\" headers found on message, using the first one found")
        header_value = header_values[0].decode("utf-8")
        # some headers require conversion to the expected type
        if header_key == ID_HEADER_KEY:
            metadata_kwargs[metadata_field] = UUID(header_value)
        elif header_key == TIME_HEADER_KEY:
            metadata_kwargs[metadata_field] = datetime.fromisoformat(header_value)
        elif header_key == SOURCELIB_HEADER_KEY:
            # Convert a string such as "(1, 2, 3)" back to a tuple of ints. Skip empty segments so
            # a single-element tuple rendered as "(1,)" (whose split yields a trailing "") or an
            # empty tuple "()" no longer raises ValueError from int("").
            metadata_kwargs[metadata_field] = tuple(
                int(part) for part in header_value[1:-1].split(",") if part.strip()
            )
        else:
            # these are all string values and don't need any conversion step
            metadata_kwargs[metadata_field] = header_value
    return EventsMetadata(**metadata_kwargs)


def _get_headers_from_metadata(event_metadata: EventsMetadata):
    """
    Build the CloudEvent-compliant Kafka headers for an EventsMetadata object.

    This assumes the EventsMetadata object is the one that accompanied the event data
    when the original signal was sent.

    Arguments:
        event_metadata: An EventsMetadata object sent by an OpenEdxPublicSignal

    Returns:
        A dictionary mapping header names (str) to values (bytes)
    """
    # Kafka accepts a dict (or list of key/value tuples) whose keys are strings and whose
    # values are binary. CloudEvents specifies UTF-8; that should already be the default,
    # but encode explicitly anyway.
    def utf8(text):
        return text.encode("utf-8")

    # Given how EventsMetadata is initialized, none of these fields should ever be None.
    # If one is, we want the resulting error to be raised.
    headers = {
        EVENT_TYPE_HEADER_KEY: utf8(event_metadata.event_type),
        ID_HEADER_KEY: utf8(str(event_metadata.id)),
        SOURCE_HEADER_KEY: utf8(event_metadata.source),
        SOURCEHOST_HEADER_KEY: utf8(event_metadata.sourcehost),
        TIME_HEADER_KEY: utf8(event_metadata.time.isoformat()),
        MINORVERSION_HEADER_KEY: utf8(str(event_metadata.minorversion)),
        SOURCELIB_HEADER_KEY: utf8(str(event_metadata.sourcelib)),
    }
    # Constant-valued headers: the spec version is always 1.0 (see "Fields" in OEP-41),
    # and both content-type headers are set because the documentation is unclear as to
    # which of the two should carry the content type.
    headers[SPEC_VERSION_HEADER_KEY] = b'1.0'
    headers[CONTENT_TYPE_HEADER_KEY] = b'application/avro'
    headers[DATA_CONTENT_TYPE_HEADER_KEY] = b'application/avro'
    return headers
2 changes: 1 addition & 1 deletion requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

Django # Web application framework
# openedx-events 3.1.0 defines producer interface
openedx-events>=3.1.0 # Events API
openedx-events>=4.2.0 # Events API
edx_django_utils
edx_toggles

0 comments on commit 6d806b0

Please sign in to comment.