diff --git a/CHANGELOG.md b/CHANGELOG.md index dac430f8..5968a346 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.0] +### Added +- Added a user friendly CloudEvent class with data validation ([#36]) +- CloudEvent structured cloudevent support ([#47]) + +### Removed +- Removed support for Cloudevents V0.2 and V0.1 ([#43]) + ## [0.3.0] ### Added - Added Cloudevents V0.3 and V1 implementations ([#22]) @@ -66,3 +74,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#23]: https://github.com/cloudevents/sdk-python/pull/23 [#25]: https://github.com/cloudevents/sdk-python/pull/25 [#27]: https://github.com/cloudevents/sdk-python/pull/27 +[#36]: https://github.com/cloudevents/sdk-python/pull/36 +[#43]: https://github.com/cloudevents/sdk-python/pull/43 +[#47]: https://github.com/cloudevents/sdk-python/pull/47 \ No newline at end of file diff --git a/README.md b/README.md index 5e392270..5e60476a 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,57 @@ This SDK current supports the following versions of CloudEvents: Package **cloudevents** provides primitives to work with CloudEvents specification: https://github.com/cloudevents/spec. +Sending CloudEvents: + +### Binary HTTP CloudEvent + +```python +from cloudevents.sdk.http_events import CloudEvent +import requests + + +# This data defines a binary cloudevent +headers = { + "Content-Type": "application/json", + "ce-specversion": "1.0", + "ce-type": "README.sample.binary", + "ce-id": "binary-event", + "ce-time": "2018-10-23T12:28:22.4579346Z", + "ce-source": "README", +} +data = {"message": "Hello World!"} + +event = CloudEvent(data, headers=headers) +headers, body = event.to_request() + +# POST +requests.post("", json=body, headers=headers) +``` + +### Structured HTTP CloudEvent + +```python +from cloudevents.sdk.http_events import CloudEvent +import requests + + +# This data defines a structured cloudevent +data = { + "specversion": "1.0", + "type": "README.sample.structured", + "id": "structured-event", + "source": "README", + "data": {"message": "Hello World!"} +} +event = CloudEvent(data) +headers, body = event.to_request() + +# POST +requests.post("", json=body, headers=headers) +``` + +### Event base classes usage + Parsing upstream structured Event from HTTP request: ```python @@ -68,22 +119,6 @@ event = m.FromRequest( ) ``` -Creating a minimal CloudEvent in version 0.1: - -```python -from cloudevents.sdk.event import v1 - -event = ( - v1.Event() - .SetContentType("application/json") - .SetData('{"name":"john"}') - .SetEventID("my-id") - .SetSource("from-galaxy-far-far-away") - .SetEventTime("tomorrow") - .SetEventType("cloudevent.greet.you") -) -``` - Creating HTTP request from CloudEvent: ```python diff --git a/cloudevents/sdk/event/base.py b/cloudevents/sdk/event/base.py index d392ae8b..1f5d8358 100644 --- a/cloudevents/sdk/event/base.py +++ b/cloudevents/sdk/event/base.py @@ -20,61 +20,141 @@ # TODO(slinkydeveloper) is this really needed? class EventGetterSetter(object): + # ce-specversion def CloudEventVersion(self) -> str: raise Exception("not implemented") - # CloudEvent attribute getters - def EventType(self) -> str: - raise Exception("not implemented") + @property + def specversion(self): + return self.CloudEventVersion() - def Source(self) -> str: + def SetCloudEventVersion(self, specversion: str) -> object: raise Exception("not implemented") - def EventID(self) -> str: - raise Exception("not implemented") + @specversion.setter + def specversion(self, value: str): + self.SetCloudEventVersion(value) - def EventTime(self) -> str: + # ce-type + def EventType(self) -> str: raise Exception("not implemented") - def SchemaURL(self) -> str: - raise Exception("not implemented") + @property + def type(self): + return self.EventType() - def Data(self) -> object: + def SetEventType(self, eventType: str) -> object: raise Exception("not implemented") - def Extensions(self) -> dict: - raise Exception("not implemented") + @type.setter + def type(self, value: str): + self.SetEventType(value) - def ContentType(self) -> str: + # ce-source + def Source(self) -> str: raise Exception("not implemented") - # CloudEvent attribute constructors - # Each setter return an instance of its class - # in order to build a pipeline of setter - def SetEventType(self, eventType: str) -> object: - raise Exception("not implemented") + @property + def source(self): + return self.Source() def SetSource(self, source: str) -> object: raise Exception("not implemented") + @source.setter + def source(self, value: str): + self.SetSource(value) + + # ce-id + def EventID(self) -> str: + raise Exception("not implemented") + + @property + def id(self): + return self.EventID() + def SetEventID(self, eventID: str) -> object: raise Exception("not implemented") + @id.setter + def id(self, value: str): + self.SetEventID(value) + + # ce-time + def EventTime(self) -> str: + raise Exception("not implemented") + + @property + def time(self): + return self.EventTime() + def SetEventTime(self, eventTime: str) -> object: raise Exception("not implemented") + @time.setter + def time(self, value: str): + self.SetEventTime(value) + + # ce-schema + def SchemaURL(self) -> str: + raise Exception("not implemented") + + @property + def schema(self) -> str: + return self.SchemaURL() + def SetSchemaURL(self, schemaURL: str) -> object: raise Exception("not implemented") + @schema.setter + def schema(self, value: str): + self.SetSchemaURL(value) + + # data + def Data(self) -> object: + raise Exception("not implemented") + + @property + def data(self) -> object: + return self.Data() + def SetData(self, data: object) -> object: raise Exception("not implemented") + @data.setter + def data(self, value: object): + self.SetData(value) + + # ce-extensions + def Extensions(self) -> dict: + raise Exception("not implemented") + + @property + def extensions(self) -> dict: + return self.Extensions() + def SetExtensions(self, extensions: dict) -> object: raise Exception("not implemented") + @extensions.setter + def extensions(self, value: dict): + self.SetExtensions(value) + + # Content-Type + def ContentType(self) -> str: + raise Exception("not implemented") + + @property + def content_type(self) -> str: + return self.ContentType() + def SetContentType(self, contentType: str) -> object: raise Exception("not implemented") + @content_type.setter + def content_type(self, value: str): + self.SetContentType(value) + class BaseEvent(EventGetterSetter): def Properties(self, with_nullable=False) -> dict: @@ -105,7 +185,6 @@ def Set(self, key: str, value: object): attr.set(value) setattr(self, formatted_key, attr) return - exts = self.Extensions() exts.update({key: value}) self.Set("extensions", exts) @@ -117,6 +196,7 @@ def MarshalJSON(self, data_marshaller: typing.Callable) -> typing.IO: def UnmarshalJSON(self, b: typing.IO, data_unmarshaller: typing.Callable): raw_ce = json.load(b) + for name, value in raw_ce.items(): if name == "data": value = data_unmarshaller(value) @@ -134,7 +214,6 @@ def UnmarshalBinary( self.SetContentType(value) elif header.startswith("ce-"): self.Set(header[3:], value) - self.Set("data", data_unmarshaller(body)) def MarshalBinary( diff --git a/cloudevents/sdk/event/opt.py b/cloudevents/sdk/event/opt.py index 2a18a52a..bf630c32 100644 --- a/cloudevents/sdk/event/opt.py +++ b/cloudevents/sdk/event/opt.py @@ -35,3 +35,9 @@ def get(self): def required(self): return self.is_required + + def __eq__(self, obj): + return isinstance(obj, Option) and \ + obj.name == self.name and \ + obj.value == self.value and \ + obj.is_required == self.is_required diff --git a/cloudevents/sdk/event/v03.py b/cloudevents/sdk/event/v03.py index 4207e400..3b6a3222 100644 --- a/cloudevents/sdk/event/v03.py +++ b/cloudevents/sdk/event/v03.py @@ -17,6 +17,21 @@ class Event(base.BaseEvent): + _ce_required_fields = { + 'id', + 'source', + 'type', + 'specversion' + } + + _ce_optional_fields = { + 'datacontentencoding', + 'datacontenttype', + 'schemaurl', + 'subject', + 'time' + } + def __init__(self): self.ce__specversion = opt.Option("specversion", "0.3", True) self.ce__id = opt.Option("id", None, True) @@ -68,6 +83,10 @@ def ContentType(self) -> str: def ContentEncoding(self) -> str: return self.ce__datacontentencoding.get() + @property + def datacontentencoding(self): + return self.ContentEncoding() + def SetEventType(self, eventType: str) -> base.BaseEvent: self.Set("type", eventType) return self @@ -107,3 +126,7 @@ def SetContentType(self, contentType: str) -> base.BaseEvent: def SetContentEncoding(self, contentEncoding: str) -> base.BaseEvent: self.Set("datacontentencoding", contentEncoding) return self + + @datacontentencoding.setter + def datacontentencoding(self, value: str): + self.SetContentEncoding(value) diff --git a/cloudevents/sdk/event/v1.py b/cloudevents/sdk/event/v1.py index 655111ae..6034a9b4 100644 --- a/cloudevents/sdk/event/v1.py +++ b/cloudevents/sdk/event/v1.py @@ -17,6 +17,20 @@ class Event(base.BaseEvent): + _ce_required_fields = { + 'id', + 'source', + 'type', + 'specversion' + } + + _ce_optional_fields = { + 'datacontenttype', + 'dataschema', + 'subject', + 'time' + } + def __init__(self): self.ce__specversion = opt.Option("specversion", "1.0", True) self.ce__id = opt.Option("id", None, True) diff --git a/cloudevents/sdk/http_events.py b/cloudevents/sdk/http_events.py new file mode 100644 index 00000000..d8db759c --- /dev/null +++ b/cloudevents/sdk/http_events.py @@ -0,0 +1,193 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import copy + +import io + +import json +import typing + +from cloudevents.sdk import converters +from cloudevents.sdk import marshaller + +from cloudevents.sdk.event import v03, v1 + + +class CloudEvent(): + """ + Python-friendly cloudevent class supporting v1 events + Currently only supports binary content mode CloudEvents + """ + + def __init__( + self, + data: typing.Union[dict, None], + headers: dict = {}, + data_unmarshaller: typing.Callable = lambda x: x, + ): + """ + Event HTTP Constructor + :param data: a nullable dict to be stored inside Event. + :type data: dict or None + :param headers: a dict with HTTP headers + e.g. { + "content-type": "application/cloudevents+json", + "ce-id": "16fb5f0b-211e-1102-3dfe-ea6e2806f124", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "0.2" + } + :type headers: dict + :param binary: a bool indicating binary events + :type binary: bool + :param data_unmarshaller: callable function for reading/extracting data + :type data_unmarshaller: typing.Callable + """ + self.required_attribute_values = {} + self.optional_attribute_values = {} + if data is None: + data = {} + + headers = {key.lower(): value for key, value in headers.items()} + data = {key.lower(): value for key, value in data.items()} + + # returns an event class depending on proper version + event_version = CloudEvent.detect_event_version(headers, data) + self.isbinary = CloudEvent.is_binary_cloud_event( + event_version, + headers + ) + + self.marshall = marshaller.NewDefaultHTTPMarshaller() + self.event_handler = event_version() + + self.__event = self.marshall.FromRequest( + self.event_handler, + headers, + io.BytesIO(json.dumps(data).encode()), + data_unmarshaller + ) + + # headers validation for binary events + for field in event_version._ce_required_fields: + + # prefixes with ce- if this is a binary event + fieldname = f"ce-{field}" if self.isbinary else field + + # fields_refs holds a reference to where fields should be + fields_refs = headers if self.isbinary else data + + fields_refs_name = 'headers' if self.isbinary else 'data' + + # verify field exists else throw TypeError + if fieldname not in fields_refs: + raise TypeError( + f"parameter {fields_refs_name} has no required " + f"attribute {fieldname}." + ) + + elif not isinstance(fields_refs[fieldname], str): + raise TypeError( + f"in parameter {fields_refs_name}, {fieldname} " + f"expected type str but found type " + f"{type(fields_refs[fieldname])}." + ) + + else: + self.required_attribute_values[f"ce-{field}"] = \ + fields_refs[fieldname] + + for field in event_version._ce_optional_fields: + fieldname = f"ce-{field}" if self.isbinary else field + if (fieldname in fields_refs) and not \ + isinstance(fields_refs[fieldname], str): + raise TypeError( + f"in parameter {fields_refs_name}, {fieldname} " + f"expected type str but found type " + f"{type(fields_refs[fieldname])}." + ) + else: + self.optional_attribute_values[f"ce-{field}"] = field + + # structured data is inside json resp['data'] + self.data = copy.deepcopy(data) if self.isbinary else \ + copy.deepcopy(data.get('data', {})) + + self.headers = { + **self.required_attribute_values, + **self.optional_attribute_values + } + + def to_request( + self, + data_unmarshaller: typing.Callable = lambda x: json.loads( + x.read() + .decode('utf-8') + ) + ) -> (dict, dict): + """ + Returns a tuple of HTTP headers/body dicts representing this cloudevent + + :param data_unmarshaller: callable function used to read the data io + object + :type data_unmarshaller: typing.Callable + :returns: (http_headers: dict, http_body: dict) + """ + converter_type = converters.TypeBinary if self.isbinary else \ + converters.TypeStructured + + headers, data = self.marshall.ToRequest( + self.__event, + converter_type, + data_unmarshaller + ) + data = data if self.isbinary else data_unmarshaller(data)['data'] + return headers, data + + def __getitem__(self, key): + return self.data if key == 'data' else self.headers[key] + + @staticmethod + def is_binary_cloud_event(event_version, headers): + for field in event_version._ce_required_fields: + if f"ce-{field}" not in headers: + return False + return True + + @staticmethod + def detect_event_version(headers, data): + """ + Returns event handler depending on specversion within + headers for binary cloudevents or within data for structured + cloud events + """ + specversion = headers.get('ce-specversion', data.get('specversion')) + if specversion == '1.0': + return v1.Event + elif specversion == '0.3': + return v03.Event + else: + raise TypeError(f"specversion {specversion} " + "currently unsupported") + + def __repr__(self): + return json.dumps( + { + 'Event': { + 'headers': self.headers, + 'data': self.data + } + }, + indent=4 + ) diff --git a/cloudevents/tests/data.py b/cloudevents/tests/data.py index 6605c7f5..ffe63aee 100644 --- a/cloudevents/tests/data.py +++ b/cloudevents/tests/data.py @@ -23,7 +23,7 @@ headers = { v03.Event: { - "ce-specversion": "0.3", + "ce-specversion": "1.0", "ce-type": ce_type, "ce-id": ce_id, "ce-time": eventTime, @@ -42,7 +42,7 @@ json_ce = { v03.Event: { - "specversion": "0.3", + "specversion": "1.0", "type": ce_type, "id": ce_id, "time": eventTime, diff --git a/cloudevents/tests/test_data_encaps_refs.py b/cloudevents/tests/test_data_encaps_refs.py new file mode 100644 index 00000000..84bc91ef --- /dev/null +++ b/cloudevents/tests/test_data_encaps_refs.py @@ -0,0 +1,114 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import io +import json +import copy +import pytest + +from uuid import uuid4 + +from cloudevents.sdk import converters +from cloudevents.sdk import marshaller + +from cloudevents.sdk.converters import structured +from cloudevents.sdk.event import v03, v1 + + +from cloudevents.tests import data + + +@pytest.mark.parametrize("event_class", [ v03.Event, v1.Event]) +def test_general_binary_properties(event_class): + m = marshaller.NewDefaultHTTPMarshaller() + event = m.FromRequest( + event_class(), + {"Content-Type": "application/cloudevents+json"}, + io.StringIO(json.dumps(data.json_ce[event_class])), + lambda x: x.read(), + ) + + new_headers, _ = m.ToRequest(event, converters.TypeBinary, lambda x: x) + assert new_headers is not None + assert "ce-specversion" in new_headers + + # Test properties + assert event is not None + assert event.type == data.ce_type + assert event.id == data.ce_id + assert event.content_type == data.contentType + assert event.source == data.source + + # Test setters + new_type = str(uuid4()) + new_id = str(uuid4()) + new_content_type = str(uuid4()) + new_source = str(uuid4()) + + event.extensions = {'test': str(uuid4)} + event.type = new_type + event.id = new_id + event.content_type = new_content_type + event.source = new_source + + assert event is not None + assert (event.type == new_type) and (event.type == event.EventType()) + assert (event.id == new_id) and (event.id == event.EventID()) + assert (event.content_type == new_content_type) and (event.content_type == event.ContentType()) + assert (event.source == new_source) and (event.source == event.Source()) + assert event.extensions['test'] == event.Extensions()['test'] + assert (event.specversion == event.CloudEventVersion()) + + +@pytest.mark.parametrize("event_class", [v03.Event, v1.Event]) +def test_general_structured_properties(event_class): + copy_of_ce = copy.deepcopy(data.json_ce[event_class]) + m = marshaller.NewDefaultHTTPMarshaller() + http_headers = {"content-type": "application/cloudevents+json"} + event = m.FromRequest( + event_class(), http_headers, io.StringIO(json.dumps(data.json_ce[event_class])), lambda x: x.read() + ) + # Test python properties + assert event is not None + assert event.type == data.ce_type + assert event.id == data.ce_id + assert event.content_type == data.contentType + assert event.source == data.source + + new_headers, _ = m.ToRequest(event, converters.TypeStructured, lambda x: x) + for key in new_headers: + if key == "content-type": + assert new_headers[key] == http_headers[key] + continue + assert key in copy_of_ce + + # Test setters + new_type = str(uuid4()) + new_id = str(uuid4()) + new_content_type = str(uuid4()) + new_source = str(uuid4()) + + event.extensions = {'test': str(uuid4)} + event.type = new_type + event.id = new_id + event.content_type = new_content_type + event.source = new_source + + assert event is not None + assert (event.type == new_type) and (event.type == event.EventType()) + assert (event.id == new_id) and (event.id == event.EventID()) + assert (event.content_type == new_content_type) and (event.content_type == event.ContentType()) + assert (event.source == new_source) and (event.source == event.Source()) + assert event.extensions['test'] == event.Extensions()['test'] + assert (event.specversion == event.CloudEventVersion()) diff --git a/cloudevents/tests/test_http_events.py b/cloudevents/tests/test_http_events.py new file mode 100644 index 00000000..8d703949 --- /dev/null +++ b/cloudevents/tests/test_http_events.py @@ -0,0 +1,303 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import io + +import json + +import copy + +from cloudevents.sdk.http_events import CloudEvent + +from sanic import response +from sanic import Sanic + +import pytest + + +invalid_test_headers = [ + { + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0" + }, { + "ce-id": "my-id", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0" + }, { + "ce-id": "my-id", + "ce-source": "", + "ce-specversion": "1.0" + }, { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + } +] + +invalid_cloudevent_request_bodie = [ + { + "source": "", + "type": "cloudevent.event.type", + "specversion": "1.0" + }, { + "id": "my-id", + "type": "cloudevent.event.type", + "specversion": "1.0" + }, { + "id": "my-id", + "source": "", + "specversion": "1.0" + }, { + "id": "my-id", + "source": "", + "type": "cloudevent.event.type", + } +] + +test_data = { + "payload-content": "Hello World!" +} + +app = Sanic(__name__) + + +def post(url, headers, json): + return app.test_client.post(url, headers=headers, data=json) + + +@app.route("/event", ["POST"]) +async def echo(request): + assert isinstance(request.json, dict) + event = CloudEvent(request.json, headers=dict(request.headers)) + return response.text(json.dumps(event.data), headers=event.headers) + + +@pytest.mark.parametrize("body", invalid_cloudevent_request_bodie) +def test_missing_required_fields_structured(body): + with pytest.raises((TypeError, NotImplementedError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = CloudEvent(body, headers={'Content-Type': 'application/json'}) + + +@pytest.mark.parametrize("headers", invalid_test_headers) +def test_missing_required_fields_binary(headers): + with pytest.raises((TypeError, NotImplementedError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = CloudEvent(test_data, headers=headers) + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_emit_binary_event(specversion): + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": specversion, + "Content-Type": "application/cloudevents+json" + } + event = CloudEvent(test_data, headers=headers) + _, r = app.test_client.post( + "/event", + headers=event.headers, + data=json.dumps(event.data) + ) + + # Convert byte array to dict + # e.g. r.body = b'{"payload-content": "Hello World!"}' + body = json.loads(r.body.decode('utf-8')) + + # Check response fields + for key in test_data: + assert body[key] == test_data[key] + for key in headers: + if key != 'Content-Type': + assert r.headers[key] == headers[key] + assert r.status_code == 200 + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_emit_structured_event(specversion): + headers = { + "Content-Type": "application/json" + } + body = { + "id": "my-id", + "source": "", + "type": "cloudevent.event.type", + "specversion": specversion, + "data": test_data + } + event = CloudEvent(body, headers=headers) + _, r = app.test_client.post( + "/event", + headers=event.headers, + data=json.dumps(event.data) + ) + + # Convert byte array to dict + # e.g. r.body = b'{"payload-content": "Hello World!"}' + body = json.loads(r.body.decode('utf-8')) + + # Check response fields + for key in test_data: + assert body[key] == test_data[key] + assert r.status_code == 200 + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_missing_ce_prefix_binary_event(specversion): + prefixed_headers = {} + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": specversion + } + for key in headers: + + # breaking prefix e.g. e-id instead of ce-id + prefixed_headers[key[1:]] = headers[key] + + with pytest.raises((TypeError, NotImplementedError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = CloudEvent(test_data, headers=prefixed_headers) + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_valid_binary_events(specversion): + # Test creating multiple cloud events + events_queue = [] + headers = {} + num_cloudevents = 30 + for i in range(num_cloudevents): + headers = { + "ce-id": f"id{i}", + "ce-source": f"source{i}.com.test", + "ce-type": f"cloudevent.test.type", + "ce-specversion": specversion + } + data = {'payload': f"payload-{i}"} + events_queue.append(CloudEvent(data, headers=headers)) + + for i, event in enumerate(events_queue): + headers = event.headers + data = event.data + assert headers['ce-id'] == f"id{i}" + assert headers['ce-source'] == f"source{i}.com.test" + assert headers['ce-specversion'] == specversion + assert data['payload'] == f"payload-{i}" + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_structured_to_request(specversion): + data = { + "specversion": specversion, + "type": "word.found.name", + "id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "source": "pytest", + "data": {"message": "Hello World!"} + } + event = CloudEvent(data) + headers, body = event.to_request() + assert isinstance(body, dict) + + assert headers['content-type'] == 'application/cloudevents+json' + for key in data: + assert body[key] == data[key] + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_binary_to_request(specversion): + test_headers = { + "ce-specversion": specversion, + "ce-type": "word.found.name", + "ce-id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "ce-source": "pytest" + } + data = { + "message": "Hello World!" + } + event = CloudEvent(data, headers=test_headers) + headers, body = event.to_request() + assert isinstance(body, dict) + + for key in data: + assert body[key] == data[key] + for key in test_headers: + assert test_headers[key] == headers[key] + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_empty_data_structured_event(specversion): + # Testing if cloudevent breaks when no structured data field present + data = { + "specversion": specversion, + "datacontenttype": "application/json", + "type": "word.found.name", + "id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "time": "2018-10-23T12:28:22.4579346Z", + "source": "", + "data": {"message": "Hello World!"} + } + _ = CloudEvent(data) + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_empty_data_binary_event(specversion): + # Testing if cloudevent breaks when no structured data field present + headers = { + "Content-Type": "application/cloudevents+json", + "ce-specversion": specversion, + "ce-type": "word.found.name", + "ce-id": "96fb5f0b-001e-0108-6dfe-da6e2806f124", + "ce-time": "2018-10-23T12:28:22.4579346Z", + "ce-source": "", + } + _ = CloudEvent(None, headers=headers) + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_valid_structured_events(specversion): + # Test creating multiple cloud events + events_queue = [] + headers = {} + num_cloudevents = 30 + for i in range(num_cloudevents): + data = { + "id": f"id{i}", + "source": f"source{i}.com.test", + "type": f"cloudevent.test.type", + "specversion": specversion, + "data": { + 'payload': f"payload-{i}" + } + } + events_queue.append(CloudEvent(data)) + + for i, event in enumerate(events_queue): + headers = event.headers + data = event.data + assert headers['ce-id'] == f"id{i}" + assert headers['ce-source'] == f"source{i}.com.test" + assert headers['ce-specversion'] == specversion + assert data['payload'] == f"payload-{i}" diff --git a/requirements/test.txt b/requirements/test.txt index e9df186e..12894086 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -7,4 +7,4 @@ pytest==4.0.0 pytest-cov==2.4.0 # web app tests sanic -aiohttp \ No newline at end of file +aiohttp diff --git a/samples/http-cloudevents/README.md b/samples/http-cloudevents/README.md new file mode 100644 index 00000000..a72244dc --- /dev/null +++ b/samples/http-cloudevents/README.md @@ -0,0 +1,20 @@ +## Quickstart + +Install dependencies: + +```sh +pip3 install -r requirements.txt +``` + +Start server: + +```sh +python3 server.py +``` + +In a new shell, run the client code which sends a structured and binary +cloudevent to your local server: + +```sh +python3 client.py http://localhost:3000/ +``` diff --git a/samples/http-cloudevents/client.py b/samples/http-cloudevents/client.py new file mode 100644 index 00000000..89d03680 --- /dev/null +++ b/samples/http-cloudevents/client.py @@ -0,0 +1,80 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys +import io +from cloudevents.sdk.http_events import CloudEvent + +import requests + + +def send_binary_cloud_event(url): + # define cloudevents data + headers = { + "ce-id": "binary-event-id", + "ce-source": "localhost", + "ce-type": "template.http.binary", + "ce-specversion": "1.0", + "Content-Type": "application/json", + "ce-time": "2018-10-23T12:28:23.3464579Z" + } + data = {"payload-content": "Hello World!"} + + # create a CloudEvent + event = CloudEvent(data, headers=headers) + headers, body = event.to_request() + + # send and print event + requests.post(url, headers=headers, json=body) + print( + f"Sent {event['ce-id']} from {event['ce-source']} with " + f"{event['data']}" + ) + + +def send_structured_cloud_event(url): + # define cloudevents data + data = { + "id": "structured-event-id", + "source": "localhost", + "type": "template.http.structured", + "specversion": "1.0", + "Content-Type": "application/json", + "time": "2018-10-23T12:28:23.3464579Z", + "data": { + "payload-content": "Hello World!" + } + } + + # create a CloudEvent + event = CloudEvent(data) + headers, body = event.to_request() + + # send and print event + requests.post(url, headers=headers, json=body) + print( + f"Sent {event['ce-id']} from {event['ce-source']} with " + f"{event['data']}" + ) + + +if __name__ == "__main__": + # expects a url from command line. + # e.g. python3 client.py http://localhost:3000/ + if len(sys.argv) < 2: + sys.exit("Usage: python with_requests.py " + "") + + url = sys.argv[1] + send_binary_cloud_event(url) + send_structured_cloud_event(url) diff --git a/samples/http-cloudevents/requirements.txt b/samples/http-cloudevents/requirements.txt new file mode 100644 index 00000000..5eaf725f --- /dev/null +++ b/samples/http-cloudevents/requirements.txt @@ -0,0 +1,2 @@ +flask +requests \ No newline at end of file diff --git a/samples/http-cloudevents/server.py b/samples/http-cloudevents/server.py new file mode 100644 index 00000000..11e48419 --- /dev/null +++ b/samples/http-cloudevents/server.py @@ -0,0 +1,35 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from cloudevents.sdk.http_events import CloudEvent +from flask import Flask, request +app = Flask(__name__) + + +# create an endpoint at http://localhost:/3000/ +@app.route('/', methods=['POST']) +def home(): + # convert headers to dict + headers = dict(request.headers) + print(request.json) + print(headers) + # create a CloudEvent + event = CloudEvent(headers=headers, data=request.json) + + # print the received CloudEvent + print(f"Received CloudEvent {event}") + return '', 204 + + +if __name__ == '__main__': + app.run(port=3000)