forked from mk-fg/fgtk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
maildir-cat
executable file
·141 lines (115 loc) · 4.77 KB
/
maildir-cat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft
import mailbox, email, email.header, email.charset, email.errors
import os, sys, re, pathlib as pl, collections as cs
def bytes_decode(b, enc, errors='strict'):
try: return b.decode(enc, errors)
except LookupError as err:
# Try to handle cp-850, unknown-8bit and such
if enc == 'unknown-8bit': enc_sub = 'utf-8'
else: enc_sub = enc.replace('-', '')
if enc_sub == enc: raise
try: return b.decode(enc_sub, errors)
except LookupError:
raise LookupError(enc, enc_sub) from None
def _mail_header_decode_part(line):
header = ''
for part, enc in email.header.decode_header(line):
if enc: part = bytes_decode(part, enc, 'replace')
if isinstance(part, bytes): part = part.decode('utf-8', 'replace')
# RFC2822#2.2.3 whitespace folding auto-adds spaces.
# But extra space can also be encoded in base64 or such,
# so this does not preserve exact number of encoded spaces.
if not header.endswith(' '): header += ' '
header += part.lstrip()
return header.strip()
def mail_header_decode(val):
res, header = list(), _mail_header_decode_part(val)
while True:
match = re.search('=\?[\w\d-]+(\*[\w\d-]+)?\?[QB]\?[^?]+\?=', header)
if not match:
res.append(header)
break
start, end = match.span(0)
match = header[start:end]
try: match = _mail_header_decode_part(match)
except email.errors.HeaderParseError: pass
res.extend([header[:start], match])
header = header[end:]
return ''.join(res)
def _mail_parse(msg):
headers = MailMsgHeaders((k.lower(), mail_header_decode(v)) for k,v in msg.items())
payload = ( msg.get_payload(decode=True)
if not msg.is_multipart() else list(map(_mail_parse, msg.get_payload())) )
if not headers.get('content-type'): headers['content-type'] = [msg.get_content_type()]
if headers.get_core('content-disposition') == 'attachment': payload = '<attachment scrubbed>'
elif isinstance(payload, bytes):
payload = bytes_decode(payload, msg.get_content_charset() or 'utf-8', 'replace')
return MailMsg(headers, payload)
def mail_parse(msg):
if isinstance(msg, (bytes, str)): msg = email.message_from_bytes(msg)
return _mail_parse(msg)
class MailMsg(cs.namedtuple('MailMsg', 'headers payload')):
@property
def all_parts(self):
return [self] if isinstance(self.payload, str)\
else sorted(it.chain.from_iterable(m.all_parts for m in self.payload), key=len)
def _text_ct_prio(self, part):
ct = part.headers.get('content-type')
if ct == 'text/plain': return 1
if ct.startswith('text/'): return 2
return 3
@property
def text(self):
return sorted(self.all_parts, key=self._text_ct_prio)[0].payload
class MailMsgHeaders(cs.UserDict):
def __init__(self, header_list):
super().__init__()
for k, v in header_list:
if k not in self: self[k] = list()
self[k].append(v)
def get(self, k, default=None, proc=op.itemgetter(0)):
hs = self.data.get(k)
if not hs: return default
return proc(hs)
def get_core(self, k, default=None):
return self.get(k, default, lambda hs: hs[0].split(';', 1)[0].strip())
def get_all(self, k, default=None):
return self.get(k, default, lambda x: x)
def dump_msg(pre, msg):
msg = mail_parse(msg)
header_list = 'from to subject date message-id reply-to sender'.split()
# header_list = sorted(msg.headers.keys())
for k in header_list:
for v in msg.headers.get_all(k, list()): print(f'{pre}{k.title()}: {v}')
print(pre)
for line in msg.text.strip().split('\n'): print(f'{pre}{line}')
def main(args=None):
import argparse
parser = argparse.ArgumentParser(
description='Tool to find all messages in the maildir, decode'
' MIME msg bodies and dump every line in these along with the filename'
' to stdout to run grep or any other search on them to find specific msg/file.')
parser.add_argument('maildir', nargs='*', default=['~/.maildir'],
help='Path to maildir(s) or individual msg file(s). Default: %(default)s.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
log_err = ft.partial(print, file=sys.stderr, flush=True)
for p in opts.maildir:
p_root_base = p = pl.Path(p)
p_root = p_root_base.expanduser().resolve()
if p_root.is_file():
try: msg = dump_msg(f'{p}: ', p_root.read_bytes())
except email.errors.MessageParseError: log_err('Malformed msg file: {p}')
continue
ps_root = str(p_root)
maildir = mailbox.Maildir(ps_root)
box_dirs = [maildir, *(maildir.get_folder(key) for key in maildir.list_folders())]
for box in box_dirs:
for key in box.keys():
ps = str((pl.Path(box._path) / box._lookup(key)).resolve())
assert ps.startswith(ps_root), [ps_root, ps]
p = p_root_base / ps[len(ps_root)+1:]
try: msg = box[key]
except email.errors.MessageParseError: log_err('Malformed msg file: {p}')
else: dump_msg(f'{p}: ', msg)
if __name__ == '__main__': sys.exit(main())