Skip to content

Commit

Permalink
New publisher for archive sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Drobena committed Mar 13, 2024
1 parent f421501 commit 0d4c1e5
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 8 deletions.
4 changes: 3 additions & 1 deletion ccx_messaging/publishers/idp_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
from typing import Dict
import logging
from ccx_messaging.publishers.kafka_publisher import KafkaPublisher

LOG = logging.getLogger(__name__)


class IDPPublisher(KafkaPublisher):
"""Publisher for interanal data pipeline.""" # noqa: D203

"""Publisher for interanal data pipeline."""

def publish(self, input_msg: Dict, report: str) -> None:
"""Publish response as Kafka message to outgoing topic."""
Expand Down
5 changes: 2 additions & 3 deletions test/publishers/dvo_metrics_publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def test_publish_bad_argument(wrong_input_msg):
@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG)
def test_publish_valid(input, expected_output):
"""Check that Kafka producer is called with an expected message."""
report = "{\"workload_recommendations\": []}"
report = '{"workload_recommendations": []}'

expected_output = json.dumps(expected_output) + "\n"
sut = DVOMetricsPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
Expand Down Expand Up @@ -388,7 +388,7 @@ def test_error(input, output):
"ClusterName": "uuid",
"Metrics": {"version": 1, "pass": [], "info": [], "workload_recommendations": []},
"RequestId": "a request id",
"LastChecked": "a timestamp"
"LastChecked": "a timestamp",
},
id="valid_report",
)
Expand All @@ -414,4 +414,3 @@ def test_empty_dvo_results():
input = json.dumps({"version": 1, "reports": [], "pass": [], "info": []})
sut.publish(VALID_INPUT_MSG[0][0][0], input)
assert not sut.producer.produce.called

4 changes: 2 additions & 2 deletions test/publishers/idp_publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
{
"path": "target/path",
"original_path": "original/path",
"metadata": {"cluster_id": "12345","external_organization":"54321"},
"metadata": {"cluster_id": "12345", "external_organization": "54321"},
}
)
]


def timeStampMasking(message):
"""Mask four bytes in Gzip stream that contain timestamp."""
message = list(message)
Expand Down Expand Up @@ -76,4 +77,3 @@ def test_compressing_enabled(input):
outgoing_topic = pub.producer.produce.call_args[0][0]
outgoing_message = timeStampMasking(pub.producer.produce.call_args[0][1])
assert outgoing_message == expected_output and outgoing_topic == "topic-name"

4 changes: 2 additions & 2 deletions test/publishers/rule_processing_publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def test_publish_bad_argument(wrong_input_msg):
@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG)
def test_publish_valid(input, expected_output):
"""Check that Kafka producer is called with an expected message."""
report = "{\"reports\": []}"
report = '{"reports": []}'

expected_output = json.dumps(expected_output) + "\n"
sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
Expand Down Expand Up @@ -424,6 +424,7 @@ def test_filter_ocp_rules_results(input, expected_output):
sut.publish(VALID_INPUT_MSG[0][0][0], input)
sut.producer.produce.assert_called_with("outgoing_topic", expected_output.encode())


def test_empty_ocp_rules_results():
"""Check that the publisher does not send empty message."""
sut = RuleProcessingPublisher("outgoing_topic", {"bootstrap.servers": "kafka:9092"})
Expand All @@ -432,4 +433,3 @@ def test_empty_ocp_rules_results():
input = json.dumps({"version": 1, "workload_recommendations": [], "pass": [], "info": []})
sut.publish(VALID_INPUT_MSG[0][0][0], input)
assert not sut.producer.produce.called

0 comments on commit 0d4c1e5

Please sign in to comment.