
Merge pull request #105 from tisnik/style-fixes
Style fixes + Ruff settings + Ruff on CI
tisnik authored Nov 10, 2023
2 parents cad78c5 + 342ca89 commit 453b025
Showing 22 changed files with 56 additions and 40 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ruff.yml
@@ -0,0 +1,8 @@
+name: Ruff
+on: [ push, pull_request ]
+jobs:
+  ruff:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: chartboost/ruff-action@v1
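Note: this new workflow lints every push and pull request with the chartboost/ruff-action action, using the [tool.ruff] configuration added to pyproject.toml later in this commit. The same check can be reproduced locally (assuming Ruff is installed, e.g. via pip install ruff) by running ruff check . from the repository root.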
7 changes: 3 additions & 4 deletions ccx_messaging/consumers/kafka_consumer.py
@@ -23,6 +23,7 @@


 class KafkaConsumer(Consumer):
+
     """Consumer based in Confluent Kafka."""

     def __init__(
@@ -92,8 +93,7 @@ def __init__(
             self.dlq_producer = Producer(kafka_producer_config_cleanup(kwargs))

     def get_url(self, input_msg: dict) -> str:
-        """
-        Retrieve URL to storage (S3/Minio) from Kafka message.
+        """Retrieve URL to storage (S3/Minio) from Kafka message.

         Same as previous 2 methods, when we receive and figure out the
         message format, we can modify this method
@@ -235,8 +235,7 @@ def deserialize(self, msg: Message) -> dict:
         return deseralized_msg

     def check_last_message_received_time(self):
-        """
-        Verify elapsed time between received messages and warn if too long.
+        """Verify elapsed time between received messages and warn if too long.

         Checks if the last received message was received more than one hour ago
         and sends an alert if it is the case
1 change: 1 addition & 0 deletions ccx_messaging/downloaders/http_downloader.py
@@ -60,6 +60,7 @@ def parse_human_input(file_size):

 # pylint: disable=too-few-public-methods
 class HTTPDownloader:
+
     """Downloader for HTTP uris."""

     # https://<hostname>/service_id/file_id?<credentials and other params>
4 changes: 2 additions & 2 deletions ccx_messaging/engines/sha_extractor_engine.py
@@ -25,11 +25,11 @@


 class SHAExtractorEngine(ICMEngine):
+
     """Engine for extraction of downloading tar archive and selecting a file to be processed."""

     def process(self, broker, path):
-        """
-        Retrieve SHA records from a downloaded archive.
+        """Retrieve SHA records from a downloaded archive.

         The archive is extracted and if the SHA records are found, the JSON is retrieved
         from the file and returned as the method output. Otherwise, None is returned.
4 changes: 2 additions & 2 deletions ccx_messaging/error.py
@@ -16,8 +16,8 @@


 class CCXMessagingError(Exception):
-    """
-    Represents a CCX messaging exception.
+
+    """Represents a CCX messaging exception.

     This should make it easier to differentiate between
     exceptions caused by internal and external code.
7 changes: 4 additions & 3 deletions ccx_messaging/publishers/kafka_publisher.py
@@ -26,8 +26,8 @@
 BEST_COMPRESSION = 9

 class KafkaPublisher(Publisher):
-    """
-    KafkaPublisher is a base class for Kafka based publishers.
+
+    """KafkaPublisher is a base class for Kafka based publishers.

     It relays on Confluent Kafka library to perform the Kafka related
     tasks.
@@ -70,7 +70,8 @@ def __init__(self, outgoing_topic: str, kafka_broker_config: dict = None, **kwar
     def produce(self, outgoing_message: bytes):
         """Send the message though the Kafka producer."""
         if self.compression:
-            self.producer.produce(self.topic, gzip.compress(outgoing_message,compresslevel=BEST_COMPRESSION))
+            self.producer.produce(self.topic,
+                                  gzip.compress(outgoing_message,compresslevel=BEST_COMPRESSION))
         else:
             self.producer.produce(self.topic, outgoing_message)
         self.producer.poll(0)
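Note: with compression enabled, consumers of this topic receive a gzip stream rather than plain JSON bytes. A minimal consumer-side sketch of the reverse transformation (hypothetical code, not part of this commit):

    import gzip
    import json

    def decode_outgoing_message(raw: bytes) -> dict:
        """Reverse the transformation applied by KafkaPublisher.produce."""
        # Messages may arrive uncompressed when the publisher has
        # compression disabled; the gzip magic bytes distinguish the cases.
        if raw[:2] == b"\x1f\x8b":
            # gzip.decompress inverts gzip.compress for any compresslevel.
            raw = gzip.decompress(raw)
        return json.loads(raw.decode("utf-8"))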
21 changes: 8 additions & 13 deletions ccx_messaging/publishers/rule_processing_publisher.py
@@ -29,8 +29,8 @@


 class RuleProcessingPublisher(KafkaPublisher):
-    """
-    RuleProcessingPublisher will handle the results of the applied rules and publish them to Kafka.
+
+    """RuleProcessingPublisher handles the results of the applied rules and publish them to Kafka.

     The results of the data analysis are received as a JSON (string)
     and turned into a byte array using UTF-8 encoding.
@@ -45,19 +45,15 @@ def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs):
         self.outdata_schema_version = 2

     def validate_timestamp_rfc3339(self, timestamp):
-        """
-        Check if the timestamp matches RFC3339 format.
-        """
+        """Check if the timestamp matches RFC3339 format."""
         try:
             datetime.datetime.strptime(timestamp, RFC3339_FORMAT)
-        except:
+        except:  # noqa E722
             return False
         return True

     def get_gathering_time(self, input_msg):
-        """
-        Retrieve the gathering time from input message if present, otherwise create one.
-        """
+        """Retrieve the gathering time from input message if present, otherwise create one."""
         gathered_at = input_msg.get("metadata", {}) \
             .get("custom_metadata", {}) \
             .get("gathering_time", None)
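Note: the bare except above is silenced with # noqa E722 rather than narrowed; also, the comment and blank lines that look unchanged in two later hunks of this file differ only in trailing whitespace (pycodestyle W291/W293). A stricter alternative to the noqa (a sketch of what narrowing could look like, not what this commit does) would catch only the exceptions strptime raises:

    def validate_timestamp_rfc3339(self, timestamp):
        """Check if the timestamp matches RFC3339 format."""
        try:
            datetime.datetime.strptime(timestamp, RFC3339_FORMAT)
        except (ValueError, TypeError):
            # strptime raises ValueError on format mismatch and TypeError
            # for non-string input; anything else propagates.
            return False
        return True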
@@ -66,7 +62,7 @@ def get_gathering_time(self, input_msg):
             log.debug("Gathering time is not present; creating replacement")
             gathered_at = datetime.datetime.now().strftime(RFC3339_FORMAT)

-        # If the timestamp is not in correct format, try to parse 
+        # If the timestamp is not in correct format, try to parse
         # format used in molodec. Otherwise use current timestamp.
         if not self.validate_timestamp_rfc3339(gathered_at):
             try:
@@ -79,8 +75,7 @@ def publish(self, input_msg, response):
         return gathered_at

     def publish(self, input_msg, response):
-        """
-        Publish an EOL-terminated JSON message to the output Kafka topic.
+        """Publish an EOL-terminated JSON message to the output Kafka topic.

         The response is assumed to be a string representing a valid JSON object.
         A newline character will be appended to it, it will be converted into
@@ -114,7 +109,7 @@ def publish(self, input_msg, response):
                 "gathering_time": self.get_gathering_time(input_msg)
             }
         }
-        
+
         message = json.dumps(output_msg) + "\n"

         log.debug("Sending response to the %s topic.", self.topic)
4 changes: 2 additions & 2 deletions ccx_messaging/publishers/workloads_info_publisher.py
@@ -26,6 +26,7 @@


 class WorkloadInfoPublisher(KafkaPublisher):
+
     """WorkloadInfoPublisher based on Confluent Kafka Producer.

     The workload info is received from a custom engine as a JSON string.
@@ -39,8 +40,7 @@ def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs):
         self.outdata_schema_version = 2

     def publish(self, input_msg: dict, response: str):
-        """
-        Publish an EOL-terminated JSON message to the output Kafka topic.
+        """Publish an EOL-terminated JSON message to the output Kafka topic.

         The input_msg contains content of message read from incoming Kafka
         topic. Such message should contains account info, cluster ID etc.
1 change: 1 addition & 0 deletions ccx_messaging/utils/logging.py
@@ -77,6 +77,7 @@ def setup_watchtower(logging_config=None):


 class CloudWatchFormatter(jsonlogger.JsonFormatter):
+
     """Class that implements formatter for logging to CloudWatch."""

     def __init__(self, *args, **kwargs):
3 changes: 2 additions & 1 deletion ccx_messaging/watchers/cluster_id_watcher.py
@@ -25,6 +25,7 @@


 class ClusterIdWatcher(EngineWatcher, ConsumerWatcher):
+
     """Mixed `Watcher` that is able to watch both `Consumer` and `Engine`."""

     CLUSTER_ID_LENGTH = 36
@@ -73,7 +74,7 @@ def on_extract(self, ctx, broker, extraction):
                     "The archive doesn't contain a valid Cluster Id file. Skipping its extraction"
                 )

-        except IOError:
+        except OSError:
             self.last_record["cluster_name"] = None
             LOG.warning(f"Could not read file: {id_file_path}")

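Note: IOError has been an alias of OSError since Python 3.3, so this rename (Ruff's pyupgrade rule UP024) modernizes the spelling without changing which exceptions are caught:

    # On any Python 3.3+ interpreter the two names are the same class,
    # so `except OSError` catches exactly what `except IOError` did.
    assert IOError is OSError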
1 change: 1 addition & 0 deletions ccx_messaging/watchers/consumer_watcher.py
@@ -18,6 +18,7 @@


 class ConsumerWatcher(ICMConsumerWatcher):
+
     """Extension of the ConsumerWatcher class to allow handling new callbacks."""

     def on_not_handled(self, input_msg):
1 change: 1 addition & 0 deletions ccx_messaging/watchers/payload_tracker_watcher.py
@@ -28,6 +28,7 @@


 class PayloadTrackerWatcher(ConsumerWatcher):
+
     """`Watcher` implementation to handle Payload Tracker updates."""

     def __init__(self, topic, service_name="ccx-data-pipeline", kafka_broker_config=None, **kwargs):
1 change: 1 addition & 0 deletions ccx_messaging/watchers/stats_watcher.py
@@ -28,6 +28,7 @@

 # pylint: disable=too-many-instance-attributes
 class StatsWatcher(ConsumerWatcher):
+
     """A Watcher that stores different Prometheus `Counter`s."""

     def __init__(self, prometheus_port=8000):
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -3,3 +3,9 @@ requires = ["setuptools", "setuptools_scm", "wheel"]
 build-backend = "setuptools.build_meta"

 [tool.setuptools_scm]
+
+[tool.ruff]
+select = ["E", "F", "W", "UP", "C", "D"]
+ignore = ["D211", "C401", "D213", "UP006", "UP007", "UP009", "UP015", "UP035"]
+
+line-length = 100
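These settings drive both the CI job above and most hunks in this commit: E and W are the pycodestyle errors and warnings, F is Pyflakes, UP is pyupgrade (the IOError to OSError rename), C covers the flake8-comprehensions and mccabe checks (the dict()/list() rewrites), and D is pydocstyle. Ignoring D211 and D213 resolves pydocstyle's two mutually exclusive rule pairs in favor of D203 (one blank line before a class docstring) and D212 (multi-line docstring summary on the first line), which is exactly the pattern applied throughout the diff. A before/after sketch with a hypothetical class:

    # Before: flagged by D203 and D212 under this configuration.
    class Example:
        """
        Summary line.

        Longer description.
        """

    # After: the style this commit applies everywhere.
    class Example:

        """Summary line.

        Longer description.
        """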
9 changes: 4 additions & 5 deletions test/consumers/kafka_consumer_test.py
@@ -27,6 +27,7 @@


 class KafkaMessage:
+
     """Test double for the confluent_kafka.Message class."""

     def __init__(self, msg, headers=None, timestamp=None):
@@ -247,7 +248,7 @@ def test_get_url_valid(value):
 def test_consumer_init_direct(topic, group, server):
     """Test of our Consumer constructor, using direct configuration options."""
     with patch("ccx_messaging.consumers.kafka_consumer.ConfluentConsumer") as mock_consumer_init:
-        with patch("os.environ", new=dict()):
+        with patch("os.environ", new={}):
             kwargs = {
                 "group.id": group,
                 "bootstrap.servers": server,
@@ -269,8 +270,7 @@ def test_consumer_init_direct(topic, group, server):
     MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST,
 )
 def test_elapsed_time_thread_no_warning_when_message_received():
-    """
-    Test elapsed time thread if new message received on time.
+    """Test elapsed time thread if new message received on time.

     Test that no warnings are sent if a new message is received before
     the defined MAX_ELAPSED_TIME_BETWEEN_MESSAGES.
@@ -301,8 +301,7 @@ def test_elapsed_time_thread_no_warning_when_message_received():
     MAX_ELAPSED_TIME_BETWEEN_MESSAGES_TEST,
 )
 def test_elapsed_time_thread_warning_when_no_message_received():
-    """
-    Test elapsed time thread if no new message received on time.
+    """Test elapsed time thread if no new message received on time.

     Test that warnings are sent if no new messages are received before
     the defined MAX_ELAPSED_TIME_BETWEEN_MESSAGES.
2 changes: 1 addition & 1 deletion test/downloaders/http_downloader_test.py
@@ -23,7 +23,7 @@


 _REGEX_BAD_URL_FORMAT = r"^Invalid URL format: .*"
-_INVALID_TYPE_URLS = [42, 2.71, True, list(), dict()]
+_INVALID_TYPE_URLS = [42, 2.71, True, [], {}]


 @pytest.mark.parametrize("url", _INVALID_TYPE_URLS)
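Note: the list() and dict() rewrites here (and in kafka_consumer_test.py and stats_watcher_test.py) come from flake8-comprehensions rule C408, enabled through the "C" selector above: an empty literal is equivalent to the empty constructor call but skips a global name lookup, which the bytecode makes visible:

    import dis

    # The literal builds the map directly (BUILD_MAP), while dict()
    # needs a name load plus a call.
    dis.dis(compile("{}", "<string>", "eval"))
    dis.dis(compile("dict()", "<string>", "eval"))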
2 changes: 1 addition & 1 deletion test/ingress_test.py
@@ -37,7 +37,7 @@ def test_parse_indentity_invalid_values(invalid_value):


 VALID_VALUES = [
-    '{"url": "","b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K",'
+    '{"url": "","b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K",'  # noqa E501
     '"timestamp": "2020-01-23T16:15:59.478901889Z"}',
     '{"url": "https://s3.com/hash", "unused-property": null, '
     '"b64_identity": "eyJpZGVudGl0eSI6IHsiaW50ZXJuYWwiOiB7Im9yZ19pZCI6ICIxMjM0NTY3OCJ9fX0K",'
5 changes: 4 additions & 1 deletion test/publishers/kafka_publisher_test.py
@@ -109,6 +109,7 @@
 BEST_COMPRESSION = 9

 def timeStampMasking(message):
+    """Mask four bytes in Gzip stream that contain timestamp."""
     message=list(message)
     message[4] = 0
     message[5] = 0
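Context for the new docstring: bytes 4-7 of a gzip stream are the header's MTIME field (RFC 1952), which gzip.compress fills with the current time, so two compressions of the same payload can differ there and nowhere else; zeroing them makes expected and actual outputs comparable (the masking of bytes 6 and 7 presumably continues in the lines elided from this hunk). A standalone illustration, independent of this test file:

    import gzip
    import time

    a = gzip.compress(b"payload", compresslevel=9)
    time.sleep(1)  # force a different MTIME second
    b = gzip.compress(b"payload", compresslevel=9)

    # The streams differ only in header bytes 4-7 (the timestamp).
    assert a != b
    assert a[:4] == b[:4] and a[8:] == b[8:]

Passing mtime=0 to gzip.compress would be a deterministic alternative to masking.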
@@ -134,6 +135,7 @@ def test_init_compression():

 @pytest.mark.parametrize("input", VALID_INPUT_MSG)
 def test_compressing_enabled(input):
+    """Check if message is gzipped if compression is enabled."""
     input = bytes(json.dumps(input) + "\n",'utf-8')
     expected_output = timeStampMasking(gzip.compress(input,compresslevel=BEST_COMPRESSION))
     kakfa_config = {
@@ -149,6 +151,7 @@

 @pytest.mark.parametrize("input", VALID_INPUT_MSG)
 def test_compressing_disabled(input):
+    """Check if message is not gzipped if compression is disabled."""
     input = bytes(json.dumps(input) + "\n",'utf-8')
     expected_output = input
     kakfa_config = {
@@ -157,4 +160,4 @@
     pub = KafkaPublisher(outgoing_topic="topic-name", **kakfa_config)
     pub.producer = MagicMock()
     pub.produce(input)
-    pub.producer.produce.assert_called_with("topic-name",expected_output)
\ No newline at end of file
+    pub.producer.produce.assert_called_with("topic-name",expected_output)
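Note: the removed and re-added final line above is apparently byte-identical; this is the "no newline at end of file" fix (pycodestyle W292), where the last line of the file is rewritten only to gain a terminating newline.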
1 change: 1 addition & 0 deletions test/publishers/unicode_encode_error_thower.py
@@ -16,6 +16,7 @@


 class UnicodeEncodeErrorThrower:
+
     """UnicodeErrorThrower is helper class for testing string handling."""

     def __init__(self):
3 changes: 1 addition & 2 deletions test/sentry_test.py
@@ -20,8 +20,7 @@


 def test_get_event_level():
-    """
-    Verify the `get_event_level` function works.
+    """Verify the `get_event_level` function works.

     Check that it returns `logging.WARNING` if SENTRY_CATCH_WARNINGS
     is set, otherwise return `logging.ERROR`.
3 changes: 1 addition & 2 deletions test/test_utils.py
@@ -18,8 +18,7 @@


 def mock_consumer_process_no_action_catch_exception(duration_s=0):
-    """
-    Mock the process method of ICM consumer so it does nothing.
+    """Mock the process method of ICM consumer so it does nothing.

     This mocks the process method and removes all processing, but
     still catches the raised exceptions.
2 changes: 1 addition & 1 deletion test/watchers/stats_watcher_test.py
@@ -32,7 +32,7 @@ def test_stats_watcher_initialize_invalid_port(value):
         _ = StatsWatcher(value)


-_VALID_PORTS = [dict(), {"prometheus_port": 9500}, {"prometheus_port": 80}]
+_VALID_PORTS = [{}, {"prometheus_port": 9500}, {"prometheus_port": 80}]


 @pytest.mark.parametrize("value", _VALID_PORTS)
