CCXDEV-12772: S3UploadEngine compatible with KafkaConsumer #207

Merged: 3 commits, Apr 29, 2024
52 changes: 47 additions & 5 deletions ccx_messaging/consumers/idp_kafka_consumer.py
@@ -1,13 +1,27 @@
"""Kafka consumer implementation using Confluent Kafka library for Internal Data Pipeline."""
import logging
from confluent_kafka import Message
from ccx_messaging.consumers.kafka_consumer import KafkaConsumer
import json
import re

from confluent_kafka import Message
from insights import dr

from ccx_messaging.consumers.kafka_consumer import KafkaConsumer
from ccx_messaging.error import CCXMessagingError


# Path example: <org_id>/<cluster_id>/<year><month><day><time>-<id>
# Following RE matches with S3 archives like the previous example and allow
# to extract named groups for the different meaningful components
S3_ARCHIVE_PATTERN = re.compile(
r"(?P<org_id>[0-9]+)\/" # extract named group for organization id
r"(?P<cluster_id>[0-9,a-z,-]{36})\/" # extract named group for the cluster_id
r"(?P<archive>" # extract named group for the archive name, including the following 3 lines
r"(?P<timestamp>" # extract the timestamp named group, including the following line
# Next line extract year, month, day and time named groups from the timestamp
r"(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})(?P<time>[0-9]{6}))-"
r"(?P<id>[a-z,A-Z,0-9]*))" # Extract the id of the file as named group
)
LOG = logging.getLogger(__name__)


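For reference, a minimal sketch of what the pattern extracts from a well-formed path; the org ID, cluster UUID and archive name below are made up:

from ccx_messaging.consumers.idp_kafka_consumer import S3_ARCHIVE_PATTERN

# Hypothetical path following <org_id>/<cluster_id>/<year><month><day><time>-<id>
path = "12345/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/20240429123456-abc123"
match_ = S3_ARCHIVE_PATTERN.match(path)
print(match_.groupdict())
# {'org_id': '12345', 'cluster_id': 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
#  'archive': '20240429123456-abc123', 'timestamp': '20240429123456',
#  'year': '2024', 'month': '04', 'day': '29', 'time': '123456', 'id': 'abc123'}
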
@@ -37,8 +51,20 @@ def handles(self, msg: Message) -> bool:

     def deserialize(self, msg):
         """Deserialize JSON message received from kafka."""
+        if not msg:
+            raise CCXMessagingError("No incoming message %s", msg)
+
         try:
-            deserialized_message = json.loads(msg.value())
+            value = msg.value()
+
+        except AttributeError as ex:
+            raise CCXMessagingError("Invalid incoming message type: %s", type(msg)) from ex
+
+        LOG.debug("Deserializing incoming message(%s): %s", self.log_pattern, value)
+
+        try:
+            deserialized_message = json.loads(value)
+
         except TypeError as ex:
             LOG.warning("Incorrect message type: %s", msg)
             raise CCXMessagingError("Incorrect message type") from ex
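
The reworked method validates in two steps: first that the message object is usable at all, then that its payload parses as JSON. A simplified, self-contained sketch of that control flow (CCXMessagingError is stubbed here; the real class lives in ccx_messaging.error):

import json

class CCXMessagingError(Exception):
    """Stub standing in for ccx_messaging.error.CCXMessagingError."""

def deserialize(msg):
    if not msg:  # reject None / falsy messages outright
        raise CCXMessagingError("No incoming message")
    try:
        value = msg.value()  # objects without value() are not Kafka messages
    except AttributeError as ex:
        raise CCXMessagingError(f"Invalid incoming message type: {type(msg)}") from ex
    try:
        return json.loads(value)  # the payload must be valid JSON bytes/str
    except TypeError as ex:
        raise CCXMessagingError("Incorrect message type") from ex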
@@ -52,7 +78,23 @@ def deserialize(self, msg):

     def create_broker(self, input_msg):
         """Create a suitable `Broker` to pass arguments to the `Engine`."""
+        path = input_msg.get("path")
         broker = dr.Broker()
-        broker["cluster_id"] = input_msg.get("cluster_id")
-        broker["s3_path"] = input_msg.get("path")
+        broker["original_path"] = path
+
+        if "cluster_id" in input_msg:
+            broker["cluster_id"] = input_msg["cluster_id"]
+
+        match_ = S3_ARCHIVE_PATTERN.match(path)
+        if not match_:
+            LOG.warning("The archive doesn't match the expected pattern: %s", path)
+            exception = CCXMessagingError("Archive pattern name incorrect")
+            self.fire("on_consumer_failure", broker, exception)
+            raise exception
+
+        # Values already set in the broker, like the cluster ID, take precedence
+        # over the ones extracted from the `path`
+        for key, value in match_.groupdict().items():
+            if key not in broker:
+                broker[key] = value
+
         return broker
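
To see what ends up in the broker, a runnable sketch that substitutes a plain dict for insights.dr.Broker and reuses the module's own pattern; the message contents are made up:

from ccx_messaging.consumers.idp_kafka_consumer import S3_ARCHIVE_PATTERN

input_msg = {  # made-up message for illustration
    "path": "12345/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/20240429123456-abc123",
    "cluster_id": "explicit-cluster-id",
}

broker = {"original_path": input_msg.get("path")}  # plain dict standing in for dr.Broker()
if "cluster_id" in input_msg:
    broker["cluster_id"] = input_msg["cluster_id"]

match_ = S3_ARCHIVE_PATTERN.match(input_msg["path"])
for key, value in match_.groupdict().items():
    if key not in broker:  # keys already set, like cluster_id, win over regex groups
        broker[key] = value

print(broker["cluster_id"])  # explicit-cluster-id, not the UUID from the path
print(broker["year"], broker["time"])  # 2024 123456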
25 changes: 25 additions & 0 deletions ccx_messaging/consumers/kafka_consumer.py
@@ -2,6 +2,7 @@

 import logging
 import time
+from datetime import datetime
 from threading import Thread
 
 from confluent_kafka import (
@@ -11,6 +12,7 @@
     Producer,
     TIMESTAMP_NOT_AVAILABLE,
 )
+from insights import dr
 from insights_messaging.consumers import Consumer
 
 from ccx_messaging.error import CCXMessagingError
@@ -246,6 +248,29 @@ def check_last_message_received_time(self):
         # To do the minimum interruptions possible, sleep for one hour
         time.sleep(MAX_ELAPSED_TIME_BETWEEN_MESSAGES)
 
+    def create_broker(self, input_msg):
+        """Create a suitable `Broker` to pass arguments to the `Engine`."""
+        broker = dr.Broker()
+
+        # Some engines expect extra data for their own usage, like the following:
+        org_id = (
+            input_msg.get("identity", {}).get("identity", {}).get("internal", {}).get("org_id")
+        )
+        date = datetime.fromisoformat(input_msg.get("timestamp", "0"))
+
+        broker["org_id"] = org_id
+        broker["cluster_id"] = input_msg["cluster_name"]
+        broker["original_path"] = input_msg["url"]
+        broker["year"] = date.year
+        broker["month"] = date.month
+        broker["day"] = date.day
+        broker["time"] = f"{date.hour:02d}:{date.minute:02d}:{date.second:02d}"
+        broker["hour"] = date.hour
+        broker["minute"] = date.minute
+        broker["second"] = date.second
+
+        return broker
+
     def process_dead_letter(self, msg: Message) -> None:
         """Send the message to a dead letter queue in a different Kafka topic."""
         if not self.dlq_producer:
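
Unlike the IDP variant, the base consumer derives the broker fields from the message envelope rather than from an S3 path. A sketch of the expected envelope shape, with made-up values; note that timestamp must be an ISO-8601 string for datetime.fromisoformat to accept it:

from datetime import datetime

input_msg = {  # made-up envelope for illustration
    "identity": {"identity": {"internal": {"org_id": "12345"}}},
    "timestamp": "2024-04-29T12:34:56",
    "cluster_name": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    "url": "https://s3.example.com/archive.tar.gz",
}

org_id = input_msg.get("identity", {}).get("identity", {}).get("internal", {}).get("org_id")
date = datetime.fromisoformat(input_msg["timestamp"])
print(org_id)                           # 12345
print(date.year, date.month, date.day)  # 2024 4 29
print(f"{date.hour:02d}:{date.minute:02d}:{date.second:02d}")  # 12:34:56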
41 changes: 7 additions & 34 deletions ccx_messaging/engines/s3_upload_engine.py
@@ -15,29 +15,14 @@
"""S3 Engine Class and related functions."""

import json
import re
import logging
from collections import ChainMap

from insights_messaging.engine import Engine

from ccx_messaging.utils.s3_uploader import S3Uploader
from ccx_messaging.utils.sliced_template import SlicedTemplate
from ccx_messaging.error import CCXMessagingError


# Path example: <org_id>/<cluster_id>/<year><month><day><time>-<id>
# Following RE matches with S3 archives like the previous example and allow
# to extract named groups for the different meaningful components
S3_ARCHIVE_PATTERN = re.compile(
r"(?P<org_id>[0-9]+)\/" # extract named group for organization id
r"(?P<cluster_id>[0-9,a-z,-]{36})\/" # extract named group for the cluster_id
r"(?P<archive>" # extract named group for the archive name, including the following 3 lines
r"(?P<timestamp>" # extract the timestamp named group, including the following line
# Next line extract year, month, day and time named groups from the timestamp
r"(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})(?P<time>[0-9]{6}))-"
r"(?P<id>[a-z,A-Z,0-9]*))" # Extract the id of the file as named group
)


LOG = logging.getLogger(__name__)


@@ -99,36 +84,24 @@ def process(self, broker, local_path):
LOG.info("Processing %s for uploading", local_path)
self.fire("pre_extract", broker, local_path)

s3_path = broker["s3_path"]

for w in self.watchers:
w.watch_broker(broker)

match_ = S3_ARCHIVE_PATTERN.match(s3_path)
if not match_:
LOG.warning("The archive doesn't match the expected pattern: %s")
exception = CCXMessagingError("Archive pattern name incorrect")
self.fire("on_engine_failure", broker, exception)
raise exception

components = ChainMap(broker, match_.groupdict())
target_path = self.compute_target_path(components)
LOG.info(f"Uploading archive '{s3_path}' as {self.dest_bucket}/{target_path}")
target_path = self.compute_target_path(broker)
LOG.info(f"Uploading archive '{local_path}' as {self.dest_bucket}/{target_path}")
self.uploader.upload_file(local_path, self.dest_bucket, target_path)
LOG.info(f"Uploaded archive '{s3_path}' as {self.dest_bucket}/{target_path}")
LOG.info(f"Uploaded archive '{local_path}' as {self.dest_bucket}/{target_path}")

metadata = create_metadata(components)
metadata = create_metadata(broker)
report = {
"path": target_path,
"original_path": s3_path,
"original_path": broker.get("original_path", ""),
"metadata": metadata,
}

LOG.info("Generated report: %s", report)
self.fire("on_engine_success", broker, report)

del broker["cluster_id"]
del broker["s3_path"]
return json.dumps(report)

def compute_target_path(self, components: dict[str, str]) -> str:
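
With the regex handling moved to the consumer, compute_target_path now renders the destination key straight from the broker. The engine takes a configurable SlicedTemplate, which, as its name suggests, also accepts slice expressions on substituted fields; a minimal sketch of the idea using plain string.Template, with an illustrative template string rather than the engine's actual configuration:

from string import Template

# Illustrative template; the real engine renders a configured SlicedTemplate
template = Template("$org_id/$cluster_id/$year$month$day$time-$id")

components = {  # values as a broker would carry them after create_broker
    "org_id": "12345",
    "cluster_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    "year": "2024", "month": "04", "day": "29", "time": "123456",
    "id": "abc123",
}
print(template.safe_substitute(components))
# 12345/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/20240429123456-abc123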
26 changes: 26 additions & 0 deletions test/consumers/__init__.py
@@ -1 +1,27 @@
"""Module containing tests for the consumers."""

from confluent_kafka import TIMESTAMP_NOT_AVAILABLE


class KafkaMessage:
"""Test double for the confluent_kafka.Message class."""

def __init__(self, msg, headers=None, timestamp=None):
"""Initialize a KafkaMessage test double."""
self.msg = msg
self._headers = headers
self._timestamp = timestamp
self.topic = lambda: "topic"
self.partition = lambda: 0
self.offset = lambda: 0
self.value = lambda: self.msg
self.error = lambda: None
self.headers = lambda: self._headers

def timestamp(self):
"""Test double for the Message.timestamp function."""
if self._timestamp is None:
return TIMESTAMP_NOT_AVAILABLE, None

else:
return None, self._timestamp
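
A short usage sketch of the double; run it from the repository root so test.consumers is importable, and note the payloads are made up:

from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

from test.consumers import KafkaMessage

msg = KafkaMessage('{"url": "https://example.com/archive.tar.gz"}')
print(msg.value())      # the raw payload handed to the constructor
print(msg.timestamp())  # (TIMESTAMP_NOT_AVAILABLE, None): no timestamp was provided

stamped = KafkaMessage("{}", timestamp=1714383296000)
print(stamped.timestamp())  # (None, 1714383296000): the double's simplified (type, value) pair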