Skip to content

Commit

Permalink
Merge pull request #29 from dhs-ncats/bugfix/spf_redirect
Browse files Browse the repository at this point in the history
Modified the trustymail code to handle SPF redirects
  • Loading branch information
h-m-f-t authored Nov 27, 2017
2 parents 15c9548 + 2ad018e commit 06caaa4
Showing 1 changed file with 134 additions and 57 deletions.
191 changes: 134 additions & 57 deletions trustymail/trustymail.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import csv
import datetime
import json
import logging
import re
import requests
import smtplib
import spf
import datetime
import json
import socket
import spf

import DNS
import dns.resolver
Expand Down Expand Up @@ -167,70 +168,146 @@ def starttls_scan(domain, smtp_timeout, smtp_localhost, smtp_ports, smtp_cache):
domain.starttls_results[server_and_port] = _SMTP_CACHE[server_and_port]


def spf_scan(resolver, domain):
def check_spf_record(record_text, expected_result, domain):
"""
Test to see if an SPF record is valid and correct.
The record is tested by checking the response when we query if it
allows us to send mail from an IP that is known not to be a mail
server that appears in the MX records for ANY domain.
Parameters
----------
record_text : str
The text of the SPF record to be tested.
expected_result : str
The expected result of the test.
domain : trustymail.Domain
The Domain object corresponding to the SPF record being
tested. Any errors will be logged to this object.
"""
try:
# Here I am using the IP address for c1b1.ncats.cyber.dhs.gov
# (64.69.57.18) since it (1) has a valid PTR record and (2) is not
# listed by anyone as a valid mail server.
#
# I'm actually temporarily using an IP that virginia.edu resolves to
# until we resolve why Google DNS does not return the same PTR records
# as the CAL DNS does for 64.69.57.18.
query = spf.query("128.143.22.36", "email_wizard@" + domain.domain_name, domain.domain_name, strict=2)
response = query.check()

if response[0] == 'temperror':
logging.debug(response[2])
elif response[0] == 'permerror':
logging.debug('\t' + response[2])
domain.syntax_errors.append(response[2])
elif response[0] == 'ambiguous':
logging.debug('\t' + response[2])
domain.syntax_errors.append(response[2])
elif response[0] == expected_result:
# Everything checks out the SPF syntax seems valid.
domain.valid_spf = True
else:
domain.valid_spf = False
logging.debug('\tResult Differs: Expected [{0}] - Actual [{1}]'.format(expected_result, response[0]))
domain.errors.append('Result Differs: Expected [{0}] - Actual [{1}]'.format(expected_result, response[0]))
except spf.AmbiguityWarning as error:
logging.debug('\t' + error.msg)
domain.syntax_errors.append(error.msg)


def get_spf_record_text(resolver, domain_name, domain, follow_redirect=False):
"""
Get the SPF record text for the given domain name.
DNS queries are performed using the dns.resolver.Resolver object.
Errors are logged to the trustymail.Domain object. The Boolean
parameter indicates whether to follow redirects in SPF records.
Parameters
----------
resolver : dns.resolver.Resolver
The Resolver object to use for DNS queries.
domain_name : str
The domain name to query for an SPF record.
domain : trustymail.Domain
The Domain object whose corresponding SPF record text is
desired. Any errors will be logged to this object.
follow_redirect : bool
A Boolean value indicating whether to follow redirects in SPF
records.
Returns
-------
str: The desired SPF record text
"""
record_to_return = None
try:
# Use TCP, since we care about the content and correctness of the
# records more than whether their records fit in a single UDP packet.
for record in resolver.query(domain.domain_name, 'TXT', tcp=True):
record_text = record.to_text()

if record_text.startswith("\""):
record_text = record_text[1:-1]
for record in resolver.query(domain_name, 'TXT', tcp=True):
record_text = record.to_text().strip('"')

if not record_text.startswith("v=spf1"):
if not record_text.startswith('v=spf1'):
# Not an spf record, ignore it.
continue

domain.spf.append(record_text)

# From the found record grab the specific result when something doesn't match.
# Definitions of result come from https://www.ietf.org/rfc/rfc4408.txt
if record_text.endswith("-all"):
result = 'fail'
elif record_text.endswith("?all"):
result = "neutral"
elif record_text.endswith("~all"):
result = "softfail"
elif record_text.endswith("all") or record_text.endswith("+all"):
result = "pass"
match = re.search('v=spf1\s*redirect=(\S*)', record_text)
if follow_redirect and match:
redirect_domain_name = match.group(1)
record_to_return = get_spf_record_text(resolver, redirect_domain_name, domain)
else:
result = "neutral"

try:
# Here I am using the IP address for c1b1.ncats.cyber.dhs.gov
# (64.69.57.18) since it (1) has a valid PTR record and (2) is
# not listed by anyone as a valid mail server.
#
# I'm actually temporarily using an IP that
# virginia.edu resolves to until we resolve why Google
# DNS does not return the same PTR records as the CAL
# DNS does for 64.69.57.18.
query = spf.query("128.143.22.36", "email_wizard@" + domain.domain_name, domain.domain_name, strict=2)
response = query.check()
except spf.AmbiguityWarning as error:
logging.debug("\t" + error.msg)
domain.syntax_errors.append(error.msg)
continue
record_to_return = record_text
except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.exception.Timeout, dns.resolver.NXDOMAIN) as error:
handle_error('[SPF]', domain, error)

if response[0] == 'temperror':
logging.debug(response[2])
elif response[0] == 'permerror':
logging.debug("\t" + response[2])
domain.syntax_errors.append(response[2])
elif response[0] == 'ambiguous':
logging.debug("\t" + response[2])
domain.syntax_errors.append(response[2])
elif response[0] == result:
# Everything checks out the SPF syntax seems valid.
domain.valid_spf = True
continue
else:
domain.valid_spf = False
logging.debug("\tResult Differs: Expected [{0}] - Actual [{1}]".format(result, response[0]))
domain.errors.append("Result Differs: Expected [{0}] - Actual [{1}]".format(result, response[0]))
return record_to_return

except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.exception.Timeout, dns.resolver.NXDOMAIN) as error:
handle_error("[SPF]", domain, error)

def spf_scan(resolver, domain):
"""
Scan a domain to see if it supports SPF. If the domain has an SPF
record, verify that it properly rejects mail sent from an IP known
to be disallowed.
Parameters
----------
resolver : dns.resolver.Resolver
The Resolver object to use for DNS queries.
domain : trustymail.Domain
The Domain object being scanned for SPF support. Any errors
will be logged to this object.
"""
# If an SPF record exists, record the raw SPF record text in the
# Domain object
record_text_not_following_redirect = get_spf_record_text(resolver, domain.domain_name, domain)
if record_text_not_following_redirect:
domain.spf.append(record_text_not_following_redirect)

record_text_following_redirect = get_spf_record_text(resolver, domain.domain_name, domain, True)
if record_text_following_redirect:
# From the found record grab the specific result when something
# doesn't match. Definitions of result come from
# https://www.ietf.org/rfc/rfc4408.txt
if record_text_following_redirect.endswith('-all'):
result = 'fail'
elif record_text_following_redirect.endswith('?all'):
result = 'neutral'
elif record_text_following_redirect.endswith('~all'):
result = 'softfail'
elif record_text_following_redirect.endswith('all') or record_text_following_redirect.endswith('+all'):
result = 'pass'
else:
result = 'neutral'

check_spf_record(record_text_not_following_redirect, result, domain)


def dmarc_scan(resolver, domain):
Expand Down

0 comments on commit 06caaa4

Please sign in to comment.