New publisher for internal data pipeline rule processor
joselsegura committed Jan 13, 2025
1 parent 77a0c6c commit c406428
Showing 2 changed files with 307 additions and 0 deletions.
80 changes: 80 additions & 0 deletions ccx_messaging/publishers/idp_rule_processing_publisher.py
@@ -0,0 +1,80 @@
# Copyright 2025 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that implements a custom Kafka publisher."""

import json
import logging
from typing import Any

import jsonschema

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.kafka_publisher import KafkaPublisher
from ccx_messaging.schemas import ARCHIVE_SYNCED_SCHEMA


log = logging.getLogger(__name__)


class IDPRuleProcessingPublisher(KafkaPublisher):
    """Handle the results of the applied rules and publish them to Kafka.

    The results of the data analysis are received as a JSON string
    and turned into a byte array using UTF-8 encoding.
    The bytes are then sent to the output Kafka topic.
    Custom error handling for the whole pipeline is implemented here.
    """

    def publish(self, input_msg: dict[str, Any], report: str | bytes) -> None:
        """Publish a JSON message to the output Kafka topic.

        The report is assumed to be a string representing a valid JSON
        object. It is parsed, embedded in the output message under the
        "report" key, and the resulting message is serialized, encoded
        as a UTF-8 byte array, and handed to the producer to be sent to
        the output Kafka topic.
        """
        try:
            jsonschema.validate(input_msg, ARCHIVE_SYNCED_SCHEMA)

        except jsonschema.ValidationError as ex:
            raise CCXMessagingError("Invalid JSON format in the input message.") from ex

        try:
            report = json.loads(report)

        except (TypeError, json.decoder.JSONDecodeError) as ex:
            raise CCXMessagingError(
                "Could not parse report; report is not in JSON format"
            ) from ex

        output_msg = {
            "path": input_msg["path"],
            "metadata": input_msg["metadata"],
            "report": report,
        }

        message = json.dumps(output_msg)
        log.debug("Sending response to the %s topic.", self.topic)
        # Convert the message string into a byte array.
        self.produce(message.encode("utf-8"))
        log.debug("Message has been sent successfully.")

    def error(self, input_msg: dict, ex: Exception) -> None:
        """Handle pipeline errors by logging them."""
        log.warning(
            "An error has occurred during the processing of %s: %s",
            input_msg,
            ex,
        )
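A note on the schema: ARCHIVE_SYNCED_SCHEMA is imported from ccx_messaging.schemas and is not part of this diff. Judging from the validation cases the tests below exercise (path and metadata.cluster_id are required, while extra keys such as original_path are tolerated), its shape plausibly resembles the following sketch; the name ARCHIVE_SYNCED_SCHEMA_SKETCH and the exact constraints are illustrative assumptions, and the real definition may differ.

# Hypothetical sketch of ARCHIVE_SYNCED_SCHEMA, inferred from the tests in
# this commit; the real schema lives in ccx_messaging.schemas and may differ.
ARCHIVE_SYNCED_SCHEMA_SKETCH = {
    "type": "object",
    "properties": {
        "path": {"type": "string"},
        "metadata": {
            "type": "object",
            "properties": {"cluster_id": {"type": "string"}},
            "required": ["cluster_id"],
        },
    },
    "required": ["path", "metadata"],
    # additionalProperties defaults to true, which matches the test case
    # that passes extra keys such as "original_path".
}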
227 changes: 227 additions & 0 deletions test/publishers/idp_rule_processing_publisher_test.py
@@ -0,0 +1,227 @@
# Copyright 2025 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the RuleProcessingPublisher class."""

import json
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka import KafkaException

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.idp_rule_processing_publisher import IDPRuleProcessingPublisher


def test_init():
    """Check that init creates a valid object."""
    kafka_config = {
        "bootstrap.servers": "kafka:9092",
    }
    IDPRuleProcessingPublisher(outgoing_topic="topic name", **kafka_config)


INVALID_TOPIC_NAMES = [
    None,
    b"Topic name",
    [],
    {},
    4,
    5.5,
    5 + 2j,
]


@pytest.mark.parametrize("topic_name", INVALID_TOPIC_NAMES)
def test_init_invalid_topic(topic_name):
"""Check what happens when the output_topic parameter is not valid."""
with pytest.raises(CCXMessagingError):
IDPRuleProcessingPublisher(topic_name)


INVALID_KWARGS = [
    {},
    {"bootstrap_servers": "kafka:9092"},
    {"bootstrap.servers": "kafka:9092", "unknown_option": "value"},
]


@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_initialization(kwargs):
"""Check that init fails when using not valid kwargs."""
with pytest.raises(KafkaException):
IDPRuleProcessingPublisher(outgoing_topic="topic", **kwargs)


@pytest.mark.parametrize("kafka_broker_cfg", INVALID_KWARGS)
@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_init_with_kafka_config(kafka_broker_cfg, kwargs):
"""Check that init fails when using not valid kwargs."""
with pytest.raises(KafkaException):
IDPRuleProcessingPublisher(outgoing_topic="topic", **kwargs)


INVALID_INPUT_MSGS = [
    None,
    "",
    1,
    2.0,
    3 + 1j,
    [],
    {},  # right type, missing path and metadata
    {"path": ""},  # missing metadata
    {"path": "", "metadata": {}},  # missing metadata.cluster_id
]


@pytest.mark.parametrize("wrong_input_msg", INVALID_INPUT_MSGS)
def test_publish_bad_argument(wrong_input_msg):
"""Check that invalid messages passed by the framework are handled gracefully."""
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
sut.producer = MagicMock()

with pytest.raises(CCXMessagingError):
sut.publish(wrong_input_msg, {})
assert not sut.producer.produce.called


VALID_INPUT_MSG = [
    pytest.param(
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
            },
        },
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
            },
            "report": {
                "reports": [],
            },
        },
        id="minimal valid",
    ),
    pytest.param(
        {
            "path": "bucket/path/to/archive.tgz",
            "original_path": "other_than_current_path",
            "metadata": {
                "cluster_id": "uuid",
                "external_organization": "an organization",
            },
        },
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
                "external_organization": "an organization",
            },
            "report": {
                "reports": [],
            },
        },
        id="adding optional elements",
    ),
]


@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG)
def test_publish_valid(input, expected_output):
"""Check that Kafka producer is called with an expected message."""
report = '{"reports": []}'

expected_output = json.dumps(expected_output)
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
sut.producer = MagicMock()

sut.publish(input, report)
sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())


INVALID_REPORTS = [
    None,
    1,
    2.0,
    1 + 3j,
    [],
    {},
    "",
]


@pytest.mark.parametrize("invalid_report", INVALID_REPORTS)
def test_publish_invalid_report(invalid_report):
"""Check the behaviour of publish when an invalid report is received."""
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
sut.producer = MagicMock()

with pytest.raises(CCXMessagingError):
sut.publish(VALID_INPUT_MSG, invalid_report)
assert not sut.producer.produce.called


@pytest.mark.parametrize("input,output", VALID_INPUT_MSG)
def test_error(input, output):
"""Check that error just prints a log."""
_ = output # output values are not needed

sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})

with patch("ccx_messaging.publishers.idp_rule_processing_publisher.log") as log_mock:
sut.error(input, None)
assert log_mock.warning.called


VALID_REPORTS = [
    pytest.param(
        json.dumps({"reports": [], "pass": [], "info": [], "workload_recommendations": []}),
        {
            "path": "bucket/path/to/archive.tgz",
            "metadata": {
                "cluster_id": "uuid",
            },
            "report": {"reports": [], "pass": [], "info": [], "workload_recommendations": []},
        },
        id="valid_report",
    ),
]


@pytest.mark.parametrize("input,expected_output", VALID_REPORTS)
def test_filter_ocp_rules_results(input, expected_output):
"""Check that the workload recommendations are filtered out from the engine results."""
sut = IDPRuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
sut.producer = MagicMock()
expected_output = json.dumps(expected_output)

sut.publish(VALID_INPUT_MSG[0][0][0], input)
sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())

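To try the new publisher outside the test suite, a minimal usage sketch follows, assuming ccx_messaging and confluent-kafka are installed. The topic name and broker address are placeholders, and the producer is mocked (the same trick the tests use) so no live Kafka broker is needed.

import json
from unittest.mock import MagicMock

from ccx_messaging.publishers.idp_rule_processing_publisher import IDPRuleProcessingPublisher

# Placeholder topic and broker address; adjust for a real deployment.
publisher = IDPRuleProcessingPublisher(
    outgoing_topic="archive-results",
    **{"bootstrap.servers": "kafka:9092"},
)
publisher.producer = MagicMock()  # mock the producer: no live broker needed

input_msg = {
    "path": "bucket/path/to/archive.tgz",
    "metadata": {"cluster_id": "uuid"},
}
publisher.publish(input_msg, '{"reports": []}')

# The tests assert that producer.produce is called positionally with
# (topic, payload), so the sent message can be inspected like this:
topic, payload = publisher.producer.produce.call_args.args
print(topic, json.loads(payload))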