forked from syrusakbary/validate_email
-
Notifications
You must be signed in to change notification settings - Fork 0
/
validate_email.py
212 lines (190 loc) · 8.17 KB
/
validate_email.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# RFC 2822 - style email validation for Python
# (c) 2012 Syrus Akbary <me@syrusakbary.com>
# Extended from (c) 2011 Noel Bush <noel@aitools.org>
# for support of mx and user check
# This code is made available to you under the GNU LGPL v3.
#
# This module provides a single method, valid_email_address(),
# which returns True or False to indicate whether a given address
# is valid according to the 'addr-spec' part of the specification
# given in RFC 2822. Ideally, we would like to find this
# in some other library, already thoroughly tested and well-
# maintained. The standard Python library email.utils
# contains a parse_addr() function, but it is not sufficient
# to detect many malformed addresses.
#
# This implementation aims to be faithful to the RFC, with the
# exception of a circular definition (see comments below), and
# with the omission of the pattern components marked as "obsolete".
import re
import smtplib
import logging
import socket
try:
raw_input
except NameError:
def raw_input(prompt=''):
return input(prompt)
try:
import DNS
ServerError = DNS.ServerError
DNS.DiscoverNameServers()
except (ImportError, AttributeError):
DNS = None
class ServerError(Exception):
pass
# All we are really doing is comparing the input string to one
# gigantic regular expression. But building that regexp, and
# ensuring its correctness, is made much easier by assembling it
# from the "tokens" defined by the RFC. Each of these tokens is
# tested in the accompanying unit test file.
#
# The section of RFC 2822 from which each pattern component is
# derived is given in an accompanying comment.
#
# (To make things simple, every string below is given as 'raw',
# even when it's not strictly necessary. This way we don't forget
# when it is necessary.)
#
WSP = r'[\s]' # see 2.2.2. Structured Header Field Bodies
CRLF = r'(?:\r\n)' # see 2.2.3. Long Header Fields
NO_WS_CTL = r'\x01-\x08\x0b\x0c\x0f-\x1f\x7f' # see 3.2.1. Primitive Tokens
QUOTED_PAIR = r'(?:\\.)' # see 3.2.2. Quoted characters
FWS = r'(?:(?:' + WSP + r'*' + CRLF + r')?' + \
WSP + r'+)' # see 3.2.3. Folding white space and comments
CTEXT = r'[' + NO_WS_CTL + \
r'\x21-\x27\x2a-\x5b\x5d-\x7e]' # see 3.2.3
CCONTENT = r'(?:' + CTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.2.3 (NB: The RFC includes COMMENT here
# as well, but that would be circular.)
COMMENT = r'\((?:' + FWS + r'?' + CCONTENT + \
r')*' + FWS + r'?\)' # see 3.2.3
CFWS = r'(?:' + FWS + r'?' + COMMENT + ')*(?:' + \
FWS + '?' + COMMENT + '|' + FWS + ')' # see 3.2.3
ATEXT = r'[\w!#$%&\'\*\+\-/=\?\^`\{\|\}~]' # see 3.2.4. Atom
ATOM = CFWS + r'?' + ATEXT + r'+' + CFWS + r'?' # see 3.2.4
DOT_ATOM_TEXT = ATEXT + r'+(?:\.' + ATEXT + r'+)*' # see 3.2.4
DOT_ATOM = CFWS + r'?' + DOT_ATOM_TEXT + CFWS + r'?' # see 3.2.4
QTEXT = r'[' + NO_WS_CTL + \
r'\x21\x23-\x5b\x5d-\x7e]' # see 3.2.5. Quoted strings
QCONTENT = r'(?:' + QTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.2.5
QUOTED_STRING = CFWS + r'?' + r'"(?:' + FWS + \
r'?' + QCONTENT + r')*' + FWS + \
r'?' + r'"' + CFWS + r'?'
LOCAL_PART = r'(?:' + DOT_ATOM + r'|' + \
QUOTED_STRING + r')' # see 3.4.1. Addr-spec specification
DTEXT = r'[' + NO_WS_CTL + r'\x21-\x5a\x5e-\x7e]' # see 3.4.1
DCONTENT = r'(?:' + DTEXT + r'|' + \
QUOTED_PAIR + r')' # see 3.4.1
DOMAIN_LITERAL = CFWS + r'?' + r'\[' + \
r'(?:' + FWS + r'?' + DCONTENT + \
r')*' + FWS + r'?\]' + CFWS + r'?' # see 3.4.1
DOMAIN = r'(?:' + DOT_ATOM + r'|' + \
DOMAIN_LITERAL + r')' # see 3.4.1
ADDR_SPEC = LOCAL_PART + r'@' + DOMAIN # see 3.4.1
# A valid address will match exactly the 3.4.1 addr-spec.
VALID_ADDRESS_REGEXP = '^' + ADDR_SPEC + '$'
MX_DNS_CACHE = {}
MX_CHECK_CACHE = {}
def get_mx_ip(hostname):
if hostname not in MX_DNS_CACHE:
try:
MX_DNS_CACHE[hostname] = DNS.mxlookup(hostname)
except ServerError as e:
if e.rcode == 3 or e.rcode == 2: # NXDOMAIN (Non-Existent Domain) or SERVFAIL
MX_DNS_CACHE[hostname] = None
else:
raise
return MX_DNS_CACHE[hostname]
def validate_email(email, check_mx=False, verify=False, debug=False, smtp_timeout=10):
"""Indicate whether the given string is a valid email address
according to the 'addr-spec' portion of RFC 2822 (see section
3.4.1). Parts of the spec that are marked obsolete are *not*
included in this test, and certain arcane constructions that
depend on circular definitions in the spec may not pass, but in
general this should correctly identify any email address likely
to be in use as of 2011."""
if debug:
logger = logging.getLogger('validate_email')
logger.setLevel(logging.DEBUG)
else:
logger = None
try:
assert re.match(VALID_ADDRESS_REGEXP, email) is not None
check_mx |= verify
if check_mx:
if not DNS:
raise Exception('For check the mx records or check if the email exists you must '
'have installed pyDNS python package')
hostname = email[email.find('@') + 1:]
mx_hosts = get_mx_ip(hostname)
if mx_hosts is None:
return False
for mx in mx_hosts:
try:
if not verify and mx[1] in MX_CHECK_CACHE:
return MX_CHECK_CACHE[mx[1]]
smtp = smtplib.SMTP(timeout=smtp_timeout)
smtp.connect(mx[1])
MX_CHECK_CACHE[mx[1]] = True
if not verify:
try:
smtp.quit()
except smtplib.SMTPServerDisconnected:
pass
return True
status, _ = smtp.helo()
if status != 250:
smtp.quit()
if debug:
logger.debug(u'%s answer: %s - %s', mx[1], status, _)
continue
smtp.mail('')
status, _ = smtp.rcpt(email)
if status == 250:
smtp.quit()
return True
if debug:
logger.debug(u'%s answer: %s - %s', mx[1], status, _)
smtp.quit()
except smtplib.SMTPServerDisconnected: # Server not permits verify user
if debug:
logger.debug(u'%s disconected.', mx[1])
except smtplib.SMTPConnectError:
if debug:
logger.debug(u'Unable to connect to %s.', mx[1])
return None
except AssertionError:
return False
except (ServerError, socket.error) as e:
if debug:
logger.debug('ServerError or socket.error exception raised (%s).', e)
return None
return True
if __name__ == "__main__":
import time
while True:
email = raw_input('Enter email for validation: ')
mx = raw_input('Validate MX record? [yN] ')
if mx.strip().lower() == 'y':
mx = True
else:
mx = False
validate = raw_input('Try to contact server for address validation? [yN] ')
if validate.strip().lower() == 'y':
validate = True
else:
validate = False
logging.basicConfig()
result = validate_email(email, mx, validate, debug=True, smtp_timeout=1)
if result:
print("Valid!")
elif result is None:
print("I'm not sure.")
else:
print("Invalid!")
time.sleep(1)
# import sys
# sys.modules[__name__],sys.modules['validate_email_module'] = validate_email,sys.modules[__name__]
# from validate_email_module import *