Publisher of rule processing reports using Confluent #52

Merged · 2 commits · Mar 24, 2023
134 changes: 134 additions & 0 deletions ccx_messaging/publishers/rule_processing_publisher.py
@@ -0,0 +1,134 @@
# Copyright 2019, 2020, 2021, 2022 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module that implements a custom Kafka publisher."""

import json
import logging
from json import JSONDecodeError

from confluent_kafka import KafkaException, Producer
from insights_messaging.publishers import Publisher

from ccx_messaging.error import CCXMessagingError

LOG = logging.getLogger(__name__)


class RuleProcessingPublisher(Publisher):
"""
RuleProcessingPublisher will handle the results of the applied rules and publish them to Kafka.

The results of the data analysis are received as a JSON (string)
and turned into a byte array using UTF-8 encoding.
The bytes are then sent to the output Kafka topic.

Custom error handling for the whole pipeline is implemented here.
"""

    def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs):
        """Construct a new `RuleProcessingPublisher` given `kwargs` from the config YAML."""
        self.topic = outgoing_topic
        if not isinstance(self.topic, str):
            raise CCXMessagingError("outgoing_topic should be a str")

        if kafka_broker_config:
            kwargs.update(kafka_broker_config)

        if "bootstrap.servers" not in kwargs:
            raise KafkaException("Broker not configured")

        self.producer = Producer(kwargs)
        LOG.info(
            "Producing to topic '%s' on brokers %s", self.topic, kwargs.get("bootstrap.servers")
        )
        self.outdata_schema_version = 2

    def publish(self, input_msg, response):
        """
        Publish an EOL-terminated JSON message to the output Kafka topic.

        The response is assumed to be a string representing a valid JSON object.
        A newline character will be appended to it, then it will be converted
        into a byte array using UTF-8 encoding, and the result will be sent to
        the producer to produce a message in the output Kafka topic.
        """
        # The response is already a JSON string, so there is no need to dump it again.
        output_msg = {}
        try:
            org_id = int(input_msg["identity"]["identity"]["internal"]["org_id"])
        except (ValueError, KeyError, TypeError) as err:
            raise CCXMessagingError(f"Error extracting the OrgID: {err}") from err

        try:
            account_number = int(input_msg["identity"]["identity"]["account_number"])
        except (ValueError, KeyError, TypeError) as err:
            raise CCXMessagingError(f"Error extracting the Account number: {err}") from err

        try:
            msg_timestamp = input_msg["timestamp"]
            output_msg = {
                "OrgID": org_id,
                "AccountNumber": account_number,
                "ClusterName": input_msg["cluster_name"],
                "Report": json.loads(response),
                "LastChecked": msg_timestamp,
                "Version": self.outdata_schema_version,
                "RequestId": input_msg.get("request_id"),
            }

            message = json.dumps(output_msg) + "\n"

            LOG.debug("Sending response to the %s topic.", self.topic)
            # Convert the message string into a byte array.
            self.producer.produce(self.topic, message.encode("utf-8"))
            LOG.debug("Message has been sent successfully.")
            LOG.debug(
                "Message context: OrgId=%s, AccountNumber=%s, "
                'ClusterName="%s", LastChecked="%s", Version=%d',
                output_msg["OrgID"],
                output_msg["AccountNumber"],
                output_msg["ClusterName"],
                output_msg["LastChecked"],
                output_msg["Version"],
            )

            LOG.info(
                "Status: Success; "
                "Topic: %s; "
                "Partition: %s; "
                "Offset: %s; "
                "LastChecked: %s",
                input_msg.get("topic"),
                input_msg.get("partition"),
                input_msg.get("offset"),
                msg_timestamp,
            )

        except KeyError as err:
            raise CCXMessagingError("Missing expected keys in the input message") from err

        except (TypeError, UnicodeEncodeError, JSONDecodeError) as err:
            raise CCXMessagingError(f"Error encoding the response to publish: {response}") from err

    def error(self, input_msg, ex):
        """Handle pipeline errors by logging them."""
        # The super call is probably unnecessary because the default behavior
        # is to do nothing, but let's call it in case it ever does anything.
        super().error(input_msg, ex)

        if not isinstance(ex, CCXMessagingError):
            ex = CCXMessagingError(ex)

        LOG.error(ex.format(input_msg))
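
For reference, a minimal usage sketch of the new publisher (the broker address, topic name, timestamp, and report payload below are illustrative assumptions, not values fixed by this change):

# Hypothetical usage sketch; broker address, topic name, and message values
# are illustrative assumptions.
from ccx_messaging.publishers.rule_processing_publisher import RuleProcessingPublisher

publisher = RuleProcessingPublisher(
    outgoing_topic="ccx.results",
    kafka_broker_config={"bootstrap.servers": "kafka:9092"},
)

# The framework hands publish() the consumed input message plus the rule
# engine's JSON output; publish() wraps it in the versioned envelope above.
input_msg = {
    "identity": {"identity": {"internal": {"org_id": 10}, "account_number": 1}},
    "timestamp": "2023-03-24T00:00:00Z",
    "cluster_name": "uuid",
    "request_id": "a request id",
}
publisher.publish(input_msg, '{"reports": []}')

# confluent-kafka buffers messages asynchronously; a real service would also
# poll or flush the underlying producer before shutting down.
publisher.producer.flush()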
196 changes: 196 additions & 0 deletions test/publishers/rule_processing_publisher_test.py
@@ -0,0 +1,196 @@
# Copyright 2023 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the RuleProcessingPublisher class."""

import json
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka import KafkaException

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.publishers.rule_processing_publisher import RuleProcessingPublisher


def test_init():
    """Check that init creates a valid object."""
    kafka_config = {
        "bootstrap.servers": "kafka:9092",
    }
    RuleProcessingPublisher(outgoing_topic="topic name", **kafka_config)


INVALID_TOPIC_NAMES = [
    None,
    b"Topic name",
    [],
    {},
    4,
    5.5,
    5 + 2j,
]


@pytest.mark.parametrize("topic_name", INVALID_TOPIC_NAMES)
def test_init_invalid_topic(topic_name):
    """Check what happens when the outgoing_topic parameter is invalid."""
    with pytest.raises(CCXMessagingError):
        RuleProcessingPublisher(topic_name)


INVALID_KWARGS = [
    {},
    {"bootstrap_servers": "kafka:9092"},
    {"bootstrap.servers": "kafka:9092", "unknown_option": "value"},
]


@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_initialization(kwargs):
    """Check that init fails when using invalid kwargs."""
    with pytest.raises(KafkaException):
        RuleProcessingPublisher(outgoing_topic="topic", **kwargs)


@pytest.mark.parametrize("kafka_broker_cfg", INVALID_KWARGS)
@pytest.mark.parametrize("kwargs", INVALID_KWARGS)
def test_bad_init_with_kafka_config(kafka_broker_cfg, kwargs):
    """Check that init fails when using an invalid kafka_broker_config and invalid kwargs."""
    with pytest.raises(KafkaException):
        RuleProcessingPublisher(
            outgoing_topic="topic", kafka_broker_config=kafka_broker_cfg, **kwargs
        )


INVALID_INPUT_MSGS = [
    None,
    "",
    1,
    2.0,
    3 + 1j,
    [],
    {},  # right type, missing identity
    {"identity": {}},  # missing identity-identity
    {"identity": {"identity": {}}},  # missing identity-identity-internal
    {"identity": {"identity": {"internal": {}}}},  # missing identity-identity-internal-org_id
    {"identity": {"identity": {"internal": {"org_id": 15.2}}}},  # incorrect org_id type
    {"identity": {"identity": {"internal": {"org_id": 10}}}},  # missing "account_number"
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1 + 2j,  # incorrect account number type
            },
        },
    },
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1,
            },
        },
    },  # missing timestamp
    {
        "identity": {
            "identity": {
                "internal": {"org_id": 10},
                "account_number": 1,
            },
        },
        "timestamp": "a timestamp",
    },  # missing cluster_name
]


@pytest.mark.parametrize("wrong_input_msg", INVALID_INPUT_MSGS)
def test_publish_bad_argument(wrong_input_msg):
    """Check that invalid messages passed by the framework are handled gracefully."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    sut.producer = MagicMock()

    with pytest.raises(CCXMessagingError):
        sut.publish(wrong_input_msg, {})

    # Checked after the raises block, so it actually runs once publish has raised.
    assert not sut.producer.produce.called


VALID_INPUT_MSG = {
    "identity": {
        "identity": {
            "internal": {"org_id": 10},
            "account_number": 1,
        },
    },
    "timestamp": "a timestamp",
    "cluster_name": "uuid",
    "request_id": "a request id",
    "topic": "incoming_topic",
    "partition": 0,
    "offset": 100,
}


def test_publish_valid():
    """Check that the Kafka producer is called with the expected message."""
    report = "{}"

    expected_output = (
        json.dumps(
            {
                "OrgID": 10,
                "AccountNumber": 1,
                "ClusterName": "uuid",
                "Report": {},
                "LastChecked": "a timestamp",
                "Version": 2,
                "RequestId": "a request id",
            }
        )
        + "\n"
    )
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    sut.producer = MagicMock()

    sut.publish(VALID_INPUT_MSG, report)
    sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())


INVALID_REPORTS = [
    None,
    1,
    2.0,
    1 + 3j,
    [],
    {},
    "",
]


@pytest.mark.parametrize("invalid_report", INVALID_REPORTS)
def test_publish_invalid_report(invalid_report):
    """Check the behaviour of publish when an invalid report is received."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
    sut.producer = MagicMock()

    with pytest.raises(CCXMessagingError):
        sut.publish(VALID_INPUT_MSG, invalid_report)

    assert not sut.producer.produce.called


def test_error():
    """Check that error just prints a log."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})

    with patch("ccx_messaging.publishers.rule_processing_publisher.LOG") as log_mock:
        sut.error(VALID_INPUT_MSG, None)
        assert log_mock.error.called
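
Every test above replaces the confluent_kafka Producer with a MagicMock, so no running broker is required. Assuming the repository's usual pytest setup, this file can be run on its own:

    pytest test/publishers/rule_processing_publisher_test.py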