From cd457585839634531e7868e4404a2417a8d7337c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Segura=20Lucas?= Date: Tue, 18 Apr 2023 16:19:56 +0200 Subject: [PATCH 1/2] Removing old Kafka bits and pieces that are no longer used --- ccx_messaging/consumers/consumer.py | 404 ------------- ccx_messaging/consumers/kafka_consumer.py | 45 +- .../publishers/data_pipeline_publisher.py | 145 ----- ccx_messaging/publishers/sha_publisher.py | 144 ----- ccx_messaging/utils/kafka_config.py | 9 - .../watchers/payload_tracker_watcher.py | 2 +- requirements.txt | 1 - setup.cfg | 1 - test/consumers/consumer_test.py | 568 ------------------ test/consumers/kafka_consumer_test.py | 130 +++- .../data_pipeline_publisher_test.py | 278 --------- test/publishers/sha_publisher_test.py | 303 ---------- test/test_utils.py | 20 - test/utils/kafka_config_test.py | 5 +- 14 files changed, 149 insertions(+), 1906 deletions(-) delete mode 100644 ccx_messaging/consumers/consumer.py delete mode 100644 ccx_messaging/publishers/data_pipeline_publisher.py delete mode 100644 ccx_messaging/publishers/sha_publisher.py delete mode 100644 test/consumers/consumer_test.py delete mode 100644 test/publishers/data_pipeline_publisher_test.py delete mode 100644 test/publishers/sha_publisher_test.py diff --git a/ccx_messaging/consumers/consumer.py b/ccx_messaging/consumers/consumer.py deleted file mode 100644 index f550b79..0000000 --- a/ccx_messaging/consumers/consumer.py +++ /dev/null @@ -1,404 +0,0 @@ -# Copyright 2022 Red Hat Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -"""Consumer implementation based on base Kafka class.""" - -import json -import logging -import base64 - -import binascii -import time -from threading import Thread -from signal import signal, alarm, SIGALRM - -import jsonschema -from insights_messaging.consumers import Consumer as ICMConsumer - -from kafka import KafkaConsumer, KafkaProducer -from kafka.consumer.fetcher import ConsumerRecord - -from ccx_messaging.error import CCXMessagingError -from ccx_messaging.schemas import INPUT_MESSAGE_SCHEMA, IDENTITY_SCHEMA -from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration - - -LOG = logging.getLogger(__name__) -MAX_ELAPSED_TIME_BETWEEN_MESSAGES = 60 * 60 - - -def handle_message_processing_timeout(signalnum, handler): - """ - Handle alarm raised when message processing takes too much time. - - An exception is raised, and is handled by the insights-core-messaging - Consumer's process method. This way, the currently monitored metrics are still - applied, and we can handle the behavior when TimeoutError is raised. - """ - raise TimeoutError("Couldn't process message in the given time frame.") - - -class Consumer(ICMConsumer): - """ - Consumer implementation based on base Kafka class. - - This consumer retrieves a message at a time from a configure source (which is Kafka), - extracts an URL from it, downloads an archive using the configured downloader, and - then passes the file to an internal engine for further processing. 
- """ - - def __init__( - self, - publisher, - downloader, - engine, - incoming_topic, - kafka_broker_config=None, - dead_letter_queue_topic=None, - max_record_age=7200, - retry_backoff_ms=1000, - processing_timeout_s=0, - **kwargs, - ): # pylint: disable=too-many-arguments - """Construct a new external data pipeline Kafka consumer.""" - bootstrap_servers = kwargs.get("bootstrap_servers", None) - if isinstance(bootstrap_servers, str): - bootstrap_servers = bootstrap_servers.split(",") - - if bootstrap_servers: - kwargs["bootstrap_servers"] = bootstrap_servers - - LOG.info( - "Consuming topic '%s' from brokers %s as group '%s'", - incoming_topic, - kwargs.get("bootstrap_servers", None), - kwargs.get("group_id", None), - ) - - requeuer = kwargs.pop("requeuer", None) - - super().__init__(publisher, downloader, engine, requeuer=requeuer) - - kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) - kwargs.update(kafka_broker_cfg) - - self.consumer = KafkaConsumer( - incoming_topic, - value_deserializer=self.deserialize, - retry_backoff_ms=retry_backoff_ms, - **kwargs, - ) - - self.max_record_age = max_record_age - self.log_pattern = f"topic: {incoming_topic}, group_id: {kwargs.get('group_id', None)}" - - self.last_received_message_time = time.time() - - self.check_elapsed_time_thread = Thread( - target=self.check_last_message_received_time, daemon=True - ) - self.check_elapsed_time_thread.start() - - self.processing_timeout = processing_timeout_s - - self.dlq_producer = None - self.dead_letter_queue_topic = dead_letter_queue_topic - - if self.dead_letter_queue_topic is not None: - dlq_producer_config = producer_config(kwargs) - self.dlq_producer = KafkaProducer(**dlq_producer_config) - - def _consume(self, msg): - try: - alarm(self.processing_timeout) - if self.handles(msg): - self.process(msg) - else: - self.process_dead_letter(msg) - alarm(0) - except TimeoutError as ex: - LOG.exception(ex) - self.process_dead_letter(msg) - 
self.fire("on_process_timeout") - except Exception as ex: - LOG.exception(ex) - self.process_dead_letter(msg) - - # pylint: disable=broad-except - def run(self): - """Execute the consumer logic.""" - signal(SIGALRM, handle_message_processing_timeout) - for msg in self.consumer: - self._consume(msg) - - def process_dead_letter(self, msg): - """Send unprocessed message to the dead letter queue topic.""" - if not self.dlq_producer: - return - - if isinstance(msg, ConsumerRecord): - self.dlq_producer.send(self.dead_letter_queue_topic, str(msg.value).encode("utf-8")) - else: - # just add at least some record in case that the message is not of the expected type - self.dlq_producer.send(self.dead_letter_queue_topic, str(msg).encode("utf-8")) - - def _validate(self, msg): - try: - jsonschema.validate(instance=msg, schema=INPUT_MESSAGE_SCHEMA) - LOG.debug("JSON schema validated (%s)", self.log_pattern) - b64_identity = msg["b64_identity"] - - if isinstance(b64_identity, str): - b64_identity = b64_identity.encode() - - decoded_identity = json.loads(base64.b64decode(b64_identity)) - jsonschema.validate(instance=decoded_identity, schema=IDENTITY_SCHEMA) - LOG.debug("Identity schema validated (%s)", self.log_pattern) - - msg["ClusterName"] = ( - decoded_identity.get("identity", {}).get("system", {}).get("cluster_id", None) - ) - - msg["identity"] = decoded_identity - del msg["b64_identity"] - return msg - - except json.JSONDecodeError as ex: - return CCXMessagingError(f"Unable to decode received message: {ex}") - - except jsonschema.ValidationError as ex: - return CCXMessagingError(f"Invalid input message JSON schema: {ex}") - - except binascii.Error as ex: - return CCXMessagingError(f"Base64 encoded identity could not be parsed: {ex}") - - def deserialize(self, bytes_): - """ - Deserialize JSON message received from Kafka. - - Returns: - dict: Deserialized input message if successful. - DataPipelineError: Exception containing error message if anything failed. 
- - The exception is returned instead of being thrown in order to prevent - breaking the message handling / polling loop in `Consumer.run`. - """ - LOG.debug("Deserializing incoming bytes (%s)", self.log_pattern) - - if isinstance(bytes_, (str, bytes, bytearray)): - try: - return self._validate(json.loads(bytes_)) - except json.JSONDecodeError as ex: - return CCXMessagingError(f"Unable to decode received message: {ex}") - else: - return CCXMessagingError(f"Unexpected input message type: {bytes_.__class__.__name__}") - - def _handles_timestamp_check(self, input_msg): - if not isinstance(input_msg.timestamp, int): - LOG.error( - "Unexpected Kafka record timestamp type (expected 'int', got '%s')(%s)", - input_msg.timestamp.__class__.__name__, - Consumer.get_stringfied_record(input_msg), - ) - return False - - if self.max_record_age == -1: - return True - - # Kafka record timestamp is int64 in milliseconds. - if (input_msg.timestamp / 1000) < (time.time() - self.max_record_age): - LOG.debug("Skipping old message (%s)", Consumer.get_stringfied_record(input_msg)) - return False - - return True - - def handles(self, input_msg): - """Check format of the input message and decide if it can be handled by this consumer.""" - if not isinstance(input_msg, ConsumerRecord): - LOG.debug( - "Unexpected input message type (expected 'ConsumerRecord', got %s)(%s)", - input_msg.__class__.__name__, - Consumer.get_stringfied_record(input_msg), - ) - self.fire("on_not_handled", input_msg) - return False - - if isinstance(input_msg.value, CCXMessagingError): - LOG.error( - "%s (topic: '%s', partition: %d, offset: %d, timestamp: %d)", - input_msg.value.format(input_msg), - input_msg.topic, - input_msg.partition, - input_msg.offset, - input_msg.timestamp, - ) - return False - - if not self._handles_timestamp_check(input_msg): - return False - - # ---- Redundant checks. Already checked by JSON schema in `deserialize`. 
---- - # These checks are actually triggered by some of the unit tests for this method. - if not isinstance(input_msg.value, dict): - LOG.debug( - "Unexpected input message value type (expected 'dict', got '%s') (%s)", - input_msg.value.__class__.__name__, - Consumer.get_stringfied_record(input_msg), - ) - self.fire("on_not_handled", input_msg) - return False - - if "url" not in input_msg.value: - LOG.debug( - "Input message is missing a 'url' field: %s " "(%s)", - input_msg.value, - Consumer.get_stringfied_record(input_msg), - ) - self.fire("on_not_handled", input_msg) - return False - # ---------------------------------------------------------------------------- - # Set timestamp of last processed message - self.last_received_message_time = time.time() - - return True - - def get_url(self, input_msg): - """ - Retrieve URL to storage (S3/Minio) from Kafka message. - - Same as previous 2 methods, when we receive and figure out the - message format, we can modify this method - """ - try: - url = input_msg.value["url"] - LOG.debug( - "Extracted URL from input message: %s (%s)", - url, - Consumer.get_stringfied_record(input_msg), - ) - return url - - # This should never happen, but let's check it just to be absolutely sure. - # The `handles` method should prevent this from - # being called if the input message format is wrong. - except Exception as ex: - raise CCXMessagingError(f"Unable to extract URL from input message: {ex}") from ex - - @staticmethod - def get_stringfied_record(input_record): - """Retrieve a string with information about the received record ready to log.""" - return ( - f"topic: '{input_record.topic}', partition: {input_record.partition}, " - f"offset: {input_record.offset}, timestamp: {input_record.timestamp}" - ) - - def check_last_message_received_time(self): - """ - Verify elapsed time between received messages and warn if too long. 
- - Checks if the last received message was received more than one hour ago - and sends an alert if it is the case - """ - while True: - if time.time() - self.last_received_message_time >= MAX_ELAPSED_TIME_BETWEEN_MESSAGES: - last_received_time_str = time.strftime( - "%Y-%m-%d- %H:%M:%S", time.gmtime(self.last_received_message_time) - ) - LOG.warning("No new messages in the queue since %s", last_received_time_str) - # To do the minimum interruptions possible, sleep for one hour - time.sleep(MAX_ELAPSED_TIME_BETWEEN_MESSAGES) - - -class AnemicConsumer(Consumer): - """ - Consumer implementation based on base Kafka class. - - This consumer retrieves a message at a time from a configure source (which is Kafka), - verifies if the message should be processed by it, and if so, extracts an URL from it, - downloads an archive using the configured downloader, and then passes the file to an - internal engine for further processing. - """ - - EXPECTED_SERVICE_DEBUG_MESSAGE = "Received message for expected service." - OTHER_SERVICE_DEBUG_MESSAGE = "Message is for {} service. Ignoring" - NO_SERVICE_DEBUG_MESSAGE = "Message does not specify destination service. Ignoring" - NO_HEADER_DEBUG_MESSAGE = "Message does not contain headers. Ignoring" - - def __init__( - self, - publisher, - downloader, - engine, - incoming_topic, - platform_service=None, - dead_letter_queue_topic=None, - max_record_age=7200, - retry_backoff_ms=1000, - processing_timeout_s=0, - **kwargs, - ): - """Initialize an `AnemicConsumer` object.""" - super().__init__( - publisher, - downloader, - engine, - incoming_topic, - dead_letter_queue_topic, - max_record_age, - retry_backoff_ms, - processing_timeout_s, - **kwargs, - ) - self.platform_service = platform_service.encode("utf-8") - - def deserialize(self, bytes_): - """ - Deserialize JSON message received from Kafka. - - Returns: - dict: Deserialized input message if successful. - DataPipelineError: Exception containing error message if anything failed. 
- - The exception is returned instead of being thrown in order to prevent - breaking the message handling / polling loop in `Consumer.run`. - """ - LOG.debug("Deserializing incoming bytes (%s)", self.log_pattern) - - if isinstance(bytes_, (str, bytes, bytearray)): - try: - return json.loads(bytes_) - except json.JSONDecodeError as ex: - return CCXMessagingError(f"Unable to decode received message: {ex}") - else: - return CCXMessagingError(f"Unexpected input message type: {bytes_.__class__.__name__}") - - def run(self): - """Execute the consumer logic.""" - signal(SIGALRM, handle_message_processing_timeout) - for msg in self.consumer: - headers = dict(msg.headers) - if not headers: - LOG.debug(AnemicConsumer.NO_HEADER_DEBUG_MESSAGE) - continue - service = headers.get("service") - if service: - if service != self.platform_service: - LOG.debug(AnemicConsumer.OTHER_SERVICE_DEBUG_MESSAGE.format(service)) - continue - LOG.debug(AnemicConsumer.EXPECTED_SERVICE_DEBUG_MESSAGE) - msg_value = self._validate(msg.value) - msg = msg._replace(value=msg_value) - self._consume(msg) - else: - LOG.debug(AnemicConsumer.NO_SERVICE_DEBUG_MESSAGE) diff --git a/ccx_messaging/consumers/kafka_consumer.py b/ccx_messaging/consumers/kafka_consumer.py index 7bc8cb9..a53bd07 100644 --- a/ccx_messaging/consumers/kafka_consumer.py +++ b/ccx_messaging/consumers/kafka_consumer.py @@ -4,7 +4,13 @@ import time from threading import Thread -from confluent_kafka import Consumer as ConfluentConsumer, KafkaException, Message, Producer +from confluent_kafka import ( + Consumer as ConfluentConsumer, + KafkaException, + Message, + Producer, + TIMESTAMP_NOT_AVAILABLE, +) from insights_messaging.consumers import Consumer from ccx_messaging.error import CCXMessagingError @@ -128,19 +134,37 @@ def run(self): def handles(self, msg: Message) -> bool: """Check headers, format and other characteristics that can make the message unusable.""" - if not self.platform_service: + if self.platform_service: + headers = 
msg.headers() + if not headers: + LOG.debug("Message filtered: no headers in message") + return False + + headers = dict(headers) + destination_service = headers.get("service", b"").decode() + + if destination_service != self.platform_service: + LOG.debug("Message filtered: wrong detination service: %s", destination_service) + self.fire("on_filter") + return False + + return self._handles_timestamp_check(msg) + + def _handles_timestamp_check(self, msg: Message): + """Check the timestamp of the msg.""" + if self.max_record_age == -1: return True - headers = msg.headers() - if not headers: - LOG.debug("Message filtered: no headers in message") - return False + timestamp_type, timestamp = msg.timestamp() - headers = dict(headers) - destination_service = headers.get("service", b"").decode() + if timestamp_type == TIMESTAMP_NOT_AVAILABLE: + LOG.debug("Cannot check the incoming message timestamp.") + return True - if destination_service != self.platform_service: - LOG.debug("Message filtered: wrong detination service: %s", destination_service) + # Kafka record timestamp is int64 in milliseconds. + current = time.time() + if (timestamp / 1000) < (current - self.max_record_age): + LOG.debug("Skipping message due to its timestamp (too old)") return False return True @@ -156,7 +180,6 @@ def process_msg(self, msg: Message) -> None: if not self.handles(msg): # already logged in self.handles - self.fire("on_filter") return try: diff --git a/ccx_messaging/publishers/data_pipeline_publisher.py b/ccx_messaging/publishers/data_pipeline_publisher.py deleted file mode 100644 index a059dd9..0000000 --- a/ccx_messaging/publishers/data_pipeline_publisher.py +++ /dev/null @@ -1,145 +0,0 @@ -# Copyright 2019, 2020, 2021, 2022 Red Hat Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Module that implements a custom Kafka publisher.""" - -import json -import logging - -from insights_messaging.publishers import Publisher -from kafka import KafkaProducer - -from ccx_messaging.error import CCXMessagingError -from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration - -LOG = logging.getLogger(__name__) - - -class DataPipelinePublisher(Publisher): - """ - DataPipelinePublisher based on the base Kafka publisher. - - The results of the data analysis are received as a JSON (string) - and turned into a byte array using UTF-8 encoding. - The bytes are then sent to the output Kafka topic. - - Custom error handling for the whole pipeline is implemented here. - """ - - def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None, **kwargs): - """Construct a new `DataPipelinePublisher` given `kwargs` from the config YAML.""" - self.topic = outgoing_topic - self.bootstrap_servers = bootstrap_servers - if self.topic is None: - raise KeyError("outgoing_topic") - - kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) - kwargs.update(kafka_broker_cfg) - - if "bootstrap_servers" not in kwargs: - kwargs["bootstrap_servers"] = self.bootstrap_servers - else: - self.bootstrap_servers = kwargs["bootstrap_servers"] - - kwargs = producer_config(kwargs) - LOG.debug( - "Confluent Kafka publisher configuration arguments: " - "Server: %s. " - "Topic: %s. " - "Security protocol: %s. 
" - "", - kwargs.get("bootstrap_servers"), - self.topic, - kwargs.get("security_protocol"), - ) - - self.producer = KafkaProducer(**kwargs) - LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers) - self.outdata_schema_version = 2 - - def publish(self, input_msg, response): - """ - Publish an EOL-terminated JSON message to the output Kafka topic. - - The response is assumed to be a string representing a valid JSON object. - A newline character will be appended to it, it will be converted into - a byte array using UTF-8 encoding and the result of that will be sent - to the producer to produce a message in the output Kafka topic. - """ - # Response is already a string, no need to JSON dump. - output_msg = {} - try: - org_id = int(input_msg["identity"]["identity"]["internal"]["org_id"]) - except ValueError as err: - raise CCXMessagingError(f"Error extracting the OrgID: {err}") from err - - try: - account_number = int(input_msg["identity"]["identity"]["account_number"]) - except ValueError as err: - raise CCXMessagingError(f"Error extracting the Account number: {err}") from err - - message = "" - try: - msg_timestamp = input_msg["timestamp"] - output_msg = { - "OrgID": org_id, - "AccountNumber": account_number, - "ClusterName": input_msg["cluster_name"], - "Report": json.loads(response), - "LastChecked": msg_timestamp, - "Version": self.outdata_schema_version, - "RequestId": input_msg.get("request_id"), - } - - message = json.dumps(output_msg) + "\n" - - LOG.debug("Sending response to the %s topic.", self.topic) - # Convert message string into a byte array. 
- self.producer.send(self.topic, message.encode("utf-8")) - LOG.debug("Message has been sent successfully.") - LOG.debug( - "Message context: OrgId=%s, AccountNumber=%s, " - 'ClusterName="%s", LastChecked="%s, Version=%d"', - output_msg["OrgID"], - output_msg["AccountNumber"], - output_msg["ClusterName"], - output_msg["LastChecked"], - output_msg["Version"], - ) - - LOG.info( - "Status: Success; " - "Topic: %s; " - "Partition: %s; " - "Offset: %s; " - "LastChecked: %s", - input_msg.get("topic"), - input_msg.get("partition"), - input_msg.get("offset"), - msg_timestamp, - ) - - except UnicodeEncodeError as err: - raise CCXMessagingError(f"Error encoding the response to publish: {message}") from err - - def error(self, input_msg, ex): - """Handle pipeline errors by logging them.""" - # The super call is probably unnecessary because the default behavior - # is to do nothing, but let's call it in case it ever does anything. - super().error(input_msg, ex) - - if not isinstance(ex, CCXMessagingError): - ex = CCXMessagingError(ex) - - LOG.error(ex.format(input_msg)) diff --git a/ccx_messaging/publishers/sha_publisher.py b/ccx_messaging/publishers/sha_publisher.py deleted file mode 100644 index 97d8a61..0000000 --- a/ccx_messaging/publishers/sha_publisher.py +++ /dev/null @@ -1,144 +0,0 @@ -# Copyright 2022 Red Hat Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Module that implements a custom Kafka publisher.""" - -import json -import logging - -from insights_messaging.publishers import Publisher -from kafka import KafkaProducer - -from ccx_messaging.error import CCXMessagingError -from ccx_messaging.utils.kafka_config import producer_config, translate_kafka_configuration - -LOG = logging.getLogger(__name__) - - -class SHAPublisher(Publisher): - """ - SHAPublisher based on the base Kafka publisher. - - The SHA records are received as a JSON (string) - and turned into a byte array using UTF-8 encoding. - The bytes are then sent to the output Kafka topic. - - Custom error handling for the whole pipeline is implemented here. - """ - - def __init__(self, outgoing_topic, bootstrap_servers, kafka_broker_config=None, **kwargs): - """Construct a new `SHAPublisher` given `kwargs` from the config YAML.""" - self.topic = outgoing_topic - self.bootstrap_servers = bootstrap_servers - if self.topic is None: - raise KeyError("outgoing_topic") - - kafka_broker_cfg = translate_kafka_configuration(kafka_broker_config) - kwargs.update(kafka_broker_cfg) - - if "bootstrap_servers" not in kwargs: - kwargs["bootstrap_servers"] = self.bootstrap_servers - - self.producer = KafkaProducer(**producer_config(kwargs)) - LOG.info("Producing to topic '%s' on brokers %s", self.topic, self.bootstrap_servers) - self.outdata_schema_version = 2 - - def publish(self, input_msg, response): - """ - Publish an EOL-terminated JSON message to the output Kafka topic. - - The input_msg contains content of message read from incoming Kafka - topic. Such message should contains account info, cluster ID etc. - - The response is assumed to be a string representing a valid JSON object - (it is read from file config/workload_info.json). - - Outgoing message is constructed by joining input_msg with response. 
- - A newline character will be appended to it, it will be converted into - a byte array using UTF-8 encoding and the result of that will be sent - to the producer to produce a message in the output Kafka topic. - """ - # read all required attributes from input_msg - try: - org_id = int(input_msg["identity"]["identity"]["internal"]["org_id"]) - except ValueError as err: - raise CCXMessagingError(f"Error extracting the OrgID: {err}") from err - - try: - account_number = int(input_msg["identity"]["identity"]["account_number"]) - except ValueError as err: - raise CCXMessagingError(f"Error extracting the Account number: {err}") from err - - # outgoing message in form of JSON - message = "" - - if response is not None: - try: - msg_timestamp = input_msg["timestamp"] - output_msg = { - "OrgID": org_id, - "AccountNumber": account_number, - "ClusterName": input_msg["cluster_name"], - "Images": json.loads(response), - "LastChecked": msg_timestamp, - "Version": self.outdata_schema_version, - "RequestId": input_msg.get("request_id"), - } - - # convert dictionary to JSON (string) - message = json.dumps(output_msg) + "\n" - - LOG.debug("Sending response to the %s topic.", self.topic) - - # Convert message string into a byte array. 
- self.producer.send(self.topic, message.encode("utf-8")) - LOG.debug("Message has been sent successfully.") - LOG.debug( - "Message context: OrgId=%s, AccountNumber=%s, " - 'ClusterName="%s", NumImages: %d, LastChecked="%s, Version=%d"', - output_msg["OrgID"], - output_msg["AccountNumber"], - output_msg["ClusterName"], - len(output_msg["Images"]), - output_msg["LastChecked"], - output_msg["Version"], - ) - - LOG.info( - "Status: Success; " - "Topic: %s; " - "Partition: %s; " - "Offset: %s; " - "LastChecked: %s", - input_msg["topic"], - input_msg["partition"], - input_msg["offset"], - msg_timestamp, - ) - except UnicodeEncodeError as err: - raise CCXMessagingError( - f"Error encoding the response to publish: {message}" - ) from err - - def error(self, input_msg, ex): - """Handle pipeline errors by logging them.""" - # The super call is probably unnecessary because the default behavior - # is to do nothing, but let's call it in case it ever does anything. - super().error(input_msg, ex) - - if not isinstance(ex, CCXMessagingError): - ex = CCXMessagingError(ex) - - LOG.error(ex.format(input_msg)) diff --git a/ccx_messaging/utils/kafka_config.py b/ccx_messaging/utils/kafka_config.py index 3bfd034..27abb6b 100644 --- a/ccx_messaging/utils/kafka_config.py +++ b/ccx_messaging/utils/kafka_config.py @@ -1,14 +1,5 @@ """Functions to adapt kafka-python configuration parameters.""" -from kafka import KafkaProducer - - -def producer_config(config): - """Clean up the provided configuration in order to be used by a Kafka producer.""" - producer_allowed_arguments = list(KafkaProducer.DEFAULT_CONFIG.keys()) - return {key: value for key, value in config.items() if key in producer_allowed_arguments} - - def translate_kafka_configuration(config: dict) -> dict: """Transform a dict with default Kafka configuration to kafka-python configuration.""" lib_config = {} diff --git a/ccx_messaging/watchers/payload_tracker_watcher.py b/ccx_messaging/watchers/payload_tracker_watcher.py index 
5dfa52f..710e4bf 100644 --- a/ccx_messaging/watchers/payload_tracker_watcher.py +++ b/ccx_messaging/watchers/payload_tracker_watcher.py @@ -39,7 +39,7 @@ def __init__(self, topic, service_name="ccx-data-pipeline", kafka_broker_config= if kafka_broker_config: kwargs.update(kafka_broker_config) - + kwargs = kafka_producer_config_cleanup(kwargs) LOG.debug( diff --git a/requirements.txt b/requirements.txt index 2460836..9921194 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ boto3==1.14.27 insights-core==3.0.277 git+https://github.com/RedHatInsights/insights-core-messaging@1.2.2 jsonschema==3.2.0 -kafka-python==2.0.1 python-json-logger==0.1.11 requests==2.24.0 sentry-sdk==0.19.5 diff --git a/setup.cfg b/setup.cfg index e772a71..0d81e95 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,7 +22,6 @@ install_requires = insights-core>=3.1.2 insights-core-messaging>=1.2.1 jsonschema>=4.0.0 - kafka-python>=2.0.2 prometheus-client>=0.16.0 PyYAML>=6.0 requests diff --git a/test/consumers/consumer_test.py b/test/consumers/consumer_test.py deleted file mode 100644 index af4906d..0000000 --- a/test/consumers/consumer_test.py +++ /dev/null @@ -1,568 +0,0 @@ -# Copyright 2021, 2022 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Module containing unit tests for the `Consumer` class.""" - -import logging -import io -import time - -from unittest.mock import patch - -import pytest - -from kafka import KafkaConsumer, KafkaProducer -from kafka.consumer.fetcher import ConsumerRecord - -from ccx_messaging.consumers.consumer import Consumer -from ccx_messaging.consumers.consumer import AnemicConsumer -from ccx_messaging.error import CCXMessagingError - -from test.test_utils import ( - mock_consumer_record, - mock_consumer_process_no_action_catch_exception, -) - - -@pytest.fixture(autouse=True) -def mock_consumer(monkeypatch): - """Mock KafkaConsumer.""" - monkeypatch.setattr(KafkaConsumer, "__init__", lambda *args, **kargs: None) - - -@pytest.fixture(autouse=True) -def mock_producer(monkeypatch): - """Mock KafkaProducer.""" - monkeypatch.setattr(KafkaProducer, "__init__", lambda *args, **kargs: None) - - -@pytest.fixture(autouse=True) -def mock_producer_send(monkeypatch): - """Mock KafkaProducer send method.""" - monkeypatch.setattr(KafkaProducer, "send", lambda *args, **kargs: None) - - -_REGEX_BAD_SCHEMA = r"^Unable to extract URL from input message: " -_INVALID_TYPE_VALUES = [None, 42, 3.14, True, [], {}] - - -@pytest.mark.parametrize("value", _INVALID_TYPE_VALUES) -def test_deserialize_invalid_type(value): - """Test that passing invalid data type to `deserialize` raises an exception.""" - deserialized = Consumer.deserialize(Consumer(None, None, None, None), value) - assert isinstance(deserialized, CCXMessagingError) - assert str(deserialized).startswith("Unexpected input message type: ") - - -_ERR_UNABLE_TO_DECODE = "Unable to decode received message: " -_ERR_JSON_SCHEMA = "Invalid input message JSON schema: " -_ERR_BASE64 = "Base64 encoded identity could not be parsed: " - -_INVALID_MESSAGES = [ - ("", _ERR_UNABLE_TO_DECODE), - ("{}", _ERR_JSON_SCHEMA), - ('{"noturl":"https://s3.com/hash"}', _ERR_JSON_SCHEMA), - ('{"url":"value"', _ERR_UNABLE_TO_DECODE), - ('"url":"value"}', 
_ERR_UNABLE_TO_DECODE), - ('"url":"value"', _ERR_UNABLE_TO_DECODE), - ('"{"url":"value"}"', _ERR_UNABLE_TO_DECODE), - # incorrect content of b64_identity (org_id missing) - ( - '{"url": "https://s3.com/hash", "b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im1pc' - '3Npbmdfb3JnX2lkIjogIjEyMzQ1Njc4In19fQo=", "timestamp": "2020-01-23T16:15:59.478901889Z"}', - _ERR_JSON_SCHEMA, - ), - # incorrect format of base64 encoding - ( - '{"url": "https://s3.com/hash", "b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ1' - '9pZCI6ICIxMjM0NTY3OCJ9Cg=", "timestamp": "2020-01-23T16:15:59.478901889Z"}', - _ERR_BASE64, - ), - # org_id not string - ( - '{"url": "https://s3.com/hash", "b64_identity": "eyJpZGVudGl0eSI6IHsKICAgICJhY2NvdW50X251bW' - "JlciI6ICI5ODc2NTQzIiwKICAgICJhdXRoX3R5cGUiOiAiYmFzaWMtYXV0aCIsCiAgICAiaW50ZXJuYWwiOiB7CiAg" - "ICAgICAgImF1dGhfdGltZSI6IDE0MDAsCiAgICAgICAgIm9yZ19pZCI6IDEyMzQ1Njc4CiAgICB9LAogICAgInR5cG" - "UiOiAiVXNlciIsCiAgICAidXNlciI6IHsKICAgICAgICAiZW1haWwiOiAiam9obi5kb2VAcmVkaGF0LmNvbSIsCiAg" - "ICAgICAgImZpcnN0X25hbWUiOiAiSW5zaWdodHMiLAogICAgICAgICJpc19hY3RpdmUiOiB0cnVlLAogICAgICAgIC" - "Jpc19pbnRlcm5hbCI6IGZhbHNlLAogICAgICAgICJpc19vcmdfYWRtaW4iOiB0cnVlLAogICAgICAgICJsYXN0X25h" - "bWUiOiAiUUUiLAogICAgICAgICJsb2NhbGUiOiAiZW5fVVMiLAogICAgICAgICJ1c2VybmFtZSI6ICJpbnNpZ2h0cy" - '1tYXN0ZXIiCiAgICB9Cn0KfQo=", "timestamp": "2020-01-23T16:15:59.478901889Z"}', - _ERR_JSON_SCHEMA, - ), -] - - -@pytest.mark.parametrize("msg", _INVALID_MESSAGES) -def test_deserialize_invalid_format_str(msg): - """Test that passing a malformed message to `deserialize` raises an exception.""" - deserialized = Consumer.deserialize(Consumer(None, None, None, None), msg[0]) - assert isinstance(deserialized, CCXMessagingError) - assert str(deserialized).startswith(msg[1]) - - -@pytest.mark.parametrize("msg", _INVALID_MESSAGES) -def test_deserialize_invalid_format_bytes(msg): - """Test that passing a malformed message to `deserialize` raises an exception.""" - deserialized = 
Consumer.deserialize(Consumer(None, None, None, None), msg[0].encode("utf-8")) - assert isinstance(deserialized, CCXMessagingError) - assert str(deserialized).startswith(msg[1]) - - -@pytest.mark.parametrize("msg", _INVALID_MESSAGES) -def test_deserialize_invalid_format_bytearray(msg): - """Test that passing a malformed message to `deserialize` raises an exception.""" - deserialized = Consumer.deserialize( - Consumer(None, None, None, None), bytearray(msg[0].encode("utf-8")) - ) - assert isinstance(deserialized, CCXMessagingError) - assert str(deserialized).startswith(msg[1]) - - -_VALID_MESSAGES = [ - ( - '{"url": "",' - '"b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K",' - '"timestamp": "2020-01-23T16:15:59.478901889Z"}', - { - "url": "", - "identity": {"identity": {"internal": {"org_id": "12345678"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "ClusterName": None, - }, - ), - ( - '{"url": "https://s3.com/hash", "unused-property": null, ' - '"b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K",' - '"timestamp": "2020-01-23T16:15:59.478901889Z"}', - { - "url": "https://s3.com/hash", - "unused-property": None, - "identity": {"identity": {"internal": {"org_id": "12345678"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "ClusterName": None, - }, - ), - ( - '{"account":12345678, "url":"any/url", ' - '"b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K",' - '"timestamp": "2020-01-23T16:15:59.478901889Z"}', - { - "account": 12345678, - "url": "any/url", - "identity": {"identity": {"internal": {"org_id": "12345678"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "ClusterName": None, - }, - ), -] - - -@pytest.mark.parametrize("msg,value", _VALID_MESSAGES) -def test_deserialize_valid_str(msg, value): - """Test that proper string JSON input messages are correctly deserialized.""" - assert Consumer.deserialize(Consumer(None, None, None, None), msg) 
== value - - -@pytest.mark.parametrize("msg,value", _VALID_MESSAGES) -def test_deserialize_valid_bytes(msg, value): - """Test that proper bytes JSON input messages are correctly deserialized.""" - retval = Consumer.deserialize(Consumer(None, None, None, None), msg.encode("utf-8")) - assert retval == value - - -@pytest.mark.parametrize("msg,value", _VALID_MESSAGES) -def test_deserialize_valid_bytearray(msg, value): - """Test that proper bytearray JSON input messages are correctly deserialized.""" - retval = Consumer.deserialize(Consumer(None, None, None, None), bytearray(msg.encode("utf-8"))) - assert retval == value - - -# This would have been a valid input, but it's supposed to be a `dict`, not `str`. -_DICT_STR = '{"url": "bucket/file"}' - -_INVALID_RECORD_VALUES = [ - "", - _DICT_STR, - _DICT_STR.encode("utf-8"), - bytearray(_DICT_STR.encode("utf-8")), - [], - {}, - {"noturl": "bucket/file"}, -] - - -@pytest.mark.parametrize("value", _INVALID_RECORD_VALUES) -@patch( - "ccx_messaging.consumers.consumer.KafkaConsumer.__init__", - lambda *args, **kwargs: None, -) -def test_handles_invalid(value): - """Test that `Consumer` refuses to handle malformed input messages.""" - consumer = Consumer(None, None, None, None) - assert not consumer.handles(mock_consumer_record(value)) - - -_VALID_RECORD_VALUES = [ - {"url": ""}, - {"url": "bucket/file"}, - {"url": "https://a-valid-domain.com/precious_url"}, -] - - -@pytest.mark.parametrize("value", _VALID_RECORD_VALUES) -@patch("insights_messaging.consumers.Consumer.__init__", lambda *a, **k: None) -def test_handles_valid(value): - """Test that `Consumer` accepts handling of correctly formatted input messages.""" - sut = Consumer(None, None, None, None) - assert sut.handles(mock_consumer_record(value)) - - -@pytest.mark.parametrize("value", _INVALID_RECORD_VALUES) -def test_get_url_invalid(value): - """Test that `Consumer.get_url` raises the appropriate exception.""" - with pytest.raises(CCXMessagingError, 
match=_REGEX_BAD_SCHEMA): - Consumer.get_url(None, value) - - -@pytest.mark.parametrize("value", _VALID_RECORD_VALUES) -def test_get_url_valid(value): - """Test that `Consumer.get_url` returns the expected value.""" - assert Consumer.get_url(None, mock_consumer_record(value)) == value["url"] - - -_VALID_TOPICS = ["topic", "funny-topic"] - -_VALID_GROUPS = ["group", "good-boys"] - -_VALID_SERVERS = ["server", "great.server.net"] - - -@pytest.mark.parametrize("topic", _VALID_TOPICS) -@pytest.mark.parametrize("group", _VALID_GROUPS) -@pytest.mark.parametrize("server", _VALID_SERVERS) -def test_consumer_init_direct(topic, group, server): - """Test of our Consumer constructor, using direct configuration options.""" - with patch("insights_messaging.consumers.Consumer.__init__") as mock_consumer_init: - with patch("os.environ", new=dict()): - Consumer(None, None, None, topic, group_id=group, bootstrap_servers=[server]) - - mock_consumer_init.assert_called_with(None, None, None, requeuer=None) - - -MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST = 2 - - -@patch("insights_messaging.consumers.Consumer.__init__", lambda *a, **k: None) -@patch( - "ccx_messaging.consumers.consumer.MAX_ELAPSED_TIME_BETWEEN_MESSAGES", - MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST, -) -def test_elapsed_time_thread_no_warning_when_message_received(): - """ - Test elapsed time thread if new message received on time. - - Test that no warnings are sent if a new message is received before - the defined MAX_ELAPSED_TIME_BETWEEN_MESSAGES. 
- """ - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = Consumer(None, None, None, None) - assert sut.check_elapsed_time_thread - buf.truncate(0) # Empty buffer to make sure this test does what it should do - sut.last_received_message_time = time.time() - assert "No new messages in the queue since " not in buf.getvalue() - time.sleep(MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST - 1) - sut.last_received_message_time = time.time() - assert "No new messages in the queue since " not in buf.getvalue() - - logger.removeHandler(log_handler) - - -@patch("insights_messaging.consumers.Consumer.__init__", lambda *a, **k: None) -@patch( - "ccx_messaging.consumers.consumer.MAX_ELAPSED_TIME_BETWEEN_MESSAGES", - MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST, -) -def test_elapsed_time_thread_warning_when_no_message_received(): - """ - Test elapsed time thread if no new message received on time. - - Test that warnings are sent if no new messages are received before - the defined MAX_ELAPSED_TIME_BETWEEN_MESSAGES. 
- """ - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = Consumer(None, None, None, "group", "topic", ["server"]) - assert sut.check_elapsed_time_thread - alert_time = time.strftime( - "%Y-%m-%d- %H:%M:%S", time.gmtime(sut.last_received_message_time) - ) - alert_message = "No new messages in the queue since " + alert_time - # Make sure the thread woke up at least once - time.sleep(2 * MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST) - assert alert_message in buf.getvalue() - - logger.removeHandler(log_handler) - - -@patch("ccx_messaging.consumers.consumer.Consumer.handles", lambda *a, **k: True) -@patch("ccx_messaging.consumers.consumer.Consumer.fire", lambda *a, **k: None) -@patch( - "ccx_messaging.consumers.consumer.Consumer.get_stringfied_record", - lambda *a, **k: None, -) -def test_process_message_timeout_no_kafka_requeuer(): - """Test timeout mechanism that wraps the process function.""" - process_message_timeout = 2 - process_message_timeout_elapsed = 3 - process_message_timeout_not_elapsed = 1 - consumer_messages_to_process = _VALID_MESSAGES[0] - expected_alert_message = "Couldn't process message in the given time frame." 
- - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = Consumer(None, None, None, None) - sut.consumer = consumer_messages_to_process - assert sut.processing_timeout == 0 # Should be 0 if not changed in config file - - with patch( - "ccx_messaging.consumers.consumer.Consumer.process", - lambda *a, **k: mock_consumer_process_no_action_catch_exception(0), - ): - sut.run() - assert expected_alert_message not in buf.getvalue() - - sut.processing_timeout = process_message_timeout - - with patch( - "ccx_messaging.consumers.consumer.Consumer.process", - lambda *a, **k: mock_consumer_process_no_action_catch_exception( - process_message_timeout_not_elapsed - ), - ): - sut.run() - assert expected_alert_message not in buf.getvalue() - - with patch( - "ccx_messaging.consumers.consumer.Consumer.process", - lambda *a, **k: mock_consumer_process_no_action_catch_exception( - process_message_timeout_elapsed - ), - ): - sut.run() - assert expected_alert_message in buf.getvalue() - - logger.removeHandler(log_handler) - - -_VALID_SERVICES = [("test_service")] - -_VALID_MESSAGES_WITH_UNEXPECTED_SERVICE_HEADER = [ - ConsumerRecord( - topic="platform.upload.announce", - partition=0, - offset=24, - timestamp=1661327909633, - timestamp_type=0, - key=None, - value={ - "account": "0369233", - "category": "archive", - "service": "test_service", - "timestamp": "2022-08-24T07:58:29.6326987Z", - }, - headers=[("service", b"some_unexpected_service")], - checksum=1234, - serialized_key_size=12, - serialized_value_size=1, - serialized_header_size=1, - ) -] - -_VALID_MESSAGES_WITH_EXPECTED_SERVICE_HEADER = [ - ConsumerRecord( - topic="platform.upload.announce", - partition=0, - offset=24, - timestamp=1661327909633, - timestamp_type=0, - key=None, - value={ - "account": "0369233", - "category": "archive", - "service": 
"test_service", - "timestamp": "2022-08-24T07:58:29.6326987Z", - }, - headers=[("service", b"test_service")], - checksum=1234, - serialized_key_size=12, - serialized_value_size=1, - serialized_header_size=1, - ) -] - -_VALID_MESSAGES_WITH_NO_SERVICE_HEADER = [ - ConsumerRecord( - topic="platform.upload.announce", - partition=0, - offset=24, - timestamp=1661327909633, - timestamp_type=0, - key=None, - value={ - "account": "0369233", - "category": "archive", - "service": "test_service", - "timestamp": "2022-08-24T07:58:29.6326987Z", - }, - headers=[("some_header", "some_value")], - checksum=1234, - serialized_key_size=12, - serialized_value_size=1, - serialized_header_size=1, - ) -] - - -@pytest.mark.parametrize("topic", _VALID_TOPICS) -@pytest.mark.parametrize("group", _VALID_GROUPS) -@pytest.mark.parametrize("server", _VALID_SERVERS) -@pytest.mark.parametrize("service", _VALID_SERVICES) -def test_init_anemic_consumer(topic, group, server, service): - """Test `AnemicConsumer` initialization.""" - sut = AnemicConsumer( - None, - None, - None, - topic, - group_id=group, - platform_service=service, - bootstrap_servers=[server], - ) - assert sut.platform_service == service.encode("utf-8") - assert isinstance(sut.consumer, KafkaConsumer) - assert sut.check_elapsed_time_thread.is_alive() - - -def test_anemic_consumer_deserialize(): - """Test deserialize method of `AnemicConsumer`.""" - consumer_message = _VALID_MESSAGES[0] - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = AnemicConsumer(None, None, None, None, platform_service="any") - deserialized = sut.deserialize(consumer_message[0]) - assert isinstance(deserialized, dict) - assert deserialized.get("url") == "" - assert ( - deserialized.get("b64_identity") - == "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K" - ) - 
assert deserialized.get("timestamp") == "2020-01-23T16:15:59.478901889Z" - - logger.removeHandler(log_handler) - - -@patch("ccx_messaging.consumers.consumer.Consumer.handles", lambda *a, **k: True) -@patch("ccx_messaging.consumers.consumer.Consumer.fire", lambda *a, **k: None) -@patch( - "ccx_messaging.consumers.consumer.Consumer.get_stringfied_record", - lambda *a, **k: None, -) -@pytest.mark.parametrize("service", _VALID_SERVICES) -def test_anemic_consumer_run_no_service_in_header(service): - """Test run method of `AnemicConsumer` with no service in received message's header.""" - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = AnemicConsumer(None, None, None, None, platform_service=service) - sut.consumer = _VALID_MESSAGES_WITH_NO_SERVICE_HEADER - sut.run() - assert AnemicConsumer.NO_SERVICE_DEBUG_MESSAGE in buf.getvalue() - - logger.removeHandler(log_handler) - - -@pytest.mark.parametrize("service", _VALID_SERVICES) -def test_anemic_consumer_run_unexpected_service(service): - """Test run method of `AnemicConsumer` with unexpected service in received message's header.""" - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = AnemicConsumer(None, None, None, None, platform_service=service) - sut.consumer = _VALID_MESSAGES_WITH_UNEXPECTED_SERVICE_HEADER - sut.run() - assert ( - AnemicConsumer.OTHER_SERVICE_DEBUG_MESSAGE.format(b"some_unexpected_service") - in buf.getvalue() - ) - - logger.removeHandler(log_handler) - - -@patch("ccx_messaging.consumers.consumer.Consumer.handles", lambda *a, **k: True) -@patch("ccx_messaging.consumers.consumer.Consumer.fire", lambda *a, **k: None) -@patch( - 
"ccx_messaging.consumers.consumer.Consumer.get_stringfied_record", - lambda *a, **k: None, -) -@pytest.mark.parametrize("service", _VALID_SERVICES) -def test_anemic_consumer_run_expected_service(service): - """Test run method of `AnemicConsumer` with expected service in received message's header.""" - buf = io.StringIO() - log_handler = logging.StreamHandler(buf) - logger = logging.getLogger() - logger.level = logging.DEBUG - logger.addHandler(log_handler) - - with patch("ccx_messaging.consumers.consumer.LOG", logger): - sut = AnemicConsumer(None, None, None, None, platform_service=service) - sut.consumer = _VALID_MESSAGES_WITH_EXPECTED_SERVICE_HEADER - sut.run() - logs = buf.getvalue() - assert AnemicConsumer.OTHER_SERVICE_DEBUG_MESSAGE not in logs - assert AnemicConsumer.NO_SERVICE_DEBUG_MESSAGE not in logs - assert AnemicConsumer.EXPECTED_SERVICE_DEBUG_MESSAGE in logs - logger.removeHandler(log_handler) diff --git a/test/consumers/kafka_consumer_test.py b/test/consumers/kafka_consumer_test.py index 0c12bfa..ec641b3 100644 --- a/test/consumers/kafka_consumer_test.py +++ b/test/consumers/kafka_consumer_test.py @@ -17,9 +17,10 @@ import logging import io import time -from unittest.mock import MagicMock, patch +from unittest.mock import call,MagicMock, patch import pytest +from confluent_kafka import KafkaException, TIMESTAMP_NOT_AVAILABLE from ccx_messaging.consumers.kafka_consumer import KafkaConsumer from ccx_messaging.error import CCXMessagingError @@ -28,10 +29,11 @@ class KafkaMessage: """Test double for the confluent_kafka.Message class.""" - def __init__(self, msg, headers=None): + def __init__(self, msg, headers=None, timestamp=None): """Initialize a KafkaMessage test double.""" self.msg = msg self._headers = headers + self._timestamp = timestamp self.topic = lambda: "topic" self.partition = lambda: 0 self.offset = lambda: 0 @@ -39,6 +41,13 @@ def __init__(self, msg, headers=None): self.error = lambda: None self.headers = lambda: self._headers + def 
timestamp(self): + if self._timestamp is None: + return TIMESTAMP_NOT_AVAILABLE, None + + else: + return None, self._timestamp + # _REGEX_BAD_SCHEMA = r"^Unable to extract URL from input message: " _INVALID_TYPE_VALUES = [ @@ -136,24 +145,24 @@ def test_deserialize_invalid_format_str(mock_consumer, msg): @pytest.mark.parametrize("msg,value", _VALID_MESSAGES) -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") -def test_deserialize_valid_str(consumer_mock, msg, value): +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_deserialize_valid_str(msg, value): """Test that proper string JSON input messages are correctly deserialized.""" sut = KafkaConsumer(None, None, None, None) assert sut.deserialize(KafkaMessage(msg)) == value @pytest.mark.parametrize("msg,value", _VALID_MESSAGES) -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") -def test_deserialize_valid_bytes(consumer_mock, msg, value): +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_deserialize_valid_bytes(msg, value): """Test that proper string JSON input messages are correctly deserialized.""" sut = KafkaConsumer(None, None, None, None) assert sut.deserialize(KafkaMessage(msg.encode())) == value @pytest.mark.parametrize("msg,value", _VALID_MESSAGES) -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") -def test_deserialize_valid_bytearray(consumer_mock, msg, value): +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_deserialize_valid_bytearray(msg, value): """Test that proper string JSON input messages are correctly deserialized.""" sut = KafkaConsumer(None, None, None, None) assert sut.deserialize(KafkaMessage(bytearray(msg.encode()))) == value @@ -171,23 +180,23 @@ def test_deserialize_valid_bytearray(consumer_mock, msg, value): @pytest.mark.parametrize("headers", _NO_HANDLE_HEADERS) 
-@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") -def test_platform_filter(consumer_mock, headers): +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_platform_filter(headers): """Test that filter by platform behaves as expected.""" sut = KafkaConsumer(None, None, None, None, platform_service=PLATFORM_SERVICE) assert not sut.handles(KafkaMessage("{}", headers)) @pytest.mark.parametrize("headers", _NO_HANDLE_HEADERS + [_HANDLE_HEADER]) -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") -def test_platform_filter_no_platform_service(consumer_mock, headers): +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_platform_filter_no_platform_service(headers): """Test that filter by platform behaves as expected.""" sut = KafkaConsumer(None, None, None, None) assert sut.handles(KafkaMessage("{}", headers)) -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") -def test_platform_filter_platform_service(consumer_mock): +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) +def test_platform_filter_platform_service(): """Test that filter by platform behaves as expected.""" sut = KafkaConsumer(None, None, None, None, platform_service=PLATFORM_SERVICE) assert sut.handles(KafkaMessage("{}", _HANDLE_HEADER)) @@ -253,12 +262,12 @@ def test_consumer_init_direct(topic, group, server): MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST = 2 -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) @patch( - "ccx_messaging.consumers.consumer.MAX_ELAPSED_TIME_BETWEEN_MESSAGES", + "ccx_messaging.consumers.kafka_consumer.MAX_ELAPSED_TIME_BETWEEN_MESSAGES", MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST, ) -def test_elapsed_time_thread_no_warning_when_message_received(consumer_mock): +def 
test_elapsed_time_thread_no_warning_when_message_received(): """ Test elapsed time thread if new message received on time. @@ -285,12 +294,12 @@ def test_elapsed_time_thread_no_warning_when_message_received(consumer_mock): logger.removeHandler(log_handler) -@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) @patch( "ccx_messaging.consumers.kafka_consumer.MAX_ELAPSED_TIME_BETWEEN_MESSAGES", MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST, ) -def test_elapsed_time_thread_warning_when_no_message_received(consumer_mock): +def test_elapsed_time_thread_warning_when_no_message_received(): """ Test elapsed time thread if no new message received on time. @@ -304,7 +313,7 @@ def test_elapsed_time_thread_warning_when_no_message_received(consumer_mock): logger.level = logging.DEBUG logger.addHandler(log_handler) - with patch("ccx_messaging.consumers.consumer.LOG", logger): + with patch("ccx_messaging.consumers.kafka_consumer.LOG", logger): sut = KafkaConsumer( None, None, None, "topic", group_id="group", bootstrap_servers=["server"] ) @@ -320,6 +329,47 @@ def test_elapsed_time_thread_warning_when_no_message_received(consumer_mock): logger.removeHandler(log_handler) +@patch( + "ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *args, **kwargs: MagicMock() +) +def test_handles_old_message(): + """Check that too old message is discarded.""" + msg = KafkaMessage(b"", None, 0) + sut = KafkaConsumer( + None, + None, + None, + "topic", + max_record_age=100, + group_id="group", + bootstrap_servers=["server"], + ) + + assert not sut.handles(msg) + + msg = KafkaMessage(b"", None, time.time() * 1000) # using ms + assert sut.handles(msg) + + +@patch( + "ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *args, **kwargs: MagicMock() +) +def test_handles_disable_age_message_filtering(): + """Check that too old message is discarded.""" + msg = 
KafkaMessage(b"", None, 0) + sut = KafkaConsumer( + None, + None, + None, + "topic", + max_record_age=-1, + group_id="group", + bootstrap_servers=["server"], + ) + + assert sut.handles(msg) + + @patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer", lambda *a, **k: MagicMock()) def test_process_msg_not_handled(): """Test the `process_msg` method in the `KafkaConsumer` class. @@ -424,3 +474,43 @@ def test_process_dead_letter_message(producer_init_mock, value, expected): sut.process_dead_letter(message_mock) producer_mock.send.assert_called_with(dlq_topic_name, value) + + +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") +def test_run_success(consumer_init_mock): + """Check the run method process the message as many times as needed.""" + consumer_mock = MagicMock() + consumer_init_mock.return_value = consumer_mock + + consumer_mock.consume.side_effect = [ + [KafkaMessage(b"0")], + [KafkaMessage(b"1"), KafkaMessage(b"2")], + KeyboardInterrupt(), + ] + + sut = KafkaConsumer(None, None, None, "topic") + sut.process_msg = MagicMock() + sut.run() + + assert consumer_mock.consume.call_count == 3 + assert consumer_mock.close.call_count == 1 + assert sut.process_msg.call_count == 3 + + +@patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") +def test_run_fail(consumer_init_mock): + """Check the run method process the message as many times as needed.""" + consumer_mock = MagicMock() + consumer_init_mock.return_value = consumer_mock + + consumer_mock.consume.side_effect = [ + KafkaException(), + ] + + sut = KafkaConsumer(None, None, None, "topic") + sut.process_msg = MagicMock() + sut.run() + + assert consumer_mock.consume.call_count == 1 + assert consumer_mock.close.call_count == 1 + assert not sut.process_msg.called diff --git a/test/publishers/data_pipeline_publisher_test.py b/test/publishers/data_pipeline_publisher_test.py deleted file mode 100644 index b9b57ba..0000000 --- a/test/publishers/data_pipeline_publisher_test.py +++ 
/dev/null @@ -1,278 +0,0 @@ -# Copyright 2022 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Module for testing the ccx_data_pipeline.kafka_publisher module.""" - -import unittest -from unittest.mock import MagicMock, patch - -from ccx_messaging.publishers.data_pipeline_publisher import DataPipelinePublisher -from ccx_messaging.error import CCXMessagingError - - -input_msg = { - "topic": "topic name", - "partition": "partition name", - "offset": 1234, - "url": "any/url", - "identity": {"identity": {"internal": {"org_id": "12345678"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "cluster_name": "clusterName", -} - - -class DataPipelinePublisherTest(unittest.TestCase): - """Test cases for testing the class DataPipelinePublisher.""" - - def test_init(self): - """ - Test DataPipelinePublisher initializer. 
- - The test mocks the KafkaProducer from kafka module in order - to avoid real usage of the library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "outgoing_topic": "a topic name", - "client_id": "ccx-data-pipeline", - } - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_mock: - sut = DataPipelinePublisher(**producer_kwargs) - - kafka_producer_mock.assert_called_with( - bootstrap_servers=["kafka_server1"], client_id="ccx-data-pipeline" - ) - self.assertEqual(sut.topic, "a topic name") - - def test_init_improper_params(self): - """Test DataPipelinePublisher initializer without outgoing topic.""" - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - with self.assertRaises(TypeError): - DataPipelinePublisher(**producer_kwargs) - - def test_init_none_topic(self): - """Test DataPipelinePublisher initializer without outgoing topic.""" - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - with self.assertRaises(KeyError): - _ = DataPipelinePublisher(outgoing_topic=None, **producer_kwargs) - - def test_init_non_valid_params(self): - """Test DataPipelinePublisher initializer with improper Kafka config.""" - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - "client.id": "ccx-data-pipeline", - } - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_mock: - DataPipelinePublisher(outgoing_topic="topic", **producer_kwargs) - kafka_producer_mock.assert_called_with( - bootstrap_servers=["kafka_server1"], client_id="ccx-data-pipeline" - ) - - # pylint: disable=no-self-use - def test_publish_no_request_id(self): - """ - Test Producer.publish method without request_id field. 
- - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - input_msg = { - "cluster_name": "the cluster name", - "identity": {"identity": {"account_number": "3000", "internal": {"org_id": "5000"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - } - message_to_publish = '{"key1": "value1"}' - expected_message = ( - b'{"OrgID": 5000, "AccountNumber": 3000, "ClusterName": "the cluster name", ' - b'"Report": {"key1": "value1"}, "LastChecked": "2020-01-23T16:15:59.478901889Z", ' - b'"Version": 2, "RequestId": null}\n' - ) - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = DataPipelinePublisher(outgoing_topic=topic_name, **producer_kwargs) - - sut.publish(input_msg, message_to_publish) - producer_mock.send.assert_called_with(topic_name, expected_message) - - def test_publish(self): - """ - Test Producer.publish method. 
- - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - input_msg = { - "cluster_name": "the cluster name", - "identity": {"identity": {"account_number": "3000", "internal": {"org_id": "5000"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "request_id": "REQUEST_ID", - } - message_to_publish = '{"key1": "value1"}' - expected_message = ( - b'{"OrgID": 5000, "AccountNumber": 3000, "ClusterName": "the cluster name", ' - b'"Report": {"key1": "value1"}, "LastChecked": "2020-01-23T16:15:59.478901889Z", ' - b'"Version": 2, "RequestId": "REQUEST_ID"}\n' - ) - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = DataPipelinePublisher(outgoing_topic=topic_name, **producer_kwargs) - - sut.publish(input_msg, message_to_publish) - producer_mock.send.assert_called_with(topic_name, expected_message) - - def test_publish_bad_orgID(self): - """ - Test Producer.publish method with invalid orgID. 
- - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - input_msg = { - "cluster_name": "the cluster name", - "identity": {"identity": {"account_number": "3000", "internal": {"org_id": "NaN"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - } - message_to_publish = '{"key1": "value1"}' - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = DataPipelinePublisher(outgoing_topic=topic_name, **producer_kwargs) - - with self.assertRaises(CCXMessagingError): - sut.publish(input_msg, message_to_publish) - - def test_publish_bad_accountNumber(self): - """ - Test Producer.publish method with invalid orgID. - - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - input_msg = { - "cluster_name": "the cluster name", - "identity": {"identity": {"account_number": "NaN", "internal": {"org_id": "5000"}}}, - "timestamp": "2020-01-23T16:15:59.478901889Z", - } - message_to_publish = '{"key1": "value1"}' - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = DataPipelinePublisher(outgoing_topic=topic_name, **producer_kwargs) - - with self.assertRaises(CCXMessagingError): - sut.publish(input_msg, message_to_publish) - - def test_error(self): - """Test Producer.error() method.""" - err = CCXMessagingError("foobar") - - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - 
- topic_name = "KAFKATOPIC" - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = DataPipelinePublisher(outgoing_topic=topic_name, **producer_kwargs) - - err = CCXMessagingError("foobar") - - sut.error(input_msg, err) - - def test_error_wrong_type(self): - """Test Producer.error() method.""" - err = CCXMessagingError("foobar") - - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - - with patch( - "ccx_messaging.publishers.data_pipeline_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = DataPipelinePublisher(outgoing_topic=topic_name, **producer_kwargs) - - # some error with type different from CCXMessagingError - err = ArithmeticError("foobar") - - sut.error(input_msg, err) diff --git a/test/publishers/sha_publisher_test.py b/test/publishers/sha_publisher_test.py deleted file mode 100644 index beb9828..0000000 --- a/test/publishers/sha_publisher_test.py +++ /dev/null @@ -1,303 +0,0 @@ -# Copyright 2022 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Module for testing the ccx-messaging.sha_publisher module.""" - -import unittest -from unittest.mock import MagicMock, patch - -from ccx_messaging.publishers.sha_publisher import SHAPublisher -from ccx_messaging.error import CCXMessagingError - -from .unicode_encode_error_thower import UnicodeEncodeErrorThrower - - -input_msg = { - "topic": "topic name", - "partition": "partition name", - "offset": 1234, - "url": "any/url", - "identity": { - "identity": { - "internal": { - "org_id": "12345678", - }, - "account_number": "999999", - }, - }, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "cluster_name": "clusterName", -} - - -class SHAPublisherTest(unittest.TestCase): - """Test cases for testing the class SHAPublisher.""" - - def test_init(self): - """ - Test SHAPublisher initializer. - - The test mocks the KafkaProducer from kafka module in order - to avoid real usage of the library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "outgoing_topic": "a topic name", - "client_id": "ccx-data-pipeline", - } - - with patch("ccx_messaging.publishers.sha_publisher.KafkaProducer") as kafka_producer_mock: - sut = SHAPublisher(**producer_kwargs) - - kafka_producer_mock.assert_called_with( - bootstrap_servers=["kafka_server1"], client_id="ccx-data-pipeline" - ) - self.assertEqual(sut.topic, "a topic name") - - def test_init_improper_params(self): - """Test SHAPublisher initializer without improper parameters.""" - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - with self.assertRaises(TypeError): - _ = SHAPublisher(**producer_kwargs) - - def test_init_none_topic(self): - """Test SHAPublisher initializer without outgoing topic.""" - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - with self.assertRaises(KeyError): - _ = SHAPublisher(outgoing_topic=None, **producer_kwargs) - - def test_publish(self): - """ - Test Producer.publish 
method. - - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - message_to_publish = '{"key1": "value1"}' - expected_message = ( - b'{"OrgID": 12345678, "AccountNumber": 999999, "ClusterName": "clusterName", ' - b'"Images": {"key1": "value1"}, "LastChecked": "2020-01-23T16:15:59.478901889Z", ' - b'"Version": 2, "RequestId": null}\n' - ) - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs) - - sut.publish(input_msg, message_to_publish) - producer_mock.send.assert_called_with(topic_name, expected_message) - - def test_publish_wrong_input_message(self): - """ - Test Producer.publish method. - - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - message_to_publish = "" - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs) - - with self.assertRaises(Exception): - sut.publish(input_msg, message_to_publish) - - def test_publish_wrong_org_id(self): - """ - Test Producer.publish method. 
- - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - message_to_publish = "" - - input_msg = { - "topic": "topic name", - "partition": "partition name", - "offset": 1234, - "url": "any/url", - "identity": { - "identity": { - "internal": { - "org_id": "*** not an integer ***", - }, - "account_number": "999999", - }, - }, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "cluster_name": "clusterName", - } - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs) - - with self.assertRaises(CCXMessagingError): - sut.publish(input_msg, message_to_publish) - - def test_publish_wrong_account_number(self): - """ - Test Producer.publish method. 
- - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - message_to_publish = "" - - input_msg = { - "topic": "topic name", - "partition": "partition name", - "offset": 1234, - "url": "any/url", - "identity": { - "identity": { - "internal": { - "org_id": "123456", - }, - "account_number": "*** not an integer ***", - }, - }, - "timestamp": "2020-01-23T16:15:59.478901889Z", - "cluster_name": "clusterName", - } - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs) - - with self.assertRaises(CCXMessagingError): - sut.publish(input_msg, message_to_publish) - - def test_error(self): - """Test Producer.error() method.""" - err = CCXMessagingError("foobar") - - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs) - - err = CCXMessagingError("foobar") - - sut.error(input_msg, err) - - def test_error_wrong_type(self): - """Test Producer.error() method.""" - err = CCXMessagingError("foobar") - - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, 
**producer_kwargs) - - # some error with type different from CCXMessagingError - err = ArithmeticError("foobar") - - sut.error(input_msg, err) - - def _test_publish_wrong_message_encoding(self): - """ - Test Producer.publish method when message can't be encoded to UTF-8. - - The kafka.KafkaProducer class is mocked in order to avoid the usage - of the real library - """ - producer_kwargs = { - "bootstrap_servers": ["kafka_server1"], - "client_id": "ccx-data-pipeline", - } - - topic_name = "KAFKATOPIC" - input_msg = "" - message_to_publish = UnicodeEncodeErrorThrower() - - with patch( - "ccx_messaging.publishers.sha_publisher.KafkaProducer" - ) as kafka_producer_init_mock: - producer_mock = MagicMock() - kafka_producer_init_mock.return_value = producer_mock - - sut = SHAPublisher(outgoing_topic=topic_name, **producer_kwargs) - - with self.assertRaises(CCXMessagingError): - sut.publish(input_msg, message_to_publish) diff --git a/test/test_utils.py b/test/test_utils.py index ee26cec..cb42224 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -16,26 +16,6 @@ import time -from kafka.consumer.fetcher import ConsumerRecord - - -def mock_consumer_record(value): - """Construct a value-only `ConsumerRecord`.""" - return ConsumerRecord( - None, - None, - None, - int((time.time() * 1000) - 60), - None, - None, - value, - None, - None, - None, - None, - None, - ) - def mock_consumer_process_no_action_catch_exception(duration_s=0): """ diff --git a/test/utils/kafka_config_test.py b/test/utils/kafka_config_test.py index e4fbb27..272227a 100644 --- a/test/utils/kafka_config_test.py +++ b/test/utils/kafka_config_test.py @@ -14,7 +14,10 @@ """Test for the ccx_messaging.utils.kafka_config module.""" -from ccx_messaging.utils.kafka_config import translate_kafka_configuration, kafka_producer_config_cleanup +from ccx_messaging.utils.kafka_config import ( + translate_kafka_configuration, + kafka_producer_config_cleanup, +) def test_translate_kafka_configuration(): From 
b59cb63f902bbd1bdeb1832979d08e06ca37cfd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Segura=20Lucas?= Date: Tue, 18 Apr 2023 16:22:21 +0200 Subject: [PATCH 2/2] Linter happy again --- ccx_messaging/utils/kafka_config.py | 1 + test/consumers/kafka_consumer_test.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ccx_messaging/utils/kafka_config.py b/ccx_messaging/utils/kafka_config.py index 27abb6b..a3f7bd5 100644 --- a/ccx_messaging/utils/kafka_config.py +++ b/ccx_messaging/utils/kafka_config.py @@ -1,5 +1,6 @@ """Functions to adapt kafka-python configuration parameters.""" + def translate_kafka_configuration(config: dict) -> dict: """Transform a dict with default Kafka configuration to kafka-python configuration.""" lib_config = {} diff --git a/test/consumers/kafka_consumer_test.py b/test/consumers/kafka_consumer_test.py index ec641b3..9155af2 100644 --- a/test/consumers/kafka_consumer_test.py +++ b/test/consumers/kafka_consumer_test.py @@ -17,7 +17,7 @@ import logging import io import time -from unittest.mock import call,MagicMock, patch +from unittest.mock import MagicMock, patch import pytest from confluent_kafka import KafkaException, TIMESTAMP_NOT_AVAILABLE @@ -42,6 +42,7 @@ def __init__(self, msg, headers=None, timestamp=None): self.headers = lambda: self._headers def timestamp(self): + """Test double for the Message.timestamp function.""" if self._timestamp is None: return TIMESTAMP_NOT_AVAILABLE, None