Merge pull request #388 from joselsegura/new_publisher_processing_rules
New publisher for internal data pipeline rule processor
Showing 2 changed files with 266 additions and 0 deletions.
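As a quick orientation before the diff, here is a minimal sketch of how the new publisher is exercised. The topic name and broker address are assumptions lifted from the tests below; in the real pipeline the framework instantiates the publisher from configuration.

from ccx_messaging.publishers.idp_rule_processing_publisher import IDPRuleProcessingPublisher

# Hypothetical wiring; values mirror what the tests below use.
publisher = IDPRuleProcessingPublisher(
    outgoing_topic="outgoing_topic",  # assumed topic name
    **{"bootstrap.servers": "kafka:9092"},  # assumed broker address
)

# An input message matching ARCHIVE_SYNCED_SCHEMA, plus a JSON report string.
input_msg = {"path": "bucket/path/to/archive.tgz", "metadata": {"cluster_id": "uuid"}}
publisher.publish(input_msg, '{"reports": []}')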
ccx_messaging/publishers/idp_rule_processing_publisher.py (new file, +80 lines):
# Copyright 2025 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that implements a custom Kafka publisher."""

import json
import logging
from typing import Any

import jsonschema

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.kafka_publisher import KafkaPublisher
from ccx_messaging.schemas import ARCHIVE_SYNCED_SCHEMA


log = logging.getLogger(__name__)


class IDPRuleProcessingPublisher(KafkaPublisher):
    """Publisher that handles the results of the applied rules and publishes them to Kafka.

    The results of the data analysis are received as a JSON (string)
    and turned into a byte array using UTF-8 encoding.
    The bytes are then sent to the output Kafka topic.

    Custom error handling for the whole pipeline is implemented here.
    """

    def publish(self, input_msg: dict[str, Any], report: str | bytes) -> None:
        """Publish a JSON message to the output Kafka topic.

        The report is expected to be a string (or bytes) containing a valid
        JSON object. It is parsed, wrapped together with the "path" and
        "metadata" fields of the input message, serialized back to JSON,
        converted into a byte array using UTF-8 encoding, and sent to the
        producer to produce a message in the output Kafka topic.
        """
        try:
            jsonschema.validate(input_msg, ARCHIVE_SYNCED_SCHEMA)

        except jsonschema.ValidationError as ex:
            raise CCXMessagingError("Invalid JSON format in the input message.") from ex

        try:
            report = json.loads(report)

        except (TypeError, json.decoder.JSONDecodeError) as ex:
            raise CCXMessagingError("Could not parse report; report is not in JSON format") from ex

        output_msg = {
            "path": input_msg["path"],
            "metadata": input_msg["metadata"],
            "report": report,
        }

        message = json.dumps(output_msg)
        log.debug("Sending response to the %s topic.", self.topic)
        # Convert message string into a byte array.
        self.produce(message.encode("utf-8"))
        log.debug("Message has been sent successfully.")

    def error(self, input_msg: dict, ex: Exception):
        """Handle pipeline errors by logging them."""
        log.warning(
            "An error has occurred during the processing of %s: %s",
            input_msg,
            ex,
        )
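For reference, a standalone sketch of what publish() ends up producing for the minimal input used in the tests below (all values assumed, taken from those tests):

import json

input_msg = {"path": "bucket/path/to/archive.tgz", "metadata": {"cluster_id": "uuid"}}
report = json.loads('{"reports": []}')

# Same wrapping the publisher performs: keep path and metadata,
# attach the parsed report, then serialize and UTF-8 encode.
output_msg = {
    "path": input_msg["path"],
    "metadata": input_msg["metadata"],
    "report": report,
}
print(json.dumps(output_msg).encode("utf-8"))
# b'{"path": "bucket/path/to/archive.tgz", "metadata": {"cluster_id": "uuid"}, "report": {"reports": []}}'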
Tests for IDPRuleProcessingPublisher (new file, +186 lines):
# Copyright 2025 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the RuleProcessingPublisher class.""" | ||

import json
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka import KafkaException

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.idp_rule_processing_publisher import IDPRuleProcessingPublisher


def test_init():
    """Check that init creates a valid object."""
    kafka_config = {
        "bootstrap.servers": "kafka:9092",
    }
    IDPRuleProcessingPublisher(outgoing_topic="topic name", **kafka_config)


INVALID_TOPIC_NAMES = [
    None,
    b"Topic name",
    [],
    {},
    4,
    5.5,
    5 + 2j,
]


@pytest.mark.parametrize("topic_name", INVALID_TOPIC_NAMES) | ||
def test_init_invalid_topic(topic_name): | ||
"""Check what happens when the output_topic parameter is not valid.""" | ||
with pytest.raises(CCXMessagingError): | ||
IDPRuleProcessingPublisher(topic_name) | ||
|
||
|
||
INVALID_KWARGS = [
    {},
    {"bootstrap_servers": "kafka:9092"},
    {"bootstrap.servers": "kafka:9092", "unknown_option": "value"},
]


@pytest.mark.parametrize("kwargs", INVALID_KWARGS) | ||
def test_bad_initialization(kwargs): | ||
"""Check that init fails when using not valid kwargs.""" | ||
with pytest.raises(KafkaException): | ||
IDPRuleProcessingPublisher(outgoing_topic="topic", **kwargs) | ||
|
||
|
||
@pytest.mark.parametrize("kafka_broker_cfg", INVALID_KWARGS) | ||
@pytest.mark.parametrize("kwargs", INVALID_KWARGS) | ||
def test_bad_init_with_kafka_config(kafka_broker_cfg, kwargs): | ||
"""Check that init fails when using not valid kwargs.""" | ||
with pytest.raises(KafkaException): | ||
IDPRuleProcessingPublisher(outgoing_topic="topic", **kwargs) | ||
|
||
|
||
INVALID_INPUT_MSGS = [
    None,
    "",
    1,
    2.0,
    3 + 1j,
    [],
    {},  # right type, missing path and metadata
    {"path": ""},  # missing metadata
    {"path": "", "metadata": {}},  # missing metadata.cluster_id
]


@pytest.mark.parametrize("wrong_input_msg", INVALID_INPUT_MSGS) | ||
def test_publish_bad_argument(wrong_input_msg): | ||
"""Check that invalid messages passed by the framework are handled gracefully.""" | ||
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) | ||
sut.producer = MagicMock() | ||
|
||
with pytest.raises(CCXMessagingError): | ||
sut.publish(wrong_input_msg, {}) | ||
assert not sut.producer.produce.called | ||
|
||
|
||
VALID_INPUT_MSG = [
    pytest.param(
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
            },
        },
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
            },
            "report": {
                "reports": [],
            },
        },
        id="minimal valid",
    ),
    pytest.param(
        {
            "path": "bucket/path/to/archive.tgz",
            "original_path": "other_than_current_path",
            "metadata": {
                "cluster_id": "uuid",
                "external_organization": "an organization",
            },
        },
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
                "external_organization": "an organization",
            },
            "report": {
                "reports": [],
            },
        },
        id="adding optional elements",
    ),
]


@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG) | ||
def test_publish_valid(input, expected_output): | ||
"""Check that Kafka producer is called with an expected message.""" | ||
report = '{"reports": []}' | ||
|
||
expected_output = json.dumps(expected_output) | ||
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) | ||
sut.producer = MagicMock() | ||
|
||
sut.publish(input, report) | ||
sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode()) | ||
|
||
|
||
INVALID_REPORTS = [
    None,
    1,
    2.0,
    1 + 3j,
    [],
    {},
    "",
]


@pytest.mark.parametrize("invalid_report", INVALID_REPORTS) | ||
def test_publish_invalid_report(invalid_report): | ||
"""Check the behaviour of publish when an invalid report is received.""" | ||
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) | ||
sut.producer = MagicMock() | ||
|
||
with pytest.raises(CCXMessagingError): | ||
sut.publish(VALID_INPUT_MSG, invalid_report) | ||
assert not sut.producer.produce.called | ||
|
||
|
||
@pytest.mark.parametrize("input,output", VALID_INPUT_MSG) | ||
def test_error(input, output): | ||
"""Check that error just prints a log.""" | ||
_ = output # output values are not needed | ||
|
||
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"}) | ||
|
||
with patch("ccx_messaging.publishers.idp_rule_processing_publisher.log") as log_mock: | ||
sut.error(input, None) | ||
assert log_mock.warning.called |
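Finally, a hypothetical way to run this test module on its own; the file path is an assumption, since the diff view above does not show where the module lives in the repository.

import pytest

# Path is assumed; adjust to the test module's actual location.
pytest.main(["-v", "test/publishers/idp_rule_processing_publisher_test.py"])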