-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.py
118 lines (95 loc) · 3.33 KB
/
message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# NNTP message encoding trivia
from email.Header import Header
from email.Charset import Charset
from StringIO import StringIO
import email.Charset
import base64
import quopri
import re
# Shared utf-8 charset object, used below when MIME-encoding header words
# that contain non-ASCII characters.
charset = Charset('utf-8')
# SHORTEST asks the email package to pick whichever of quoted-printable and
# base64 gives the shorter encoded header.
charset.header_encoding = email.Charset.SHORTEST
# NOTE(review): the email.charset documentation states that SHORTEST is not
# allowed for body_encoding.  Nothing visible in this file reads
# body_encoding (bodies are encoded by hand in Message.set_body) -- confirm
# whether this assignment is needed at all.
charset.body_encoding = email.Charset.SHORTEST
# Matches one line ending of any flavour: CRLF, LFCR, a lone LF or a lone CR.
line_end_re = re.compile(r'\r\n|\n\r|\n(?!\r)|\r(?!\n)')
def encode_header_word(word):
    """Return `word` in a form that can be written directly into a header.

    Unicode text that is pure ASCII is returned as a plain byte string.
    Unicode text containing non-ASCII characters is MIME-encoded through
    email.Header using the module-level utf-8 `charset`.  Any value that
    is not unicode is returned unchanged.
    """
    if not isinstance(word, unicode):
        return word
    try:
        # see if it is plain ascii first
        return word.encode('us-ascii')
    except UnicodeEncodeError:
        # Only an encoding failure means "not ASCII"; any other exception
        # is a real error and must propagate.
        #
        # Encode non-ascii headers using email.Header.  The 1000000 value
        # is a maximum line length, meaning never fold header lines.  This
        # is important for the XOVER response.
        return str(Header(word, charset, 1000000))
# Base64-encode a string; the encoded output is split across lines.
def base64encode(s):
    """Return the base64 encoding of `s` as produced by base64.encode()."""
    source = StringIO(s)
    sink = StringIO()
    base64.encode(source, sink)
    return sink.getvalue()
class Message:
    """An object representing NNTP message data.

    Acts as a mutable mapping containing the message headers.  The
    (possibly transfer-encoded) body is kept in the `body` attribute.
    """

    def __init__(self):
        self.headers = {'MIME-Version': '1.0'}
        self.body = ''

    def __len__(self):
        """Return the number of headers currently set."""
        return len(self.headers)

    def __contains__(self, name):
        """Return True if a header called `name` is set."""
        return name in self.headers

    def __getitem__(self, name):
        """Return the stored value of header `name`; KeyError if unset."""
        return self.headers[name]

    def __setitem__(self, name, value):
        """Set header `name`.

        Unicode values go through encode_header_word(); everything is
        stored as a byte string.
        """
        if isinstance(value, unicode):
            value = encode_header_word(value)
        # str() leaves an already-encoded byte string untouched and
        # converts any other value (e.g. an int) to its string form.
        self.headers[name] = str(value)

    def __delitem__(self, name):
        """Remove header `name`; KeyError if unset."""
        del self.headers[name]

    def set_body(self, value, content_type):
        """Set the body of the message.

        `value` is the body text and `content_type` the MIME type to
        record in the Content-Type header.  Unicode text that is pure
        ASCII is stored as-is.  Non-ASCII unicode text is encoded as
        utf-8 and a charset parameter is appended to the content type.
        Any non-unicode value, and any non-ASCII unicode text, is then
        transfer-encoded with the shorter of quoted-printable and base64,
        and Content-Transfer-Encoding is set to match.
        """
        needs_encoding = True
        if isinstance(value, unicode):
            # see if it is plain ascii first
            try:
                value = value.encode('us-ascii')
                needs_encoding = False
            except UnicodeEncodeError:
                value = value.encode('utf-8')
                # MIME parameter values are quoted with double quotes; an
                # apostrophe would become part of the charset name.
                content_type += '; charset="utf-8"'
        else:
            value = str(value)
        self['Content-Type'] = content_type

        if not needs_encoding:
            self.body = value
            return

        # use the shortest of quoted-printable and base64 encodings
        qp = quopri.encodestring(value)
        # base64encode() wraps its output into short lines, as MIME
        # requires; base64.b64encode() would emit one unbroken line.
        b64 = base64encode(value)
        if len(qp) <= len(b64):
            self.body = qp
            self['Content-Transfer-Encoding'] = 'quoted-printable'
        else:
            self.body = b64
            self['Content-Transfer-Encoding'] = 'base64'

    def header_bytes(self):
        """Return the bytes for the message headers to be passed over NNTP."""
        return ''.join(h + ': ' + v + '\r\n'
                       for (h, v) in self.headers.items())

    def dot_stuffed_body(self):
        """Return the bytes for the dot-stuffed message body, to be
        passed over NNTP."""
        # take the body, normalize line endings, and dot-stuff
        body = line_end_re.sub('\r\n', self.body)
        if not body.endswith('\r\n'):
            body += '\r\n'
        # A leading dot on the first line has no preceding CRLF, so the
        # replace() below would miss it.
        if body.startswith('.'):
            body = '.' + body
        body = body.replace('\r\n.', '\r\n..')
        # Terminating line: a single dot on its own.
        body += '.\r\n'
        return body