Skip to content

Commit

Permalink
Added new consumer for SyncedArchivePublisher messages
Browse files Browse the repository at this point in the history
  • Loading branch information
joselsegura committed Jan 10, 2025
1 parent 3c62e61 commit 8d5bed8
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 2 deletions.
80 changes: 80 additions & 0 deletions ccx_messaging/consumers/synced_archive_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Module containing the consumer for the Kafka topic produced by the Archive Sync service."""

import logging
from typing import Any

from confluent_kafka import Message, KafkaException
from insights import dr
from insights_messaging.consumers import Consumer

from ccx_messaging.consumers.kafka_consumer import KafkaConsumer
from ccx_messaging.internal_pipeline import parse_archive_sync_msg
from ccx_messaging.error import CCXMessagingError


LOG = logging.getLogger(__name__)


class SyncedArchiveConsumer(KafkaConsumer):
    """Consumer for the topic produced by `synced_archive_publisher.SyncedArchivePublisher`.

    Incoming Kafka messages are deserialized and schema-validated with
    `parse_archive_sync_msg` before being handed to `process`. A message that
    fails at any of those steps is passed to `process_dead_letter` instead.
    """

    def get_url(self, input_msg: dict[str, Any]) -> str:
        """Retrieve path to the archive in the S3 storage from Kafka message."""
        # it's safe to assume the "path" is there because the message format is validated
        return input_msg["path"]

    def process_msg(self, msg: Message) -> None:
        """Process a single message received from the topic.

        Empty records are logged and ignored. Any failure while deserializing
        or processing the payload is logged and the message is handed to
        `process_dead_letter`.

        Raises:
            KafkaException: if the message itself reports a Kafka error.
        """
        if not msg:
            LOG.debug("Empty record. Should not happen")
            return

        if msg.error():
            raise KafkaException(msg.error())

        try:
            # Deserialize
            value = self.deserialize(msg)
            # Core Messaging process
            self.process(value)

        except CCXMessagingError as ex:
            LOG.warning(
                "Unexpected error processing incoming message. (%s): %s. Error: %s",
                self.log_pattern,
                msg.value(),
                ex,
            )
            self.process_dead_letter(msg)

        except TimeoutError as ex:
            # Notify the watchers before the message is discarded.
            self.fire("on_process_timeout")
            LOG.exception(ex)
            self.process_dead_letter(msg)

        except Exception as ex:  # pylint: disable=broad-exception-caught
            # Last-resort boundary: a single bad message must not stop the consumer.
            LOG.exception(ex)
            self.process_dead_letter(msg)

    def deserialize(self, msg: Message) -> dict:
        """Deserialize the message received from Kafka into a dictionary.

        Raises:
            CCXMessagingError: if `msg` is falsy, does not expose `value()`,
                carries an empty payload, or the payload is rejected by
                `parse_archive_sync_msg`.
        """
        # The error texts are formatted eagerly: unlike logging calls, an
        # exception does not interpolate extra %-style arguments, so passing
        # them separately would render the message as a tuple.
        if not msg:
            raise CCXMessagingError(f"No incoming message: {msg}")

        try:
            value = msg.value()
        except AttributeError as ex:
            raise CCXMessagingError(f"Invalid incoming message type: {type(msg)}") from ex

        LOG.debug("Deserializing incoming message(%s): %s", self.log_pattern, value)

        if not value:
            raise CCXMessagingError(f"Unable to read incoming message: {value}")

        deserialized_msg = parse_archive_sync_msg(value)
        LOG.debug("JSON message deserialized (%s): %s", self.log_pattern, deserialized_msg)
        return deserialized_msg

    def create_broker(self, input_msg: dict[str, Any]) -> dr.Broker:
        """Create a suitable `Broker`."""
        # Explicitly use the `Consumer` implementation instead of whatever the
        # method resolution order would select.
        return Consumer.create_broker(self, input_msg)
34 changes: 34 additions & 0 deletions ccx_messaging/internal_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Utilities related to Ingress message format."""

import json
import logging

import jsonschema

from ccx_messaging.error import CCXMessagingError
from ccx_messaging.schemas import ARCHIVE_SYNCED_SCHEMA


LOG = logging.getLogger(__name__)


def parse_archive_sync_msg(message: bytes) -> dict:
    """Parse a raw archive sync message into a schema-validated dictionary.

    Args:
        message: JSON document to decode. It is passed directly to
            `json.loads`, so `str`, `bytes` and `bytearray` are accepted.

    Returns:
        The decoded JSON object, once it has been validated against
        `ARCHIVE_SYNCED_SCHEMA`.

    Raises:
        CCXMessagingError: if `message` has an unsupported type, cannot be
            decoded as JSON, or does not comply with the schema.
    """
    try:
        deserialized_message = json.loads(message)
        jsonschema.validate(instance=deserialized_message, schema=ARCHIVE_SYNCED_SCHEMA)

    except TypeError as ex:
        LOG.warning("Incorrect message type: %s", message)
        raise CCXMessagingError("Incorrect message type") from ex

    except (json.JSONDecodeError, UnicodeDecodeError) as ex:
        # `json.loads` decodes bytes itself; a payload that is not valid
        # UTF-8/16/32 fails with UnicodeDecodeError before any JSON parsing.
        LOG.warning("Unable to decode received message: %s", message)
        raise CCXMessagingError("Unable to decode received message") from ex

    except jsonschema.ValidationError as ex:
        LOG.warning("Invalid input message JSON schema: %s", deserialized_message)
        raise CCXMessagingError("Invalid input message JSON schema") from ex

    LOG.debug("JSON schema validated: %s", deserialized_message)
    return deserialized_message
16 changes: 16 additions & 0 deletions ccx_messaging/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,19 @@
},
"required": ["identity"],
}

# Schema of the optional details nested under the "metadata" key.
_ARCHIVE_SYNCED_METADATA_SCHEMA = {
    "type": "object",
    "properties": {
        "cluster_id": {"type": "string"},
        "external_organization": {"type": "string"},
    },
}

# JSON schema for archive sync messages: "path" and "metadata" are mandatory,
# "original_path" is optional.
ARCHIVE_SYNCED_SCHEMA = {
    "type": "object",
    "properties": {
        "path": {"type": "string"},
        "original_path": {"type": "string"},
        "metadata": _ARCHIVE_SYNCED_METADATA_SCHEMA,
    },
    "required": ["path", "metadata"],
}
8 changes: 6 additions & 2 deletions test/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class KafkaMessage:
"""Test double for the confluent_kafka.Message class."""

def __init__(self, msg, headers=None, timestamp=None):
def __init__(self, msg=None, headers=None, timestamp=None, error=False):
"""Initialize a KafkaMessage test double."""
self.msg = msg
self._headers = headers
Expand All @@ -15,7 +15,11 @@ def __init__(self, msg, headers=None, timestamp=None):
self.partition = lambda: 0
self.offset = lambda: 0
self.value = lambda: self.msg
self.error = lambda: None
if error:
self.error = lambda: "error"
else:
self.error = lambda: None

self.headers = lambda: self._headers

def timestamp(self):
Expand Down
216 changes: 216 additions & 0 deletions test/consumers/synced_archive_consumer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Copyright 2024 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module containing unit tests for the `KafkaConsumer` class."""

from unittest.mock import MagicMock, patch

import pytest

from confluent_kafka import KafkaException

from ccx_messaging.consumers.synced_archive_consumer import SyncedArchiveConsumer
from ccx_messaging.error import CCXMessagingError

from . import KafkaMessage


# _REGEX_BAD_SCHEMA = r"^Unable to extract URL from input message: "
# Values of the wrong type for `deserialize`; the tests below feed them both
# directly and wrapped as the payload of a `KafkaMessage`.
_INVALID_TYPE_VALUES = [
    None,
    42,
    3.14,
    True,
    [],
    {},
]


@pytest.mark.parametrize("value", _INVALID_TYPE_VALUES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer")
def test_deserialize_invalid_type(mock_consumer, value):
    """Check that `deserialize` rejects arguments that are not Kafka messages."""
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)

    with pytest.raises(CCXMessagingError):
        consumer.deserialize(value)


@pytest.mark.parametrize("value", _INVALID_TYPE_VALUES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer")
def test_deserialize_invalid_type_as_kafka_message(mock_consumer, value):
    """Check that `deserialize` rejects Kafka messages whose payload has an invalid type."""
    wrapped = KafkaMessage(value)
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)

    with pytest.raises(CCXMessagingError):
        consumer.deserialize(wrapped)


# Pairs of (serialized JSON payload, dictionary expected after deserialization).
_VALID_MESSAGES = [
    # Minimal message: only the required keys, with empty values.
    (
        '{"path": "", "metadata": {}}',
        {
            "path": "",
            "metadata": {},
        },
    ),
    (
        '{"path": "path/to/archive.tgz", "metadata": {}}',
        {
            "path": "path/to/archive.tgz",
            "metadata": {},
        },
    ),
    # The optional "original_path" key is present.
    (
        '{"path": "path/to/archive.tgz", "metadata": {}, "original_path": ""}',
        {
            "path": "path/to/archive.tgz",
            "metadata": {},
            "original_path": "",
        },
    ),
    # Every key declared in the schema is present.
    (
        '{"path": "path/to/archive.tgz", "original_path": "other/path/archive.tgz", '
        '"metadata": {"cluster_id": "", "external_organization": ""}}',
        {
            "path": "path/to/archive.tgz",
            "metadata": {
                "cluster_id": "",
                "external_organization": "",
            },
            "original_path": "other/path/archive.tgz",
        },
    ),
]


@pytest.mark.parametrize("msg,value", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_deserialize_valid_str(msg, value):
    """Check that well-formed JSON `str` payloads are deserialized into the expected dict."""
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)
    result = consumer.deserialize(KafkaMessage(msg))
    assert result == value


@pytest.mark.parametrize("msg,value", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_deserialize_valid_bytes(msg, value):
    """Check that well-formed JSON `bytes` payloads are deserialized into the expected dict."""
    payload = msg.encode()
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)
    result = consumer.deserialize(KafkaMessage(payload))
    assert result == value


@pytest.mark.parametrize("msg,value", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_deserialize_valid_bytearray(msg, value):
    """Check that well-formed JSON `bytearray` payloads are deserialized into the expected dict."""
    payload = bytearray(msg.encode())
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)
    result = consumer.deserialize(KafkaMessage(payload))
    assert result == value


# String payloads that are not valid JSON documents: an empty string and a
# key/value pair missing its surrounding braces.
_INVALID_MESSAGES = [
    "",
    '"path": "value"',
]


@pytest.mark.parametrize("msg", _INVALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_deserialize_invalid_str(msg):
    """Check that `deserialize` raises `CCXMessagingError` for malformed JSON strings."""
    wrapped = KafkaMessage(msg)
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)

    with pytest.raises(CCXMessagingError):
        consumer.deserialize(wrapped)


@pytest.mark.parametrize("msg, _", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_handles_valid(msg, _):
    """Check that `handles` returns a truthy result for valid messages."""
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)
    handled = consumer.handles(KafkaMessage(msg))
    assert handled


# Serialized form of an otherwise acceptable record: `get_url` expects an
# already deserialized `dict`, not its `str`/`bytes` representation.
_DICT_STR = '{"path": "bucket/file"}'

# Values that cannot be indexed with the "path" key.
_INVALID_RECORD_VALUES = [
    "",
    _DICT_STR.encode("utf-8"),
    bytearray(_DICT_STR.encode("utf-8")),
    [],
]

# Deserialized records exposing a "path" entry.
_VALID_RECORD_VALUES = [
    {"path": ""},
    {"path": "bucket/file"},
    {"path": "https://a-valid-domain.com/precious_url"},
]


@pytest.mark.parametrize("value", _INVALID_RECORD_VALUES)
def test_get_url_invalid(value):
    """Check that `SyncedArchiveConsumer.get_url` raises when the record is not a dict."""
    get_url = SyncedArchiveConsumer.get_url

    with pytest.raises(Exception):
        get_url(None, value)


@pytest.mark.parametrize("value", _VALID_RECORD_VALUES)
def test_get_url_valid(value):
    """Check that `SyncedArchiveConsumer.get_url` returns the record's "path" entry."""
    expected = value["path"]
    url = SyncedArchiveConsumer.get_url(None, value)
    assert url == expected


def test_create_broker():
    """Check that `SyncedArchiveConsumer.create_broker` returns a broker object."""
    broker = SyncedArchiveConsumer.create_broker(None, {})
    assert broker is not None



# Inputs that `process_msg` must handle without dead-lettering: Kafka messages
# carrying a valid serialized payload, plus the empty record (`None`).
_VALID_KAFKA_MESSAGES = [KafkaMessage(msg) for msg, _ in _VALID_MESSAGES]
_VALID_KAFKA_MESSAGES.append(None)


@pytest.mark.parametrize("value", _VALID_KAFKA_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_process_msg(value):
    """Test that valid messages reach `process` and are never dead-lettered.

    The empty record (`None`) must be ignored: neither `process` nor
    `process_dead_letter` may be invoked for it.
    """
    sut = SyncedArchiveConsumer(None, None, None, incoming_topic=None)

    # Mocks (rather than plain functions) accept any call signature, so the
    # test cannot fail or pass because of the stand-in itself.
    with patch.object(SyncedArchiveConsumer, "process") as process_mock:
        with patch.object(SyncedArchiveConsumer, "process_dead_letter") as dlq_mock:
            sut.process_msg(value)

    assert not dlq_mock.called
    assert process_mock.called == (value is not None)


# Kafka messages whose payload cannot be deserialized into a valid record.
_INVALID_KAFKA_MESSAGES = [KafkaMessage(value) for value in _INVALID_RECORD_VALUES]


@pytest.mark.parametrize("value", _INVALID_KAFKA_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_process_invalid_msg(value: KafkaMessage):
    """Test that messages with an invalid payload are sent to the dead letter queue."""
    sut = SyncedArchiveConsumer(None, None, None, incoming_topic=None)

    with patch.object(SyncedArchiveConsumer, "process_dead_letter") as dlq_mock:
        sut.process_msg(value)

    # Exactly one dead-letter call, carrying the original Kafka message.
    dlq_mock.assert_called_once_with(value)


@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_process_msg_raise_error():
    """Check that `process_msg` raises `KafkaException` when the message reports an error."""
    failing_msg = KafkaMessage(error=True)
    consumer = SyncedArchiveConsumer(None, None, None, incoming_topic=None)

    with pytest.raises(KafkaException):
        consumer.process_msg(failing_msg)

0 comments on commit 8d5bed8

Please sign in to comment.