Skip to content

Commit

Permalink
add extra method for normalizing the timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
djkhl committed Aug 14, 2024
1 parent 5c87128 commit 64b2753
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 37 deletions.
61 changes: 25 additions & 36 deletions logprep/processor/pre_detector/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,30 @@ class Config(Processor.Config):
def _ip_alerter(self):
return IPAlerter(self._config.alert_ip_list_path)

# def detect_format_and_normalize_timestamp(self, timestamp):
# """method for detecting the used source format of a timestamp and normalizing it"""
# # formats = [
# # "%Y%m%d%H%M%S",
# # "UNIX",
# # ]
# # for form in formats:
# # try:
# # return TimeParser.parse_datetime(timestamp, form, timezone.utc).isoformat()
# # except TimeParserException:
# # continue
def normalize_timestamp(self, event, rule, timestamp):
"""method for normalizing the timestamp"""
source_timezone, target_timezone, source_formats = (
rule.source_timezone,
rule.target_timezone,
rule.source_formats,
)
parsed_successfully = False
for detection, _ in self.result.data:
for source_format in source_formats:
try:
parsed_datetime = TimeParser.parse_datetime(
timestamp, source_format, source_timezone
)
except TimeParserException:
continue
result = (
parsed_datetime.astimezone(target_timezone).isoformat().replace("+00:00", "Z")
)
detection[rule.timestamp_field] = result
parsed_successfully = True
break
if not parsed_successfully:
raise ProcessingWarning(str("Could not parse timestamp"), rule, event)

def _apply_rules(self, event, rule):
if not (
Expand All @@ -117,31 +130,7 @@ def _apply_rules(self, event, rule):
timestamp = get_dotted_field_value(event, "@timestamp")

if timestamp is not None:
# timestamp = self.detect_format_and_normalize_timestamp(timestamp)

source_timezone, target_timezone, source_formats = (
rule.source_timezone,
rule.target_timezone,
rule.source_formats,
)
parsed_successfully = False
for source_format in source_formats:
try:
parsed_datetime = TimeParser.parse_datetime(
timestamp, source_format, source_timezone
)
except TimeParserException:
continue
result = (
parsed_datetime.astimezone(target_timezone)
.isoformat()
.replace("+00:00", "Z")
)
detection["@timestamp"] = result
parsed_successfully = True
break
if not parsed_successfully:
raise ProcessingWarning(str("Could not parse timestamp"), rule, event)
self.normalize_timestamp(event, rule, timestamp)

def _get_detection_result(self, event: dict, rule: PreDetectorRule):
pre_detection_id = get_dotted_field_value(event, "pre_detection_id")
Expand Down
4 changes: 4 additions & 0 deletions logprep/processor/pre_detector/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,8 @@ def target_timezone(self) -> str:
def source_timezone(self) -> str:
return self._config.source_timezone

@property
def timestamp_field(self) -> str:
return self._config.timestamp_field

# pylint: enable=C0111
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ pre_detector:
- attack.test1
- attack.test2
case_condition: directly
source_formats: ["UNIX"]
sigma_fields: true
description: Test rule four

0 comments on commit 64b2753

Please sign in to comment.