[CCXDEV-12259] New publisher for archive sync #159

Merged · 14 commits · Mar 13, 2024
18 changes: 18 additions & 0 deletions .github/workflows/linters.yml
@@ -0,0 +1,18 @@
name: Linters
on:
  - push
  - pull_request

jobs:
  ruff:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: chartboost/ruff-action@v1

  pre-commit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
      - uses: pre-commit/action@v3.0.1
8 changes: 0 additions & 8 deletions .github/workflows/ruff.yml

This file was deleted.

@@ -1,4 +1,4 @@
-name: Pytest
+name: Tests

on:
  - push
@@ -16,16 +16,16 @@ jobs:
          - "3.11"
          - "3.12"
    steps:
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v4
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - run: pip install --upgrade setuptools
      - run: pip install --upgrade wheel
      - run: pip install tox-gh>=1.2
      - run: tox -vv
      - name: Upload coverage
-        uses: codecov/codecov-action@v3
+        uses: codecov/codecov-action@v4
        if: ${{ matrix.python-version == '3.8' }}
        with:
-          token: c5c72a0d-fa95-4f38-8015-9aadbaf46466
+          token: ${{ secrets.CODECOV_TOKEN }}
1 change: 0 additions & 1 deletion ccx_messaging/consumers/idp_kafka_consumer.py
@@ -10,7 +10,6 @@


class IDPConsumer(KafkaConsumer):
-
    """Consumer based in Confluent Kafka for Internal Data Pipeline."""

    def __init__(
1 change: 0 additions & 1 deletion ccx_messaging/consumers/kafka_consumer.py
@@ -23,7 +23,6 @@


class KafkaConsumer(Consumer):
-
    """Consumer based in Confluent Kafka."""

    def __init__(
1 change: 0 additions & 1 deletion ccx_messaging/downloaders/http_downloader.py
@@ -60,7 +60,6 @@ def parse_human_input(file_size):

# pylint: disable=too-few-public-methods
class HTTPDownloader:
-
    """Downloader for HTTP uris."""

    # https://<hostname>/service_id/file_id?<credentials and other params>
1 change: 0 additions & 1 deletion ccx_messaging/engines/sha_extractor_engine.py
@@ -25,7 +25,6 @@


class SHAExtractorEngine(ICMEngine):
-
    """Engine for extraction of downloading tar archive and selecting a file to be processed."""

    def process(self, broker, path):
1 change: 0 additions & 1 deletion ccx_messaging/error.py
@@ -16,7 +16,6 @@


class CCXMessagingError(Exception):
-
    """Represents a CCX messaging exception.

    This should make it easier to differentiate between
1 change: 0 additions & 1 deletion ccx_messaging/publishers/dvo_metrics_publisher.py
@@ -26,7 +26,6 @@


class DVOMetricsPublisher(KafkaPublisher):
-
    """DVOMetricsPublisher handles the result of the extraction of DVO metrics from an archive."""

    def publish(self, input_msg: Dict, report: str) -> None:
1 change: 0 additions & 1 deletion ccx_messaging/publishers/kafka_publisher.py
@@ -27,7 +27,6 @@


class KafkaPublisher(Publisher):
-
    """KafkaPublisher is a base class for Kafka based publishers.

    It relays on Confluent Kafka library to perform the Kafka related
1 change: 0 additions & 1 deletion ccx_messaging/publishers/rule_processing_publisher.py
@@ -29,7 +29,6 @@


class RuleProcessingPublisher(KafkaPublisher):
-
    """RuleProcessingPublisher handles the results of the applied rules and publish them to Kafka.

    The results of the data analysis are received as a JSON (string)
51 changes: 51 additions & 0 deletions ccx_messaging/publishers/synced_archive_publisher.py
@@ -0,0 +1,51 @@
# Copyright 2024 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module implementing an IDP publisher to a Kafka topic."""


import json
from typing import Dict
import logging
from ccx_messaging.publishers.kafka_publisher import KafkaPublisher

LOG = logging.getLogger(__name__)


class SyncedArchivePublisher(KafkaPublisher):
    """Publisher for internal data pipeline."""  # noqa: D203

    def publish(self, input_msg: Dict, report: str) -> None:
        """Publish response as Kafka message to outgoing topic."""
        output_msg = json.loads(report)
        output_msg.pop("reports", None)
        message = json.dumps(output_msg) + "\n"

        LOG.debug("Sending response to the %s topic.", self.topic)
        # Convert message string into a byte array.
        self.produce(message.encode("utf-8"))
        LOG.debug("Message has been sent successfully.")
        LOG.debug(
            "Message context: path=%s, original_path=%s, " 'metadata="%s"',
            output_msg["path"],
            output_msg["original_path"],
            output_msg["metadata"],
        )

        LOG.debug(
            "Status: Success; " "Topic: %s; " "Partition: %s; " "Offset: %s; ",
            input_msg.get("topic"),
            input_msg.get("partition"),
            input_msg.get("offset"),
        )
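For reviewers who want to poke at the new publisher, a minimal usage sketch (not part of the PR): it mirrors the existing publisher tests by mocking the underlying producer, and the topic name, broker address, and payload keys are illustrative. Note that publish() reads "path", "original_path", and "metadata" from the report, so those keys must be present.

import json
from unittest.mock import MagicMock

from ccx_messaging.publishers.synced_archive_publisher import SyncedArchivePublisher

# Illustrative topic and broker config, following the pattern of the publisher tests.
pub = SyncedArchivePublisher("archive-sync", {"bootstrap.servers": "kafka:9092"})
pub.producer = MagicMock()  # assumption: mock the producer so no live broker is needed

input_msg = {"topic": "incoming", "partition": 0, "offset": 42}
report = json.dumps(
    {
        "path": "synced/archive.tar.gz",
        "original_path": "original/archive.tar.gz",
        "metadata": {"cluster_id": "uuid"},
        "reports": [],  # popped by publish() before the message is produced
    }
)

pub.publish(input_msg, report)
pub.producer.produce.assert_called_once()  # message went out without the "reports" key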
1 change: 0 additions & 1 deletion ccx_messaging/publishers/workloads_info_publisher.py
@@ -26,7 +26,6 @@


class WorkloadInfoPublisher(KafkaPublisher):
-
    """WorkloadInfoPublisher based on Confluent Kafka Producer.

    The workload info is received from a custom engine as a JSON string.
1 change: 0 additions & 1 deletion ccx_messaging/utils/logging.py
@@ -77,7 +77,6 @@ def setup_watchtower(logging_config=None):


class CloudWatchFormatter(jsonlogger.JsonFormatter):
-
    """Class that implements formatter for logging to CloudWatch."""

    def __init__(self, *args, **kwargs):
1 change: 0 additions & 1 deletion ccx_messaging/watchers/cluster_id_watcher.py
@@ -25,7 +25,6 @@


class ClusterIdWatcher(EngineWatcher, ConsumerWatcher):
-
    """Mixed `Watcher` that is able to watch both `Consumer` and `Engine`."""

    CLUSTER_ID_LENGTH = 36
1 change: 0 additions & 1 deletion ccx_messaging/watchers/consumer_watcher.py
@@ -18,7 +18,6 @@


class ConsumerWatcher(ICMConsumerWatcher):
-
    """Extension of the ConsumerWatcher class to allow handling new callbacks."""

    def on_not_handled(self, input_msg):
1 change: 0 additions & 1 deletion ccx_messaging/watchers/payload_tracker_watcher.py
@@ -28,7 +28,6 @@


class PayloadTrackerWatcher(ConsumerWatcher):
-
    """`Watcher` implementation to handle Payload Tracker updates."""

    def __init__(self, topic, service_name="ccx-data-pipeline", kafka_broker_config=None, **kwargs):
1 change: 0 additions & 1 deletion ccx_messaging/watchers/stats_watcher.py
@@ -28,7 +28,6 @@

# pylint: disable=too-many-instance-attributes
class StatsWatcher(ConsumerWatcher):
-
    """A Watcher that stores different Prometheus `Counter`s."""

    def __init__(self, prometheus_port=8000):
60 changes: 27 additions & 33 deletions constraints.txt
@@ -1,52 +1,46 @@
-aiobotocore==2.7.0
-aiohttp==3.9.1
+aiobotocore==2.12.1
+aiohttp==3.9.3
aioitertools==0.11.0
aiosignal==1.3.1
app-common-python==0.2.6
async-timeout==4.0.3
-attrs==23.1.0
-boto3==1.28.64
-botocore==1.31.64
-CacheControl==0.13.1
-certifi==2023.11.17
+attrs==23.2.0
+boto3==1.34.51
+botocore==1.34.51
+CacheControl==0.14.0
+certifi==2024.2.2
charset-normalizer==3.3.2
confluent-kafka==2.3.0
decorator==5.1.1
defusedxml==0.7.1
filelock==3.13.1
-frozenlist==1.4.0
-fsspec==2023.10.0
+frozenlist==1.4.1
+fsspec==2024.2.0
idna==3.6
-importlib-resources==6.1.1
-insights-core==3.2.24
-insights-core-messaging @ git+https://github.com/RedHatInsights/insights-core-messaging@85ff15f7ccb0a5bd884787dfe8544e4112df8fd5
-Jinja2==3.1.2
+insights-core==3.3.12
+insights-core-messaging @ git+https://github.com/RedHatInsights/insights-core-messaging@a3b63a9dbcf307b87909d77640b204b1093e3ef5
+Jinja2==3.1.3
jmespath==1.0.1
-jsonschema==4.20.0
-jsonschema-specifications==2023.11.1
+jsonschema==4.21.1
+jsonschema-specifications==2023.12.1
lockfile==0.12.2
logstash-formatter==0.5.17
-MarkupSafe==2.1.3
-msgpack==1.0.7
-multidict==6.0.4
-pkgutil_resolve_name==1.3.10
-prometheus-client==0.19.0
+MarkupSafe==2.1.5
+msgpack==1.0.8
+multidict==6.0.5
+prometheus_client==0.20.0
py==1.11.0
-python-dateutil==2.8.2
+python-dateutil==2.9.0.post0
python-json-logger==2.0.7
PyYAML==6.0.1
-redis==5.0.1
-referencing==0.31.0
+redis==5.0.3
+referencing==0.33.0
requests==2.31.0
retry==0.9.2
-rpds-py==0.13.1
-s3fs==2023.10.0
-s3transfer==0.7.0
-sentry-sdk==1.37.1
+rpds-py==0.18.0
+s3fs==2024.2.0
+s3transfer==0.10.0
+sentry-sdk==1.41.0
six==1.16.0
typing_extensions==4.8.0
urllib3==1.26.18
-watchtower==3.0.1
+watchtower==3.1.0
wrapt==1.16.0
-yarl==1.9.3
-zipp==3.17.0
+yarl==1.9.4
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -25,7 +25,7 @@ classifiers = [
requires-python = ">= 3.8"
dependencies = [
    "app-common-python>=0.2.3",
-    "boto3<=1.34.59",
+    "boto3>=1.34.1,<1.34.52",  # limited due to incompatibility between s3fs and newer versions of boto
    "confluent-kafka>=2.0.0",
    "insights-core>=3.1.2",
    "insights-core-messaging>=1.2.4",
@@ -69,5 +69,5 @@ dev = [

[tool.ruff]
select = ["E", "F", "W", "UP", "C", "D"]
-ignore = ["D211", "C401", "D213", "UP006", "UP007", "UP009", "UP015", "UP035"]
+ignore = ["D211", "C401", "D203", "D213", "UP006", "UP007", "UP009", "UP015", "UP035"]
line-length = 100
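A side note on the recurring one-line deletions throughout this PR: they go hand in hand with the new "D203" entry in the ignore list. pydocstyle's D203 demands a blank line between a class signature and its docstring, while D211 forbids it; with D203 now ignored, those blank lines can be dropped. A minimal illustration of the two styles (not taken from the repo):

class D203Style:

    """A blank line separates the class signature from the docstring (old layout)."""


class D211Style:
    """The docstring sits directly under the class signature (layout adopted here)."""

With this ignore in place, the "# noqa: D203" in the new publisher module is redundant but harmless.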
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
app-common-python>=0.2.3
-boto3<=1.34.59
+boto3>=1.34.1,<1.34.52
confluent-kafka>=2.0.0
insights-core>=3.1.2
insights-core-messaging @ git+https://github.com/RedHatInsights/insights-core-messaging@1.2.4
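One note on the tightened boto3 pin shared by pyproject.toml and requirements.txt: s3fs depends on aiobotocore, which pins botocore to a narrow window, and each boto3 1.34.N release in turn needs botocore of at least the same patch level. A sketch of the arithmetic, where the exact window aiobotocore 2.12.1 uses is an assumption rather than something shown in this diff:

from packaging.specifiers import SpecifierSet
from packaging.version import Version

# Assumed pin: aiobotocore 2.12.x constrains botocore to roughly this window.
botocore_window = SpecifierSet(">=1.34.41,<1.34.52")

# boto3 1.34.N needs botocore >= 1.34.N, so boto3 must stay inside the window too.
print(Version("1.34.51") in botocore_window)  # True  -> the new upper bound <1.34.52 fits
print(Version("1.34.59") in botocore_window)  # False -> the old <=1.34.59 allowed broken installs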
1 change: 0 additions & 1 deletion test/consumers/kafka_consumer_test.py
@@ -27,7 +27,6 @@


class KafkaMessage:
-
    """Test double for the confluent_kafka.Message class."""

    def __init__(self, msg, headers=None, timestamp=None):
5 changes: 2 additions & 3 deletions test/publishers/dvo_metrics_publisher_test.py
@@ -332,7 +332,7 @@ def test_publish_bad_argument(wrong_input_msg):
@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG)
def test_publish_valid(input, expected_output):
    """Check that Kafka producer is called with an expected message."""
-    report = "{\"workload_recommendations\": []}"
+    report = '{"workload_recommendations": []}'

    expected_output = json.dumps(expected_output) + "\n"
    sut = DVOMetricsPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
@@ -388,7 +388,7 @@ def test_error(input, output):
                "ClusterName": "uuid",
                "Metrics": {"version": 1, "pass": [], "info": [], "workload_recommendations": []},
                "RequestId": "a request id",
-                "LastChecked": "a timestamp"
+                "LastChecked": "a timestamp",
            },
            id="valid_report",
        )
@@ -414,4 +414,3 @@ def test_empty_dvo_results():
    input = json.dumps({"version": 1, "reports": [], "pass": [], "info": []})
    sut.publish(VALID_INPUT_MSG[0][0][0], input)
    assert not sut.producer.produce.called
-
4 changes: 2 additions & 2 deletions test/publishers/rule_processing_publisher_test.py
@@ -348,7 +348,7 @@ def test_publish_bad_argument(wrong_input_msg):
@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG)
def test_publish_valid(input, expected_output):
    """Check that Kafka producer is called with an expected message."""
-    report = "{\"reports\": []}"
+    report = '{"reports": []}'

    expected_output = json.dumps(expected_output) + "\n"
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
@@ -424,6 +424,7 @@ def test_filter_ocp_rules_results(input, expected_output):
    sut.publish(VALID_INPUT_MSG[0][0][0], input)
    sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())

+
def test_empty_ocp_rules_results():
    """Check that the publisher does not send empty message."""
    sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
@@ -432,4 +432,3 @@ def test_empty_ocp_rules_results():
    input = json.dumps({"version": 1, "workload_recommendations": [], "pass": [], "info": []})
    sut.publish(VALID_INPUT_MSG[0][0][0], input)
    assert not sut.producer.produce.called
-