Skip to content

Commit

Permalink
Modified content-type to abide by attribute naming conventions for cl…
Browse files Browse the repository at this point in the history
…oudevents (#232)

* fix: changed content-type to a valid attribute

Signed-off-by: vivjd <vivjdeng@hotmail.com>

* fix: changed headers back to content-type

Signed-off-by: Vivian <118199397+vivjd@users.noreply.github.com>
Signed-off-by: vivjd <vivjdeng@hotmail.com>

* modified kafka test cases to match datacontenttype

Signed-off-by: vivjd <vivjdeng@hotmail.com>

* fix: updated kafka/conversion.py and test cases to check for valid attributes

Signed-off-by: vivjd <vivjdeng@hotmail.com>

---------

Signed-off-by: vivjd <vivjdeng@hotmail.com>
Signed-off-by: Vivian <118199397+vivjd@users.noreply.github.com>
Co-authored-by: Yurii Serhiichuk <xSAVIKx@users.noreply.github.com>
  • Loading branch information
vivjd and xSAVIKx authored May 26, 2024
1 parent 11520e3 commit 16441d7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
17 changes: 10 additions & 7 deletions cloudevents/kafka/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ def to_binary(
)

headers = {}
if event["content-type"]:
headers["content-type"] = event["content-type"].encode("utf-8")
if event["datacontenttype"]:
headers["content-type"] = event["datacontenttype"].encode("utf-8")
for attr, value in event.get_attributes().items():
if attr not in ["data", "partitionkey", "content-type"]:
if attr not in ["data", "partitionkey", "datacontenttype"]:
if value is not None:
headers["ce_{0}".format(attr)] = value.encode("utf-8")

Expand Down Expand Up @@ -126,7 +126,7 @@ def from_binary(
for header, value in message.headers.items():
header = header.lower()
if header == "content-type":
attributes["content-type"] = value.decode()
attributes["datacontenttype"] = value.decode()
elif header.startswith("ce_"):
attributes[header[3:]] = value.decode()

Expand Down Expand Up @@ -189,8 +189,8 @@ def to_structured(
attrs["data"] = data

headers = {}
if "content-type" in attrs:
headers["content-type"] = attrs.pop("content-type").encode("utf-8")
if "datacontenttype" in attrs:
headers["content-type"] = attrs.pop("datacontenttype").encode("utf-8")

try:
value = envelope_marshaller(attrs)
Expand Down Expand Up @@ -255,7 +255,10 @@ def from_structured(
attributes[name] = decoded_value

for header, val in message.headers.items():
attributes[header.lower()] = val.decode()
if header.lower() == "content-type":
attributes["datacontenttype"] = val.decode()
else:
attributes[header.lower()] = val.decode()
if event_type:
result = event_type.create(attributes, data)
else:
Expand Down
14 changes: 7 additions & 7 deletions cloudevents/tests/test_kafka_conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def source_event(self) -> CloudEvent:
"source": "pytest",
"type": "com.pytest.test",
"time": datetime.datetime(2000, 1, 1, 6, 42, 33).isoformat(),
"content-type": "foo",
"datacontenttype": "foo",
"partitionkey": "test_key_123",
},
data=self.expected_data,
Expand Down Expand Up @@ -123,7 +123,7 @@ def test_sets_headers(self, source_event):
assert result.headers["ce_source"] == source_event["source"].encode("utf-8")
assert result.headers["ce_type"] == source_event["type"].encode("utf-8")
assert result.headers["ce_time"] == source_event["time"].encode("utf-8")
assert result.headers["content-type"] == source_event["content-type"].encode(
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
"utf-8"
)
assert "data" not in result.headers
Expand Down Expand Up @@ -163,7 +163,7 @@ def source_binary_bytes_message(self) -> KafkaMessage:
"ce_time": datetime.datetime(2000, 1, 1, 6, 42, 33)
.isoformat()
.encode("utf-8"),
"content-type": "foo".encode("utf-8"),
"datacontenttype": "foo".encode("utf-8"),
},
value=simple_serialize(self.expected_data),
key="test_key_123",
Expand Down Expand Up @@ -205,7 +205,7 @@ def test_sets_attrs_from_headers(self, source_binary_json_message):
assert result["type"] == source_binary_json_message.headers["ce_type"].decode()
assert result["time"] == source_binary_json_message.headers["ce_time"].decode()
assert (
result["content-type"]
result["datacontenttype"]
== source_binary_json_message.headers["content-type"].decode()
)

Expand Down Expand Up @@ -328,7 +328,7 @@ def test_no_key(self, source_event):
def test_sets_headers(self, source_event):
result = to_structured(source_event)
assert len(result.headers) == 1
assert result.headers["content-type"] == source_event["content-type"].encode(
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
"utf-8"
)

Expand Down Expand Up @@ -474,7 +474,7 @@ def test_sets_content_type_default_envelope_unmarshaller(
):
result = from_structured(source_structured_json_message)
assert (
result["content-type"]
result["datacontenttype"]
== source_structured_json_message.headers["content-type"].decode()
)

Expand All @@ -487,7 +487,7 @@ def test_sets_content_type_custom_envelope_unmarshaller(
envelope_unmarshaller=custom_unmarshaller,
)
assert (
result["content-type"]
result["datacontenttype"]
== source_structured_bytes_bytes_message.headers["content-type"].decode()
)

Expand Down

0 comments on commit 16441d7

Please sign in to comment.