Merge pull request #60 from InQuest/dev

Resolving more community issues
azazelm3dj3d authored Jan 11, 2023
2 parents 00ba370 + c9d2a1f commit c24483d
Showing 4 changed files with 125 additions and 42 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@
build/
dist/
_build/
docs-dist/
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -20,7 +20,7 @@
# -- Project information -----------------------------------------------------

project = u'iocextract'
copyright = u'2019 InQuest, LLC'
copyright = u'2019 - 2023 InQuest, LLC'
author = u'InQuest Labs'

# The short X.Y version
162 changes: 122 additions & 40 deletions iocextract.py
@@ -190,17 +190,35 @@ def url_re(open_end=False):
HTTPS_SCHEME_DEFANG_RE = re.compile('hxxps', re.IGNORECASE)

# Get some valid obfuscated ip addresses.
IPV4_RE = re.compile(r"""
(?:^|
(?![^\d\.])
)
(?:
def ipv4_len(ip_len=3):
if ip_len == 3:
IPV4_RE = re.compile(r"""
(?:^|
(?![^\d\.])
)
(?:
(?:[1-9]?\d|1\d\d|2[0-4]\d|25[0-5])
[\[\(\\]*?\.[\]\)]*?
){3}
(?:[1-9]?\d|1\d\d|2[0-4]\d|25[0-5])
(?:(?=[^\d\.])|$)
""", re.VERBOSE)

elif ip_len == 4:
IPV4_RE = re.compile(r"""
(?:^|
(?![^\d\.])
)
(?:
(?:[1-9]?\d|1\d\d|2[0-4]\d|25[0-5])
[\[\(\\]*?\.[\]\)]*?
){4}
([01]?[0-9]?[0-9]|2[0-4][0-9]|25[0-5])
(?:[1-9]?\d|1\d\d|2[0-4]\d|25[0-5])
[\[\(\\]*?\.[\]\)]*?
){3}
(?:[1-9]?\d|1\d\d|2[0-4]\d|25[0-5])
(?:(?=[^\d\.])|$)
""", re.VERBOSE)
(?:(?=[^\d\.])|$)
""", re.VERBOSE)

return IPV4_RE
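For reviewers, a minimal usage sketch of the new helper follows; the sample strings are hypothetical and the comments show expected matches rather than captured test output.

from iocextract import ipv4_len

# Default pattern: standard dotted quad, including bracket-defanged dots.
for match in ipv4_len().finditer("Connect to 192.168.1[.]10 for staging."):
    print(match.group(0))    # expected: 192.168.1[.]10

# ip_len=4 pattern: the longer five-part addresses reported by the community.
for match in ipv4_len(4).finditer("Odd indicator 1.2.3.4.5 in the log."):
    print(match.group(0))    # expected: 1.2.3.4.5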

# Experimental IPv6 regex; it will not catch everything, but it should be sufficient for now.
IPV6_RE = re.compile(r"""
@@ -210,6 +228,26 @@ def url_re(open_end=False):
# Capture email addresses including common defangs.
EMAIL_RE = re.compile(r"""
(
[a-z0-9_.+-]+
[\(\[{\x20]*
(?:
(?:
(?:
\x20*
""" + SEPARATOR_DEFANGS + r"""
\x20*
)*
\.
(?:
\x20*
""" + SEPARATOR_DEFANGS + r"""
\x20*
)*
|
\W+dot\W+
)
[a-z0-9-]+?
)*
[a-z0-9_.+-]+
[\(\[{\x20]*
(?:@|\Wat\W)
@@ -283,6 +321,7 @@ def extract_iocs(data, refang=False, strip=False):
:param bool strip: Strip possible garbage from the end of URLs
:rtype: :py:func:`itertools.chain`
"""

return itertools.chain(
extract_urls(data, refang=refang, strip=strip),
extract_ips(data, refang=refang),
@@ -292,21 +331,22 @@
)
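For context, a small illustrative sketch of the chained call; the input blob and hash are hypothetical, and the exact results depend on the individual extractors below.

from iocextract import extract_iocs

blob = "Dropper at hxxp://bad[.]example/x and md5 0123456789abcdef0123456789abcdef"

# Chains the URL, IP, email, hash, and YARA extractors over the same input.
for ioc in extract_iocs(blob, refang=True):
    print(ioc)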


def extract_urls(data, refang=False, strip=False, delimiter=None, open_punc=False):
def extract_urls(data, refang=False, strip=False, delimiter=None, open_punc=False, no_scheme=False):
"""Extract URLs.
:param data: Input text
:param bool refang: Refang output?
:param bool strip: Strip possible garbage from the end of URLs
:rtype: :py:func:`itertools.chain`
"""

return itertools.chain(
extract_unencoded_urls(data, refang=refang, strip=strip, open_punc=open_punc),
extract_unencoded_urls(data, refang=refang, strip=strip, open_punc=open_punc, no_scheme=no_scheme),
extract_encoded_urls(data, refang=refang, strip=strip, delimiter=delimiter),
)
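A hedged sketch of how the new no_scheme flag flows through to refang_url(); the sample text is hypothetical and exact matches depend on the URL regexes defined earlier in the module.

from iocextract import extract_urls

text = "Seen at hxxp://evil[.]example/a and at evil[.]example/b"

# Default: refanged URLs are normalized to carry a scheme.
print(list(extract_urls(text, refang=True)))

# no_scheme=True is passed through so refang_url() does not force http:// onto scheme-less matches.
print(list(extract_urls(text, refang=True, no_scheme=True)))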


def extract_unencoded_urls(data, refang=False, strip=False, open_punc=False):
def extract_unencoded_urls(data, refang=False, strip=False, open_punc=False, no_scheme=False):
"""Extract only unencoded URLs.
:param data: Input text
@@ -323,7 +363,7 @@ def extract_unencoded_urls(data, refang=False, strip=False, open_punc=False):

for url in unencoded_urls:
if refang:
url = refang_url(url.group(1))
url = refang_url(url.group(1), no_scheme=no_scheme)
else:
url = url.group(1)

@@ -411,12 +451,29 @@ def extract_ipv4s(data, refang=False):
:param bool refang: Refang output?
:rtype: Iterator[:class:`str`]
"""
for ip_address in IPV4_RE.finditer(data):

def ipv4_str(data):
protocol_str = re.compile(r"https|http|ftp")

for pro in protocol_str.finditer(data):
if refang:
return refang_ipv4(pro.group(0))
else:
return pro.group(0)

for ip_address in ipv4_len().finditer(data):
# Iterates over any ip address with 4 numbers after the final (3rd) octet
for ip_address in ipv4_len(4).finditer(data):
pass

if refang:
yield refang_ipv4(ip_address.group(0))
else:
yield ip_address.group(0)

if ipv4_str(data) != None:
yield ipv4_str(data)
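A minimal call sketch for the reworked IPv4 extractor; the log line is hypothetical and the comment shows the expected yield, not verified output.

from iocextract import extract_ipv4s

log_line = "Callback observed to 198.51.100[.]23 over TLS."

for ip in extract_ipv4s(log_line, refang=True):
    print(ip)    # expected: 198.51.100.23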


def extract_ipv6s(data):
"""Extract IPv6 addresses.
@@ -426,8 +483,11 @@ def extract_ipv6s(data):
:param data: Input text
:rtype: Iterator[:class:`str`]
"""

for ip_address in IPV6_RE.finditer(data):
yield ip_address.group(0)
# Sets a minimal standard for IPv6 (0:0:0:0:0:0:0:0)
if len(data) >= 15:
yield ip_address.group(0)
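A short sketch of the tightened IPv6 path; note the new guard checks the length of the whole input string, and the address below is from the documentation range.

from iocextract import extract_ipv6s

text = "Resolved to 2001:db8:85a3:0:0:8a2e:370:7334 during triage."

# Only yields when the input is at least as long as a minimal IPv6 form (15 characters).
print(list(extract_ipv6s(text)))    # expected to include the address above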


def extract_emails(data, refang=False):
@@ -437,7 +497,9 @@ def extract_emails(data, refang=False):
:param bool refang: Refang output?
:rtype: Iterator[:class:`str`]
"""

for email in EMAIL_RE.finditer(data):

if refang:
email = refang_email(email.group(1))
else:
@@ -455,6 +517,7 @@ def extract_hashes(data):
:param data: Input text
:rtype: :py:func:`itertools.chain`
"""

return itertools.chain(
extract_md5_hashes(data),
extract_sha1_hashes(data),
@@ -469,6 +532,7 @@ def extract_md5_hashes(data):
:param data: Input text
:rtype: Iterator[:class:`str`]
"""

for md5 in MD5_RE.finditer(data):
yield md5.group(1)

@@ -479,6 +543,7 @@ def extract_sha1_hashes(data):
:param data: Input text
:rtype: Iterator[:class:`str`]
"""

for sha1 in SHA1_RE.finditer(data):
yield sha1.group(1)

@@ -489,6 +554,7 @@ def extract_sha256_hashes(data):
:param data: Input text
:rtype: Iterator[:class:`str`]
"""

for sha256 in SHA256_RE.finditer(data):
yield sha256.group(1)

@@ -499,6 +565,7 @@ def extract_sha512_hashes(data):
:param data: Input text
:rtype: Iterator[:class:`str`]
"""

for sha512 in SHA512_RE.finditer(data):
yield sha512.group(1)

@@ -509,6 +576,7 @@ def extract_yara_rules(data):
:param data: Input text
:rtype: Iterator[:class:`str`]
"""

for yara_rule in YARA_PARSE_RE.finditer(data):
yield yara_rule.group(1).strip()

@@ -546,6 +614,7 @@ def extract_custom_iocs(data, regex_list):
:param regex_list: List of strings to treat as regex and match against data.
:rtype: Iterator[:class:`str`]
"""

# Compile all the regex strings first, so we can error out quickly.
regex_objects = []
for regex_string in regex_list:
@@ -563,6 +632,7 @@ def _is_ipv6_url(url):
:param url: String URL
:rtype: bool
"""

# Fix urlparse exception.
parsed = urlparse(url)

@@ -586,6 +656,7 @@ def _refang_common(ioc):
:param ioc: String IP/Email Address or URL netloc.
:rtype: str
"""

return ioc.replace('[dot]', '.').\
replace('(dot)', '.').\
replace('[.]', '.').\
@@ -601,6 +672,7 @@ def refang_email(email):
:param email: String email address.
:rtype: str
"""

# Check for ' at ' and ' dot ' first.
email = re.sub('\W[aA][tT]\W', '@', email.lower())
email = re.sub('\W*[dD][oO][tT]\W*', '.', email)
@@ -609,16 +681,15 @@ def refang_email(email):
return _refang_common(email).replace('[', '').\
replace(']', '').\
replace('{', '').\
replace('}', '').\
replace('{', '')

replace('}', '')
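A brief sketch of the cleaned-up refang path; the addresses are hypothetical and the comments show expected results.

from iocextract import refang_email

print(refang_email('alice(at)example[dot]com'))    # expected: alice@example.com
print(refang_email('bob[at]example{dot}com'))      # expected: bob@example.com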

def refang_url(url):
def refang_url(url, no_scheme=False):
"""Refang a URL.
:param url: String URL
:rtype: str
"""

# First fix urlparse errors.
# Fix ipv6 parsing exception.
if '[.' in url and '[.]' not in url:
@@ -647,7 +718,8 @@ def refang_url(url):
url = url.replace('\\\\', '//', 1)
else:
# Support no-protocol.
url = 'http://' + url
# url = 'http://' + url
pass

# Refang (/), since it's not entirely in the netloc.
url = url.replace('(/)', '/')
@@ -676,7 +748,10 @@ def refang_url(url):
elif HTTPS_SCHEME_DEFANG_RE.fullmatch(parsed.scheme):
scheme = 'https'
else:
scheme = 'http'
if no_scheme:
scheme = ''
else:
scheme = 'http'

parsed = parsed._replace(scheme=scheme)
replacee = '{}:///'.format(scheme)
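To make the scheme fallback concrete, a hedged sketch follows; inputs are hypothetical, the first comment shows the expected result, and the scheme-less case is described rather than asserted because the final string is assembled further down in this function.

from iocextract import refang_url

# Recognized defanged scheme: no_scheme has no effect.
print(refang_url('hxxps://evil[.]example/login', no_scheme=True))    # expected: https://evil.example/login

# Scheme-less input: with no_scheme=True, http:// is no longer prepended to the result.
print(refang_url('evil[.]example/login', no_scheme=True))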
@@ -708,6 +783,7 @@ def refang_ipv4(ip_address):
:param ip_address: String IPv4 address.
:rtype: str
"""

return _refang_common(ip_address).replace('[', '').\
replace(']', '').\
replace('\\', '')
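And a one-line sketch for the IPv4 counterpart; the address is hypothetical and the comment shows the expected result.

from iocextract import refang_ipv4

print(refang_ipv4('203.0.113[.]77'))    # expected: 203.0.113.77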
@@ -719,6 +795,7 @@ def defang(ioc):
:param ioc: String URL, domain, or IPv4 address.
:rtype: str
"""

# If it's a url, defang just the scheme and netloc.
try:
parsed = urlparse(ioc)
@@ -745,32 +822,34 @@ def main():
description="""Advanced Indicator of Compromise (IOC) extractor.
If no arguments are specified, the default behavior is
to extract all IOCs.""")
parser.add_argument('--input', type=lambda x: io.open(x, 'r', encoding='utf-8', errors='ignore'),
parser.add_argument('-i', '--input', type=lambda x: io.open(x, 'r', encoding='utf-8', errors='ignore'),
default=io.open(0, 'r', encoding='utf-8', errors='ignore'), help="default: stdin")
parser.add_argument('--output', type=lambda x: io.open(x, 'w', encoding='utf-8', errors='ignore'),
parser.add_argument('-o', '--output', type=lambda x: io.open(x, 'w', encoding='utf-8', errors='ignore'),
default=io.open(1, 'w', encoding='utf-8', errors='ignore'), help="default: stdout")
parser.add_argument('--extract-emails', action='store_true')
parser.add_argument('--extract-ips', action='store_true')
parser.add_argument('--extract-ipv4s', action='store_true')
parser.add_argument('--extract-ipv6s', action='store_true')
parser.add_argument('--extract-urls', action='store_true')
parser.add_argument('--extract-yara-rules', action='store_true')
parser.add_argument('--extract-hashes', action='store_true')
parser.add_argument('--custom-regex', type=lambda x: io.open(x, 'r', encoding='utf-8', errors='ignore'),
parser.add_argument('-ee', '--extract-emails', action='store_true')
parser.add_argument('-ip', '--extract-ips', action='store_true')
parser.add_argument('-ip4', '--extract-ipv4s', action='store_true')
parser.add_argument('-ip6', '--extract-ipv6s', action='store_true')
parser.add_argument('-u', '--extract-urls', action='store_true')
parser.add_argument('-y', '--extract-yara-rules', action='store_true')
parser.add_argument('-ha', '--extract-hashes', action='store_true')
parser.add_argument('-cr', '--custom-regex', type=lambda x: io.open(x, 'r', encoding='utf-8', errors='ignore'),
metavar='REGEX_FILE',
help="file with custom regex strings, one per line, with one capture group each")
parser.add_argument('--refang', action='store_true', help="default: no")
parser.add_argument('--strip-urls', action='store_true',
parser.add_argument('-r', '--refang', action='store_true', help="default: no")
parser.add_argument('-su', '--strip-urls', action='store_true',
help="remove possible garbage from the end of urls. default: no")
parser.add_argument('--wide', action='store_true',
parser.add_argument('-w', '--wide', action='store_true',
help="preprocess input to allow wide-encoded character matches. default: no")
parser.add_argument('--json', action='store_true')
parser.add_argument('--open', action='store_true', help="Removes the end punctuation regex when extracting URLs")
parser.add_argument('-j', '--json', action='store_true')
parser.add_argument('-op', '--open', action='store_true', help="Removes the end punctuation regex when extracting URLs")
parser.add_argument('-rm', '--rm_scheme', action='store_true', help="Removes the protocol from the URL (e.g. http, https, etc.)")

args = parser.parse_args()

# Read input.
# Read user input
data = args.input.read()

if args.wide:
data = data.replace('\x00', '')

@@ -785,6 +864,7 @@ def main():
args.extract_emails or
args.custom_regex
)

memo = {}

if args.extract_emails or extract_all:
Expand All @@ -795,8 +875,10 @@ def main():
memo["ipv6s"] = list(extract_ipv6s(data))
if args.extract_urls or extract_all:
memo["urls"] = list(extract_urls(data, refang=args.refang, strip=args.strip_urls))
if args.extract_urls and args.open:
memo["urls"] = list(extract_urls(data, refang=args.refang, strip=args.strip_urls, open_punc=True))
if args.open:
memo["open_punc"] = list(extract_urls(data, refang=args.refang, strip=args.strip_urls, open_punc=args.open))
if args.rm_scheme:
memo["no_protocol"] = list(extract_urls(data, refang=args.refang, strip=args.strip_urls, open_punc=args.open, no_scheme=args.rm_scheme))
if args.extract_yara_rules or extract_all:
memo["yara_rules"] = list(extract_yara_rules(data))
if args.extract_hashes or extract_all: