Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signatures #752

Merged
merged 11 commits into from
Nov 28, 2024
135 changes: 61 additions & 74 deletions signatures/abstracts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from re import findall
import re
from typing import Any, Dict, List, Optional, Union

from assemblyline.common.str_utils import safe_str
Expand All @@ -15,14 +15,14 @@ class Signature:

def __init__(
self,
heuristic_id: int = None,
name: str = None,
description: str = None,
ttp: List[str] = None,
families: List[str] = None,
indicators: List[str] = None,
severity: int = None,
safelist: List[str] = None,
heuristic_id: int | None = None,
name: str | None = None,
description: str | None = None,
ttp: List[str] | None = None,
families: List[str] | None = None,
indicators: List[str] | None = None,
severity: int = 0,
safelist: List[str] | None = None,
):
"""
This method instantiates the base Signature class and performs some validtion checks
Expand Down Expand Up @@ -51,11 +51,11 @@ def __init__(
if severity is None:
self.severity: int = 0
elif severity < 0:
self.severity: int = 0
self.severity = 0
elif severity > 3:
self.severity: int = 3
self.severity = 3
else:
self.severity: int = severity
self.severity = severity

self.safelist: List[str] = [] if safelist is None else safelist

Expand All @@ -69,28 +69,20 @@ def check_indicators_in_list(self, output: List[str], match_all: bool = False) -
:param match_all: All indicators must be found in a single line for a mark to be added
"""
for string in output:
# For more lines of output, there is a datetime separated by a ]. We do not want the datetime.
split_line = string.split("] ")
if len(split_line) == 2:
string = split_line[1]
elif len(split_line) > 2:
string = "] ".join(split_line[1:])

# If we want to match all indicators in a line and nothing from the safelist is in that line, mark it!
if (
match_all
and all(indicator.lower() in string.lower() for indicator in self.indicators)
and not any(item.lower() in string.lower() for item in self.safelist)
):
string = self.remove_timestamp(string)
lower = string.lower()
any_all = all if match_all else any
# If we match indicators in a line and nothing from the safelist is in that line, mark it!
if any_all(indicator.lower() in lower for indicator in self.indicators) and not self.safelisted(lower):
self.add_mark(string)

# If we only want to match at least one indicator in a line, then mark it!
if not match_all:
for indicator in self.indicators:
if indicator.lower() in string.lower() and not any(
item.lower() in string.lower() for item in self.safelist
):
self.add_mark(string)
def safelisted(self, string: str) -> bool:
"""
This method checks if the string contains any safelisted terms
:param string: The string to check
"""
string = string.lower()
return any(item.lower() in string for item in self.safelist)

@staticmethod
def check_regex(regex: str, string: str) -> List[str]:
Expand All @@ -99,11 +91,7 @@ def check_regex(regex: str, string: str) -> List[str]:
:param regex: A regular expression to be applied to the string
:param string: A line of output
"""
result = findall(regex, string)
if len(result) > 0:
return result
else:
return []
return re.findall(regex, string)

def process_output(self, output: List[str]):
"""
Expand All @@ -117,13 +105,25 @@ def add_mark(self, mark: Any) -> bool:
:param mark: The mark to be added
:return: A boolean indicating if the mark was added
"""
if mark:
if safe_str(mark).strip() not in self.marks:
# Sometimes lines end with trailing semi-colons and sometimes they do not. These are not unique marks
if safe_str(mark).strip() + ";" not in self.marks:
self.marks.append(safe_str(mark).strip())
else:
if not mark:
return False
mark = safe_str(mark).strip()
if mark not in self.marks and mark + ";" not in self.marks:
# Sometimes lines end with trailing semi-colons and sometimes they do not. These are not unique marks
self.marks.append(mark)
return True
return False

@staticmethod
def remove_timestamp(line: str) -> str:
"""
This method removes the timestamp from the start of an output line
:param line: The line to strip the timestamp from
"""
if not line.startswith("["):
return line
# For more lines of output, there is a datetime separated by a ]. We do not want the datetime.
return line.split("] ", 1)[-1]

def check_multiple_indicators_in_list(self, output: List[str], indicators: List[Dict[str, List[str]]]) -> None:
"""
Expand All @@ -144,35 +144,22 @@ def check_multiple_indicators_in_list(self, output: List[str], indicators: List[

for string in output:
# For more lines of output, there is a datetime separated by a ]. We do not want the datetime.
split_line = string.split("] ")
if len(split_line) == 2:
string = split_line[1]

# If all_indicators
are_indicators_matched = True
for all_indicator in all_indicators:
if are_indicators_matched and all(
indicator.lower() in string.lower() for indicator in all_indicator["indicators"]
):
for any_indicator in any_indicators:
if are_indicators_matched and any(
indicator.lower() in string.lower() for indicator in any_indicator["indicators"]
):
pass
else:
are_indicators_matched = False
else:
are_indicators_matched = False

# If no all_indicators
if not all_indicators:
for any_indicator in any_indicators:
if are_indicators_matched and any(
indicator.lower() in string.lower() for indicator in any_indicator["indicators"]
):
pass
else:
are_indicators_matched = False

if are_indicators_matched and not any(item.lower() in string.lower() for item in self.safelist):
string = self.remove_timestamp(string)
lower = string.lower()

# We want all of the indicators to match
if (
all(
# With all_indicators matching all of their indicators
all(indicator.lower() in lower for indicator in all_indicator["indicators"])
for all_indicator in all_indicators
)
and all(
# And any_indicators matching any of their indicators
any(indicator.lower() in lower for indicator in any_indicator["indicators"])
for any_indicator in any_indicators
)
# But nothing from the safelist
and not self.safelisted(lower)
):
self.add_mark(string)
29 changes: 15 additions & 14 deletions signatures/save_to_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
These are all of the signatures related to saving a file
"""

import re

from assemblyline.common.str_utils import safe_str

from signatures.abstracts import ANY, Signature

# List of commands used to save a file to disk
Expand Down Expand Up @@ -34,11 +37,13 @@ def __init__(self):
)

def process_output(self, output):
indicator_list = [
{"method": ANY, "indicators": save_commands},
{"method": ANY, "indicators": self.indicators},
]
self.check_multiple_indicators_in_list(output, indicator_list)
for line in output:
line = self.remove_timestamp(line)
lower = line.lower()
if re.search(r"[.](exe|dll)\b", lower) and any(
save_command.lower() in lower for save_command in save_commands
):
self.add_mark(line)


class WritesArchive(Signature):
Expand All @@ -48,7 +53,7 @@ def __init__(self):
name="writes_archive",
description="JavaScript writes archive file to disk",
# File extensions based on https://github.com/CybercentreCanada/assemblyline-service-cape/blob/2412416fd8040897d25d00bdaba6356d514398f4/cape/cape_main.py#L1343
indicators=["\\.zip", "\\.iso", "\\.rar", "\\.vhd", "\\.udf", "\\.7z"],
indicators=["zip", "iso", "rar", "vhd", "udf", "7z"],
severity=3,
)

Expand All @@ -57,21 +62,17 @@ def process_output(self, output):
results = []

# First look for file extensions
extension_regex = f"(?i)({'|'.join(self.indicators)})\\b"
extension_regex = f"(?i)\\w[.]({'|'.join(self.indicators)})\\b"
for line in output:
split_line = line.split("] ")
if len(split_line) == 2:
string = split_line[1]
else:
string = line
if self.check_regex(extension_regex, string.lower()):
string = self.remove_timestamp(line)
if re.search(extension_regex, string.lower()):
extension_results.append(string)

# Next look for the command
escaped_save_commands = [save_command.replace("(", "\\(") for save_command in save_commands]
commands_regex = f"({'|'.join(escaped_save_commands)})"
for line in extension_results:
if self.check_regex(commands_regex, line):
if re.search(commands_regex, line):
results.append(line)

results_set = sorted(set(results))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
},
{
"auto_collapse": false,
"body": "The prefix '_0x' in names of variables and functions suggests that obfuscated code exists\n\t\tvar _0x4b = [\"\\\\ProgramData\\\\\", \"Scripting.FileSystemObject\", \"WinHttp.WinHttpRequest.5.1\", \"WScript...\n\t\tvar _a = new ActiveXObject(_0x4b[1])\n\t\tvar _b = new ActiveXObject(_0x4b[2])\n\t\tvar _c = new ActiveXObject(_0x4b[3])\n\t\tvar _d = _0x4b[18]\n\t\tvar _e = _0x4b[19]\n\t\tvar _f = _0x4b[20]\n\t\t_b[_0x4b[5]](_0x4b[4], _e, false)\n\t\t_b[_0x4b[6]]()\n\t\tvar _g = new ActiveXObject(_0x4b[9])\n\t\t[6 Mark(s) Truncated]",
"body": "The prefix '_0x' in names of variables and functions suggests that obfuscated code exists\n\t\tvar _0x4b = [\"\\\\ProgramData\\\\\", \"Scripting.FileSystemObject\", \"WinHttp.WinHttpRequest.5.1\", \"WScript...\n\t\tvar _a = new ActiveXObject(_0x4b[1])\n\t\tvar _b = new ActiveXObject(_0x4b[2])\n\t\tvar _c = new ActiveXObject(_0x4b[3])\n\t\tvar _d = _0x4b[18]\n\t\tvar _e = _0x4b[19]\n\t\tvar _f = _0x4b[20]\n\t\t_b[_0x4b[5]](_0x4b[4], _e, false)\n\t\t_b[_0x4b[6]]()\n\t\tif (_b[_0x4b[7]] == 200) {\n\t\t[9 Mark(s) Truncated]",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"extra": {
"drop_file": false,
"score": 2021,
"score": 2031,
"sections": [
{
"auto_collapse": false,
Expand Down Expand Up @@ -136,7 +136,7 @@
},
{
"auto_collapse": false,
"body": "JavaScript uses a MemoryStream object to manipulate memory\n\t\t\"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System.IO.MemorySt...\n\t\tTypeError: \"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System....\n\t\tfunction szxwJG($PQNgWB){$JUxiC = [System.IO.MemoryStream]::new()",
"body": "JavaScript uses a MemoryStream object to manipulate memory\n\t\t\"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System.IO.MemorySt...\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...\n\t\tTypeError: \"function LtGEs($XGqbLJV){$xamrMR=\"A8AFCE8D37\";function szxwJG($PQNgWB){$JUxiC = [System....\n\t\tfunction szxwJG($PQNgWB){$JUxiC = [System.IO.MemoryStream]::new()",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand All @@ -160,7 +160,7 @@
},
{
"auto_collapse": false,
"body": "JavaScript runs PowerShell via powershell.exe\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...",
"body": "JavaScript runs PowerShell via powershell.exe\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand All @@ -182,6 +182,30 @@
"title_text": "Signature: RunsPowerShell",
"zeroize_on_tag_safe": false
},
{
"auto_collapse": false,
"body": "JavaScript runs PowerShell to call out to a URI\n\t\tR= 'ShELLEXEcUtESTdinlastIndexOfcscriptWsCriPT.ShElLsLeepsCrIptFulLNAmeshell.APPLiCaTioNCrEAtEObJeCT...",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
"depth": 1,
"heuristic": {
"attack_ids": [],
"frequency": 1,
"heur_id": 3,
"score": 10,
"score_map": {
"runs_ps1_to_download": 10
},
"signatures": {
"runs_ps1_to_download": 1
}
},
"promote_to": null,
"tags": {},
"title_text": "Signature: PowerShellDownloader",
"zeroize_on_tag_safe": false
},
{
"auto_collapse": false,
"body": [
Expand Down Expand Up @@ -356,6 +380,13 @@
"runs_ps1"
]
},
{
"attack_ids": [],
"heur_id": 3,
"signatures": [
"runs_ps1_to_download"
]
},
{
"attack_ids": [],
"heur_id": 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
},
{
"auto_collapse": false,
"body": "JavaScript uses charCodeAt/fromCharCode to obfuscate/de-obfuscate characters\n\t\t= jUVqAOisGJal[zWCwJfTxMEaZ].charCodeAt(0)",
"body": "JavaScript uses charCodeAt/fromCharCode to obfuscate/de-obfuscate characters\n\t\tGdxgkbZtlbkX[i] = jUVqAOisGJal[zWCwJfTxMEaZ].charCodeAt(0)",
"body_config": {},
"body_format": "TEXT",
"classification": "TLP:C",
Expand Down
Loading