Merge pull request #69 from JiriPapousek/gathered-at-not-available
Make passing gathering time more robust
JiriPapousek authored Aug 16, 2023
2 parents 6cd8d82 + 28ee8f0 commit bb964da
Showing 2 changed files with 100 additions and 13 deletions.
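
In practice, the change means every published result now carries a Metadata.gathering_time field in RFC3339 format, with a replacement value generated when the incoming message has none. A rough sketch of the resulting payload, in the style of the tests below (all field values here are illustrative, not taken from a real message):

    # Illustrative shape of the message published after this change; values are made up.
    output_msg = {
        "OrgID": 10,
        "AccountNumber": "",
        "ClusterName": "uuid",
        "Report": {},
        "LastChecked": "a timestamp",
        "Version": 2,
        "RequestId": "a request id",
        "Metadata": {
            # Always present now: either the validated/converted input value
            # or a freshly generated RFC3339 timestamp.
            "gathering_time": "2023-08-16T12:00:00Z",
        },
    }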
51 changes: 42 additions & 9 deletions ccx_messaging/publishers/rule_processing_publisher.py
@@ -14,6 +14,7 @@

"""Module that implements a custom Kafka publisher."""

import datetime
import json
import logging
from json import JSONDecodeError
@@ -24,6 +25,8 @@

log = logging.getLogger(__name__)

RFC3339_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


class RuleProcessingPublisher(KafkaPublisher):
"""
@@ -41,6 +44,40 @@ def __init__(self, outgoing_topic, kafka_broker_config=None, **kwargs):
super().__init__(outgoing_topic, kafka_broker_config, **kwargs)
self.outdata_schema_version = 2

def validate_timestamp_rfc3339(self, timestamp):
"""
Check if the timestamp matches RFC3339 format.
"""
try:
datetime.datetime.strptime(timestamp, RFC3339_FORMAT)
        except (TypeError, ValueError):
return False
return True

def get_gathering_time(self, input_msg):
"""
Retrieve the gathering time from input message if present, otherwise create one.
"""
gathered_at = input_msg.get("metadata", {}) \
.get("custom_metadata", {}) \
.get("gathering_time", None)

if not gathered_at:
log.debug("Gathering time is not present; creating replacement")
gathered_at = datetime.datetime.now().strftime(RFC3339_FORMAT)

        # If the timestamp is not in the correct format, try to parse
        # the format used in molodec. Otherwise use the current timestamp.
if not self.validate_timestamp_rfc3339(gathered_at):
try:
gathered_at = datetime.datetime.fromisoformat(gathered_at).strftime(RFC3339_FORMAT)
log.debug("Converting gathering time from ISO format to RFC3339 format")
except ValueError:
log.debug("Gathering time could not be parsed; creating replacement")
gathered_at = datetime.datetime.now().strftime(RFC3339_FORMAT)

return gathered_at

def publish(self, input_msg, response):
"""
Publish an EOL-terminated JSON message to the output Kafka topic.
@@ -73,16 +110,11 @@ def publish(self, input_msg, response):
"LastChecked": msg_timestamp,
"Version": self.outdata_schema_version,
"RequestId": input_msg.get("request_id"),
-            }
-
-            gathered_at = input_msg.get("metadata", {}) \
-                .get("custom_metadata", {}) \
-                .get("gathering_time", None)
-            if gathered_at:
-                output_msg["Metadata"] = {
-                    "gathering_time": gathered_at
-                }
+                "Metadata": {
+                    "gathering_time": self.get_gathering_time(input_msg)
+                }
+            }

message = json.dumps(output_msg) + "\n"

log.debug("Sending response to the %s topic.", self.topic)
@@ -115,4 +147,5 @@ def publish(self, input_msg, response):
raise CCXMessagingError("Missing expected keys in the input message") from err

except (TypeError, UnicodeEncodeError, JSONDecodeError) as err:
log.info(err)
raise CCXMessagingError(f"Error encoding the response to publish: {response}") from err
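
To make the new behaviour concrete, here is how get_gathering_time handles the three input shapes exercised by the tests below. This is a sketch: pub stands for an already constructed RuleProcessingPublisher (its Kafka connection settings are omitted here), and the timestamps are examples.

    # 1. gathering_time already in RFC3339 format: passed through unchanged.
    msg = {"metadata": {"custom_metadata": {"gathering_time": "2023-08-14T09:31:46Z"}}}
    pub.get_gathering_time(msg)  # -> "2023-08-14T09:31:46Z"

    # 2. gathering_time in ISO format (as produced by molodec): converted to RFC3339.
    msg = {"metadata": {"custom_metadata": {"gathering_time": "2023-08-14T09:31:46.677052"}}}
    pub.get_gathering_time(msg)  # -> "2023-08-14T09:31:46Z"

    # 3. gathering_time missing or unparsable: replaced with the current time
    #    (datetime.datetime.now(), formatted with RFC3339_FORMAT).
    msg = {"metadata": {"custom_metadata": {}}}
    pub.get_gathering_time(msg)  # -> e.g. "2023-08-16T12:00:00Z"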
62 changes: 58 additions & 4 deletions test/publishers/rule_processing_publisher_test.py
@@ -14,6 +14,7 @@

"""Tests for the RuleProcessingPublisher class."""

import freezegun
import json
from unittest.mock import MagicMock, patch

@@ -148,6 +149,9 @@ def test_publish_bad_argument(wrong_input_msg):
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2012-01-14T00:00:00Z"
}
},
id="with account",
),
@@ -174,6 +178,9 @@ def test_publish_bad_argument(wrong_input_msg):
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2012-01-14T00:00:00Z"
}
},
id="invalid account",
),
@@ -200,6 +207,9 @@ def test_publish_bad_argument(wrong_input_msg):
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2012-01-14T00:00:00Z"
}
},
id="empty account",
),
@@ -225,6 +235,9 @@ def test_publish_bad_argument(wrong_input_msg):
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2012-01-14T00:00:00Z"
}
},
id="no account",
),
@@ -243,7 +256,7 @@ def test_publish_bad_argument(wrong_input_msg):
"offset": 100,
"metadata": {
"custom_metadata": {
"gathering_time": "2023-08-14T09:31:46.677052"
"gathering_time": "2023-08-14T09:31:46Z"
}
}
},
@@ -256,11 +269,44 @@ def test_publish_bad_argument(wrong_input_msg):
"Version": 2,
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2023-08-14T09:31:46.677052"
"gathering_time": "2023-08-14T09:31:46Z"
}
},
id="with gathering timestamp",
),
pytest.param(
{
"identity": {
"identity": {
"internal": {"org_id": 10},
},
},
"timestamp": "a timestamp",
"cluster_name": "uuid",
"request_id": "a request id",
"topic": "incoming_topic",
"partition": 0,
"offset": 100,
"metadata": {
"custom_metadata": {
"gathering_time": "2023-08-14T09:31:46.677052"
}
}
},
{
"OrgID": 10,
"AccountNumber": "",
"ClusterName": "uuid",
"Report": {},
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2023-08-14T09:31:46Z"
}
},
id="with gathering timestamp in ISO format",
),
pytest.param(
{
"identity": {
@@ -287,7 +333,10 @@ def test_publish_bad_argument(wrong_input_msg):
"Report": {},
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id"
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2012-01-14T00:00:00Z"
}
},
id="with custom metadata without gathering timestamp",
),
@@ -314,13 +363,17 @@ def test_publish_bad_argument(wrong_input_msg):
"Report": {},
"LastChecked": "a timestamp",
"Version": 2,
"RequestId": "a request id"
"RequestId": "a request id",
"Metadata": {
"gathering_time": "2012-01-14T00:00:00Z"
}
},
id="empty metadata",
),
]


@freezegun.freeze_time("2012-01-14")
@pytest.mark.parametrize("input, expected_output", VALID_INPUT_MSG)
def test_publish_valid(input, expected_output):
"""Check that Kafka producer is called with an expected message."""
@@ -356,6 +409,7 @@ def test_publish_invalid_report(invalid_report):
assert not sut.producer.produce.called


@freezegun.freeze_time("2012-01-14")
@pytest.mark.parametrize("input,output", VALID_INPUT_MSG)
def test_error(input, output):
"""Check that error just prints a log."""
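
The frozen clock is what makes the fallback gathering_time deterministic in the expected outputs above ("2012-01-14T00:00:00Z"). A minimal sketch of the mechanism, assuming the freezegun package is installed:

    import datetime

    import freezegun

    with freezegun.freeze_time("2012-01-14"):
        # Inside the frozen context, now() returns the pinned instant,
        # so strftime with the RFC3339 pattern yields a stable value.
        print(datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"))
        # -> 2012-01-14T00:00:00Z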
