Skip to content

Commit

Permalink
Merge pull request #60 from RedHatInsights/cleanup_old_kafka
Browse files Browse the repository at this point in the history
Removing old Kafka bit and pieces that are not longer used
  • Loading branch information
joselsegura authored Apr 19, 2023
2 parents 9260024 + b59cb63 commit 4978f38
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 1,904 deletions.
404 changes: 0 additions & 404 deletions ccx_messaging/consumers/consumer.py

This file was deleted.

45 changes: 34 additions & 11 deletions ccx_messaging/consumers/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import time
from threading import Thread

from confluent_kafka import Consumer as ConfluentConsumer, KafkaException, Message, Producer
from confluent_kafka import (
Consumer as ConfluentConsumer,
KafkaException,
Message,
Producer,
TIMESTAMP_NOT_AVAILABLE,
)
from insights_messaging.consumers import Consumer

from ccx_messaging.error import CCXMessagingError
Expand Down Expand Up @@ -128,19 +134,37 @@ def run(self):

def handles(self, msg: Message) -> bool:
"""Check headers, format and other characteristics that can make the message unusable."""
if not self.platform_service:
if self.platform_service:
headers = msg.headers()
if not headers:
LOG.debug("Message filtered: no headers in message")
return False

headers = dict(headers)
destination_service = headers.get("service", b"").decode()

if destination_service != self.platform_service:
LOG.debug("Message filtered: wrong detination service: %s", destination_service)
self.fire("on_filter")
return False

return self._handles_timestamp_check(msg)

def _handles_timestamp_check(self, msg: Message):
"""Check the timestamp of the msg."""
if self.max_record_age == -1:
return True

headers = msg.headers()
if not headers:
LOG.debug("Message filtered: no headers in message")
return False
timestamp_type, timestamp = msg.timestamp()

headers = dict(headers)
destination_service = headers.get("service", b"").decode()
if timestamp_type == TIMESTAMP_NOT_AVAILABLE:
LOG.debug("Cannot check the incoming message timestamp.")
return True

if destination_service != self.platform_service:
LOG.debug("Message filtered: wrong detination service: %s", destination_service)
# Kafka record timestamp is int64 in milliseconds.
current = time.time()
if (timestamp / 1000) < (current - self.max_record_age):
LOG.debug("Skipping message due to its timestamp (too old)")
return False

return True
Expand All @@ -156,7 +180,6 @@ def process_msg(self, msg: Message) -> None:

if not self.handles(msg):
# already logged in self.handles
self.fire("on_filter")
return

try:
Expand Down
145 changes: 0 additions & 145 deletions ccx_messaging/publishers/data_pipeline_publisher.py

This file was deleted.

144 changes: 0 additions & 144 deletions ccx_messaging/publishers/sha_publisher.py

This file was deleted.

8 changes: 0 additions & 8 deletions ccx_messaging/utils/kafka_config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
"""Functions to adapt kafka-python configuration parameters."""

from kafka import KafkaProducer


def producer_config(config):
"""Clean up the provided configuration in order to be used by a Kafka producer."""
producer_allowed_arguments = list(KafkaProducer.DEFAULT_CONFIG.keys())
return {key: value for key, value in config.items() if key in producer_allowed_arguments}


def translate_kafka_configuration(config: dict) -> dict:
"""Transform a dict with default Kafka configuration to kafka-python configuration."""
Expand Down
2 changes: 1 addition & 1 deletion ccx_messaging/watchers/payload_tracker_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, topic, service_name="ccx-data-pipeline", kafka_broker_config=

if kafka_broker_config:
kwargs.update(kafka_broker_config)

kwargs = kafka_producer_config_cleanup(kwargs)

LOG.debug(
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ boto3==1.14.27
insights-core==3.0.277
git+https://github.com/RedHatInsights/insights-core-messaging@1.2.2
jsonschema==3.2.0
kafka-python==2.0.1
python-json-logger==0.1.11
requests==2.24.0
sentry-sdk==0.19.5
Expand Down
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ install_requires =
insights-core>=3.1.2
insights-core-messaging>=1.2.1
jsonschema>=4.0.0
kafka-python>=2.0.2
prometheus-client>=0.16.0
PyYAML>=6.0
requests
Expand Down
Loading

0 comments on commit 4978f38

Please sign in to comment.