-
Notifications
You must be signed in to change notification settings - Fork 32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modified the trustymail code to handle SPF redirects #29
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,12 @@ | ||
import csv | ||
import datetime | ||
import json | ||
import logging | ||
import re | ||
import requests | ||
import smtplib | ||
import spf | ||
import datetime | ||
import json | ||
import socket | ||
import spf | ||
|
||
import DNS | ||
import dns.resolver | ||
|
@@ -167,70 +168,146 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache): | |
domain.starttls_results[server_and_port] = _SMTP_CACHE[server_and_port] | ||
|
||
|
||
def spf_scan(resolver, domain): | ||
def check_spf_record(record_text, expected_result, domain): | ||
""" | ||
Test to see if an SPF record is valid and correct. | ||
|
||
The record is tested by checking the response when we query if it | ||
allows us to send mail from an IP that is known not to be a mail | ||
server that appears in the MX records for ANY domain. | ||
|
||
Parameters | ||
---------- | ||
record_text : str | ||
The text of the SPF record to be tested. | ||
|
||
expected_result : str | ||
The expected result of the test. | ||
|
||
domain : trustymail.Domain | ||
The Domain object corresponding to the SPF record being | ||
tested. Any errors will be logged to this object. | ||
""" | ||
try: | ||
# Here I am using the IP address for c1b1.ncats.cyber.dhs.gov | ||
# (64.69.57.18) since it (1) has a valid PTR record and (2) is not | ||
# listed by anyone as a valid mail server. | ||
# | ||
# I'm actually temporarily using an IP that virginia.edu resolves to | ||
# until we resolve why Google DNS does not return the same PTR records | ||
# as the CAL DNS does for 64.69.57.18. | ||
query = spf.query("128.143.22.36", "email_wizard@" + domain.domain_name, domain.domain_name, strict=2) | ||
response = query.check() | ||
|
||
if response[0] == 'temperror': | ||
logging.debug(response[2]) | ||
elif response[0] == 'permerror': | ||
logging.debug('\t' + response[2]) | ||
domain.syntax_errors.append(response[2]) | ||
elif response[0] == 'ambiguous': | ||
logging.debug('\t' + response[2]) | ||
domain.syntax_errors.append(response[2]) | ||
elif response[0] == expected_result: | ||
# Everything checks out the SPF syntax seems valid. | ||
domain.valid_spf = True | ||
else: | ||
domain.valid_spf = False | ||
logging.debug('\tResult Differs: Expected [{0}] - Actual [{1}]'.format(expected_result, response[0])) | ||
domain.errors.append('Result Differs: Expected [{0}] - Actual [{1}]'.format(expected_result, response[0])) | ||
except spf.AmbiguityWarning as error: | ||
logging.debug('\t' + error.msg) | ||
domain.syntax_errors.append(error.msg) | ||
|
||
|
||
def get_spf_record_text(resolver, domain_name, domain, follow_redirect=False): | ||
""" | ||
Get the SPF record text for the given domain name. | ||
|
||
DNS queries are performed using the dns.resolver.Resolver object. | ||
Errors are logged to the trustymail.Domain object. The Boolean | ||
parameter indicates whether to follow redirects in SPF records. | ||
|
||
Parameters | ||
---------- | ||
resolver : dns.resolver.Resolver | ||
The Resolver object to use for DNS queries. | ||
|
||
domain_name : str | ||
The domain name to query for an SPF record. | ||
|
||
domain : trustymail.Domain | ||
The Domain object whose corresponding SPF record text is | ||
desired. Any errors will be logged to this object. | ||
|
||
follow_redirect : bool | ||
A Boolean value indicating whether to follow redirects in SPF | ||
records. | ||
|
||
Returns | ||
------- | ||
str: The desired SPF record text | ||
""" | ||
record_to_return = None | ||
try: | ||
# Use TCP, since we care about the content and correctness of the | ||
# records more than whether their records fit in a single UDP packet. | ||
for record in resolver.query(domain.domain_name, 'TXT', tcp=True): | ||
record_text = record.to_text() | ||
|
||
if record_text.startswith("\""): | ||
record_text = record_text[1:-1] | ||
for record in resolver.query(domain_name, 'TXT', tcp=True): | ||
record_text = record.to_text().strip('"') | ||
|
||
if not record_text.startswith("v=spf1"): | ||
if not record_text.startswith('v=spf1'): | ||
# Not an spf record, ignore it. | ||
continue | ||
|
||
domain.spf.append(record_text) | ||
|
||
# From the found record grab the specific result when something doesn't match. | ||
# Definitions of result come from https://www.ietf.org/rfc/rfc4408.txt | ||
if record_text.endswith("-all"): | ||
result = 'fail' | ||
elif record_text.endswith("?all"): | ||
result = "neutral" | ||
elif record_text.endswith("~all"): | ||
result = "softfail" | ||
elif record_text.endswith("all") or record_text.endswith("+all"): | ||
result = "pass" | ||
match = re.search('v=spf1\s*redirect=(\S*)', record_text) | ||
if follow_redirect and match: | ||
redirect_domain_name = match.group(1) | ||
record_to_return = get_spf_record_text(resolver, redirect_domain_name, domain) | ||
else: | ||
result = "neutral" | ||
|
||
try: | ||
# Here I am using the IP address for c1b1.ncats.cyber.dhs.gov | ||
# (64.69.57.18) since it (1) has a valid PTR record and (2) is | ||
# not listed by anyone as a valid mail server. | ||
# | ||
# I'm actually temporarily using an IP that | ||
# virginia.edu resolves to until we resolve why Google | ||
# DNS does not return the same PTR records as the CAL | ||
# DNS does for 64.69.57.18. | ||
query = spf.query("128.143.22.36", "email_wizard@" + domain.domain_name, domain.domain_name, strict=2) | ||
response = query.check() | ||
except spf.AmbiguityWarning as error: | ||
logging.debug("\t" + error.msg) | ||
domain.syntax_errors.append(error.msg) | ||
continue | ||
record_to_return = record_text | ||
except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.exception.Timeout, dns.resolver.NXDOMAIN) as error: | ||
handle_error('[SPF]', domain, error) | ||
|
||
if response[0] == 'temperror': | ||
logging.debug(response[2]) | ||
elif response[0] == 'permerror': | ||
logging.debug("\t" + response[2]) | ||
domain.syntax_errors.append(response[2]) | ||
elif response[0] == 'ambiguous': | ||
logging.debug("\t" + response[2]) | ||
domain.syntax_errors.append(response[2]) | ||
elif response[0] == result: | ||
# Everything checks out the SPF syntax seems valid. | ||
domain.valid_spf = True | ||
continue | ||
else: | ||
domain.valid_spf = False | ||
logging.debug("\tResult Differs: Expected [{0}] - Actual [{1}]".format(result, response[0])) | ||
domain.errors.append("Result Differs: Expected [{0}] - Actual [{1}]".format(result, response[0])) | ||
return record_to_return | ||
|
||
except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.exception.Timeout, dns.resolver.NXDOMAIN) as error: | ||
handle_error("[SPF]", domain, error) | ||
|
||
def spf_scan(resolver, domain): | ||
""" | ||
Scan a domain to see if it supports SPF. If the domain has an SPF | ||
record, verify that it properly rejects mail sent from an IP known | ||
to be disallowed. | ||
|
||
Parameters | ||
---------- | ||
resolver : dns.resolver.Resolver | ||
The Resolver object to use for DNS queries. | ||
|
||
domain : trustymail.Domain | ||
The Domain object being scanned for SPF support. Any errors | ||
will be logged to this object. | ||
""" | ||
# If an SPF record exists, record the raw SPF record text in the | ||
# Domain object | ||
record_text_not_following_redirect = get_spf_record_text(resolver, domain.domain_name, domain) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if record_text_not_following_redirect: | ||
domain.spf.append(record_text_not_following_redirect) | ||
|
||
record_text_following_redirect = get_spf_record_text(resolver, domain.domain_name, domain, True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if record_text_following_redirect: | ||
# From the found record grab the specific result when something | ||
# doesn't match. Definitions of result come from | ||
# https://www.ietf.org/rfc/rfc4408.txt | ||
if record_text_following_redirect.endswith('-all'): | ||
result = 'fail' | ||
elif record_text_following_redirect.endswith('?all'): | ||
result = 'neutral' | ||
elif record_text_following_redirect.endswith('~all'): | ||
result = 'softfail' | ||
elif record_text_following_redirect.endswith('all') or record_text_following_redirect.endswith('+all'): | ||
result = 'pass' | ||
else: | ||
result = 'neutral' | ||
|
||
check_spf_record(record_text_not_following_redirect, result, domain) | ||
|
||
|
||
def dmarc_scan(resolver, domain): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should
follow_redirect
default to True?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comments below, in the
spf_scan
function.