CCXDEV-12772: S3UploadEngine compatible with KafkaConsumer #205

Merged (1 commit, Apr 26, 2024)
32 changes: 29 additions & 3 deletions ccx_messaging/consumers/idp_kafka_consumer.py
@@ -1,13 +1,27 @@
"""Kafka consumer implementation using Confluent Kafka library for Internal Data Pipeline."""
import logging
from confluent_kafka import Message
from ccx_messaging.consumers.kafka_consumer import KafkaConsumer
import json
import re

from confluent_kafka import Message
from insights import dr

from ccx_messaging.consumers.kafka_consumer import KafkaConsumer
from ccx_messaging.error import CCXMessagingError


# Path example: <org_id>/<cluster_id>/<year><month><day><time>-<id>
# The following RE matches S3 archive paths like the previous example and allows
# extracting named groups for the different meaningful components
S3_ARCHIVE_PATTERN = re.compile(
r"(?P<org_id>[0-9]+)\/" # extract named group for organization id
r"(?P<cluster_id>[0-9,a-z,-]{36})\/" # extract named group for the cluster_id
r"(?P<archive>" # extract named group for the archive name, including the following 3 lines
r"(?P<timestamp>" # extract the timestamp named group, including the following line
# Next line extracts year, month, day and time named groups from the timestamp
r"(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})(?P<time>[0-9]{6}))-"
r"(?P<id>[a-z,A-Z,0-9]*))" # Extract the id of the file as named group
)
LOG = logging.getLogger(__name__)


@@ -54,5 +68,17 @@ def create_broker(self, input_msg):
"""Create a suitable `Broker` to be pass arguments to the `Engine`."""
broker = dr.Broker()
broker["cluster_id"] = input_msg.get("cluster_id")
broker["s3_path"] = input_msg.get("path")
broker["original_path"] = input_msg.get("path")

match_ = S3_ARCHIVE_PATTERN.match(input_msg.get("path"))
if not match_:
LOG.warning("The archive doesn't match the expected pattern: %s", input_msg.get("path"))
exception = CCXMessagingError("Archive pattern name incorrect")
self.fire("on_consumer_failure", broker, exception)
raise exception

# Cluster ID might be overridden by the one found in the `path`
for key, value in match_.groupdict().items():
broker[key] = value

return broker
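
For illustration only (not part of the PR diff): a minimal sketch of what S3_ARCHIVE_PATTERN extracts from a path shaped like the comment's example, assuming the pattern defined above is in scope; the concrete path and all values below are hypothetical.

# Illustration (hypothetical values), assuming S3_ARCHIVE_PATTERN from the code above is in scope.
path = "12345/11111111-2222-3333-4444-555555555555/20240426120000-abcdef0123456789abcdef0123456789"
match_ = S3_ARCHIVE_PATTERN.match(path)
assert match_ is not None
print(match_.groupdict())
# {'org_id': '12345',
#  'cluster_id': '11111111-2222-3333-4444-555555555555',
#  'archive': '20240426120000-abcdef0123456789abcdef0123456789',
#  'timestamp': '20240426120000',
#  'year': '2024', 'month': '04', 'day': '26', 'time': '120000',
#  'id': 'abcdef0123456789abcdef0123456789'}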
25 changes: 25 additions & 0 deletions ccx_messaging/consumers/kafka_consumer.py
@@ -2,6 +2,7 @@

import logging
import time
from datetime import datetime
from threading import Thread

from confluent_kafka import (
@@ -11,6 +12,7 @@
Producer,
TIMESTAMP_NOT_AVAILABLE,
)
from insights import dr
from insights_messaging.consumers import Consumer

from ccx_messaging.error import CCXMessagingError
@@ -246,6 +248,29 @@ def check_last_message_received_time(self):
# To minimize interruptions, sleep for one hour
time.sleep(MAX_ELAPSED_TIME_BETWEEN_MESSAGES)

def create_broker(self, input_msg):
"""Create a suitable `Broker` to be pass arguments to the `Engine`."""
broker = dr.Broker()

# Some engines expect some data for their own usage, like the following:
org_id = (
input_msg.get("identity", {}).get("identity", {}).get("internal", {}).get("org_id", None)
)
date = datetime.fromisoformat(input_msg.get("timestamp", "0"))

broker["org_id"] = org_id
broker["cluster_id"] = input_msg["cluster_name"]
broker["original_path"] = input_msg["url"]
broker["year"] = date.year
broker["month"] = date.month
broker["day"] = date.day
broker["time"] = f"{date.hour}:{date.minute}:{date.second}"
broker["hour"] = date.hour
broker["minute"] = date.minute
broker["second"] = date.second

return broker

def process_dead_letter(self, msg: Message) -> None:
"""Send the message to a dead letter queue in a different Kafka topic."""
if not self.dlq_producer:
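
For context only (not part of the PR diff): a sketch of the message shape that KafkaConsumer.create_broker expects and the broker values it derives from it; every field value below is hypothetical.

# Illustration (hypothetical values): input expected by KafkaConsumer.create_broker.
input_msg = {
    "identity": {"identity": {"internal": {"org_id": "12345"}}},
    "timestamp": "2024-04-26T12:30:45",
    "cluster_name": "11111111-2222-3333-4444-555555555555",
    "url": "bucket/path/to/archive.tar.gz",
}
# create_broker(input_msg) would then populate, among other keys:
#   broker["org_id"]        == "12345"
#   broker["cluster_id"]    == "11111111-2222-3333-4444-555555555555"
#   broker["original_path"] == "bucket/path/to/archive.tar.gz"
#   broker["year"], broker["month"], broker["day"] == 2024, 4, 26
#   broker["time"]          == "12:30:45"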
41 changes: 7 additions & 34 deletions ccx_messaging/engines/s3_upload_engine.py
@@ -15,29 +15,14 @@
"""S3 Engine Class and related functions."""

import json
import re
import logging
from collections import ChainMap

from insights_messaging.engine import Engine

from ccx_messaging.utils.s3_uploader import S3Uploader
from ccx_messaging.utils.sliced_template import SlicedTemplate
from ccx_messaging.error import CCXMessagingError


# Path example: <org_id>/<cluster_id>/<year><month><day><time>-<id>
# Following RE matches with S3 archives like the previous example and allow
# to extract named groups for the different meaningful components
S3_ARCHIVE_PATTERN = re.compile(
r"(?P<org_id>[0-9]+)\/" # extract named group for organization id
r"(?P<cluster_id>[0-9,a-z,-]{36})\/" # extract named group for the cluster_id
r"(?P<archive>" # extract named group for the archive name, including the following 3 lines
r"(?P<timestamp>" # extract the timestamp named group, including the following line
# Next line extract year, month, day and time named groups from the timestamp
r"(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})(?P<time>[0-9]{6}))-"
r"(?P<id>[a-z,A-Z,0-9]*))" # Extract the id of the file as named group
)


LOG = logging.getLogger(__name__)


@@ -99,36 +84,24 @@ def process(self, broker, local_path):
LOG.info("Processing %s for uploading", local_path)
self.fire("pre_extract", broker, local_path)

s3_path = broker["s3_path"]

for w in self.watchers:
w.watch_broker(broker)

match_ = S3_ARCHIVE_PATTERN.match(s3_path)
if not match_:
LOG.warning("The archive doesn't match the expected pattern: %s")
exception = CCXMessagingError("Archive pattern name incorrect")
self.fire("on_engine_failure", broker, exception)
raise exception

components = ChainMap(broker, match_.groupdict())
target_path = self.compute_target_path(components)
LOG.info(f"Uploading archive '{s3_path}' as {self.dest_bucket}/{target_path}")
target_path = self.compute_target_path(broker)
LOG.info(f"Uploading archive '{local_path}' as {self.dest_bucket}/{target_path}")
self.uploader.upload_file(local_path, self.dest_bucket, target_path)
LOG.info(f"Uploaded archive '{s3_path}' as {self.dest_bucket}/{target_path}")
LOG.info(f"Uploaded archive '{local_path}' as {self.dest_bucket}/{target_path}")

metadata = create_metadata(components)
metadata = create_metadata(broker)
report = {
"path": target_path,
"original_path": s3_path,
"original_path": broker.get("original_path", ""),
"metadata": metadata,
}

LOG.info("Generated report: %s", report)
self.fire("on_engine_success", broker, report)

del broker["cluster_id"]
del broker["s3_path"]
return json.dumps(report)

def compute_target_path(self, components: dict[str, str]) -> str:
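
For reference (not part of the PR diff): with this change, process() builds the report from broker values instead of re-parsing an s3_path; below is a sketch of the resulting report shape, with hypothetical values and the metadata contents omitted.

# Illustration (hypothetical values): shape of the JSON report returned by S3UploadEngine.process.
import json

report = {
    "path": "computed/target/path.tar.gz",  # hypothetical result of self.compute_target_path(broker)
    "original_path": "12345/11111111-2222-3333-4444-555555555555/20240426120000-abcdef0123456789abcdef0123456789",
    "metadata": {},  # filled by create_metadata(broker); contents omitted in this sketch
}
print(json.dumps(report))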
11 changes: 11 additions & 0 deletions test/consumers/kafka_consumer_test.py
@@ -510,3 +510,14 @@ def test_run_fail(consumer_init_mock):
assert consumer_mock.consume.call_count == 1
assert consumer_mock.close.call_count == 1
assert not sut.process_msg.called


@pytest.mark.parametrize("_,deserialized_msg", _VALID_MESSAGES)
@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock())
def test_broker_creation(_, deserialized_msg):
"""Check that create_broker returns the expected values."""
sut = KafkaConsumer(None, None, None, "topic")
broker = sut.create_broker(deserialized_msg)

assert broker["original_path"] == deserialized_msg["url"]
assert broker["org_id"] == deserialized_msg["identity"]["identity"]["internal"]["org_id"]
43 changes: 20 additions & 23 deletions test/engines/s3_uploader_engine_test.py
@@ -20,19 +20,36 @@
from unittest.mock import MagicMock

from ccx_messaging.engines.s3_upload_engine import S3UploadEngine
from ccx_messaging.error import CCXMessagingError
from ccx_messaging.utils.s3_uploader import S3Uploader


BROKER = {
"cluster_id": "11111111-2222-3333-4444-555555555555",
"s3_path": "00000000/11111111-2222-3333-4444-555555555555/66666666666666-77777777777777777777777777777777", # noqa: E501
"org_id": "00000000",
"original_path": "00000000/11111111-2222-3333-4444-555555555555/66666666666666-77777777777777777777777777777777", # noqa: E501
"year": "6666",
"month": "66",
"day": "66",
"time": "666666",
"hour": "66",
"minute": "66",
"second": "66",
"id": "77777777777777777777777777777777",
}


BROKER2 = {
"cluster_id": "22222222-3333-4444-5555-666666666666",
"s3_path": "00000000/22222222-3333-4444-5555-666666666666/77777777777777-88888888888888888888888888888888", # noqa: E501
"org_id": "00000000",
"original_path": "00000000/22222222-3333-4444-5555-666666666666/77777777777777-88888888888888888888888888888888", # noqa: E501
"year": "7777",
"month": "77",
"day": "77",
"time": "777777",
"hour": "77",
"minute": "77",
"second": "77",
"id": "88888888888888888888888888888888",
}


@@ -141,26 +158,6 @@ def test_uploader_no_existing_file():
uploader.upload_file(path="file_path", bucket=DEST_BUCKET, file_name=METADATA.get("path"))


def test_unmatched_pattern():
"""Test uploading a file with an unexpected path."""
engine = S3UploadEngine(
None,
access_key="test",
secret_key="test",
endpoint="https://s3.amazonaws.com",
dest_bucket=DEST_BUCKET,
)
engine.uploader = MagicMock()
S3Uploader.client = MagicMock()

broker = {
"cluster_id": "11111111-2222-3333-4444-555555555555",
"s3_path": "66666666666666-77777777777777777777777777777777",
}
with pytest.raises(CCXMessagingError):
engine.process(broker, LOCAL_FILE_PATH)


def test_path_using_timestamp():
"""Check path creation using timestamp elements."""
engine = S3UploadEngine(