-
Notifications
You must be signed in to change notification settings - Fork 2
/
iconvcodec.py
131 lines (108 loc) · 4.38 KB
/
iconvcodec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import sys, iconv, codecs, errno
# Substitute written in place of a character the target encoding cannot
# represent (errors="replace" while encoding); kept as UTF-8 bytes because
# it is spliced into the UTF-8 input handed to iconv.
_ENCODE_REPLACECHAR = "?".encode("utf-8")
# Substitute emitted for an undecodable input byte (errors="replace" while
# decoding): U+FFFD as UTF-8 bytes, appended to iconv's UTF-8 output.
_DECODE_REPLACECHAR = "\uFFFD".encode("utf-8")
def _iconv_encode_impl(encoder, msg, errors, bufsize=None):
    """Convert UTF-8 bytes to the target encoding through an iconv descriptor.

    The conversion is retried in a loop (not recursively), so input containing
    many unencodable characters cannot exhaust the recursion limit.

    Args:
        encoder: Object with an ``iconv(data, outsize)`` method. On failure it
            raises ``iconv.error`` whose args are
            ``(errstring, errno_code, bytes_consumed, partial_output)``.
        msg: UTF-8 encoded input bytes.
        errors: Error policy: ``"strict"``, ``"replace"`` or ``"ignore"``.
        bufsize: Initial output buffer size; defaults to ``len(msg)``.

    Returns:
        Tuple ``(converted_bytes, len(msg))``. Whenever this function returns,
        the whole input has been consumed.

    Raises:
        UnicodeError: A character cannot be represented and ``errors`` is
            ``"strict"``, or the replacement character itself cannot be
            represented in the target encoding.
        ValueError: A character cannot be represented and ``errors`` names an
            unsupported policy.
        AssertionError: iconv reported an incomplete input sequence (EINVAL).
        iconv.error: Any other failure reported by iconv is re-raised as is.
    """
    total = len(msg)
    if bufsize is None:
        bufsize = total

    pending = msg
    chunks = []
    # True while ``pending`` starts with a replacement character that iconv
    # has not accepted yet; used to detect an unencodable replacement.
    replacement_at_head = False

    while True:
        try:
            converted = encoder.iconv(pending, bufsize)
        except iconv.error as e:
            _errstring, code, inlen, outres = e.args

            if code == errno.E2BIG:
                # Output buffer was too small: keep what was converted and
                # retry the rest with a somewhat larger buffer.
                chunks.append(outres)
                pending = pending[inlen:]
                bufsize = bufsize - inlen + max(bufsize // 10, 3)
                if inlen:
                    replacement_at_head = False
                continue

            if code == errno.EINVAL:
                # An incomplete multibyte sequence has been encountered in
                # the input. Should not happen for well-formed UTF-8.
                raise AssertionError("EINVAL in encode") from e

            if code != errno.EILSEQ:
                raise

            # EILSEQ: the character starting at ``inlen`` is not supported
            # by the target encoding.
            if errors == "strict":
                raise UnicodeError(*e.args) from e
            if errors not in ("replace", "ignore"):
                raise ValueError("unsupported error handling") from e
            if replacement_at_head and inlen == 0:
                # The replacement character itself is not encodable;
                # retrying would never make progress.
                raise UnicodeError(*e.args) from e

            # Skip exactly one UTF-8 character: its lead byte plus any
            # continuation bytes (bit pattern 10xxxxxx).
            resume = inlen + 1
            while resume < len(pending) and (pending[resume] & 0xC0) == 0x80:
                resume += 1
            tail = pending[resume:]

            chunks.append(outres)
            if errors == "replace":
                pending = _ENCODE_REPLACECHAR + tail
                replacement_at_head = True
            else:
                pending = tail
            bufsize = len(pending)
        else:
            chunks.append(converted)
            # Report the length of the caller's input, not of the patched
            # message that was actually fed to iconv.
            return b"".join(chunks), total
def _iconv_decode_impl(decoder, msg, errors, bufsize=None):
    """Convert bytes in the source encoding to ``str`` through iconv.

    The conversion is retried in a loop (not recursively), so input containing
    many invalid bytes cannot exhaust the recursion limit.

    Args:
        decoder: Object with an ``iconv(data, outsize)`` method producing
            UTF-8 bytes. On failure it raises ``iconv.error`` whose args are
            ``(errstring, errno_code, bytes_consumed, partial_output)``.
        msg: Encoded input bytes.
        errors: Error policy: ``"strict"``, ``"replace"`` or ``"ignore"``.
        bufsize: Initial output buffer size; defaults to ``len(msg)``.

    Returns:
        Tuple ``(text, bytes_consumed)``. ``bytes_consumed`` is smaller than
        ``len(msg)`` when the input ends with an incomplete multibyte
        sequence, so the caller can retry once more data is available.

    Raises:
        UnicodeError: Invalid input was found and ``errors`` is ``"strict"``.
        ValueError: Invalid input was found and ``errors`` names an
            unsupported policy.
        iconv.error: Any other failure reported by iconv is re-raised as is.
    """
    if bufsize is None:
        bufsize = len(msg)

    pending = msg
    pieces = []  # UTF-8 output fragments, in order
    consumed = 0

    while True:
        try:
            converted = decoder.iconv(pending, bufsize)
        except iconv.error as e:
            _errstring, code, inlen, outres = e.args

            if code == errno.E2BIG:
                # Output buffer too small: keep the partial result and retry
                # the remainder with a somewhat larger buffer.
                pieces.append(outres)
                consumed += inlen
                pending = pending[inlen:]
                bufsize = bufsize - inlen + max(bufsize // 10, 3)
                continue

            if code == errno.EINVAL:
                # An incomplete multibyte sequence has been encountered at
                # the end of the input; report only what was decoded.
                pieces.append(outres)
                consumed += inlen
                break

            if code != errno.EILSEQ:
                # Unknown failure: propagate instead of silently returning
                # None.
                raise

            # EILSEQ: an invalid multibyte sequence has been encountered.
            # Ignoring or replacing it precisely is hard to achieve, so
            # drop a single byte and try again from the next one.
            if errors == "strict":
                raise UnicodeError(*e.args) from e
            if errors == "replace":
                outres += _DECODE_REPLACECHAR
            elif errors != "ignore":
                raise ValueError("unsupported error handling") from e

            pieces.append(outres)
            consumed += inlen + 1
            pending = pending[inlen + 1 :]
            bufsize = len(pending)
        else:
            pieces.append(converted)
            consumed += len(pending)
            break

    return b"".join(pieces).decode(), consumed
def codec_factory(encoding):
    """Build stateless encode/decode functions for ``encoding`` using iconv.

    Two iconv descriptors are opened once and shared by every call of the
    returned functions: one is fed UTF-8 and produces ``encoding``, the other
    is fed ``encoding`` and produces UTF-8.

    Args:
        encoding: Encoding name passed to ``iconv.open``.

    Returns:
        Tuple ``(encode, decode)``:

        * ``encode(inp, errors="strict")`` takes a ``str`` and returns
          ``(encoded_bytes, len(inp))``.
        * ``decode(msg, errors="strict")`` takes bytes and returns
          ``(text, bytes_consumed)``.

    Raises:
        Whatever ``iconv.open`` raises when it cannot open a descriptor;
        ``lookup`` treats ``ValueError`` as "encoding not supported".
    """
    encoder = iconv.open(encoding, "utf-8")
    decoder = iconv.open("utf-8", encoding)

    def encode(inp, errors="strict"):
        converted, _utf8_len = _iconv_encode_impl(encoder, inp.encode(), errors)
        # The codec protocol reports how much of the *input object* was
        # consumed: characters of ``inp``, not bytes of its UTF-8 form.
        return converted, len(inp)

    def decode(msg, errors="strict"):
        return _iconv_decode_impl(decoder, msg, errors)

    return encode, decode
def lookup(encoding):
    """Codec search function that serves ``encoding`` through iconv.

    Args:
        encoding: Encoding name being looked up.

    Returns:
        A ``codecs.CodecInfo`` whose stateless, incremental and stream
        interfaces all delegate to the functions from ``codec_factory``, or
        ``None`` when ``codec_factory`` raises ``ValueError`` (encoding not
        supported by iconv).
    """
    try:
        # Bound under names that differ from the method names below: a class
        # body that assigns ``encode`` cannot read an enclosing function's
        # ``encode`` variable.
        encode_func, decode_func = codec_factory(encoding)
    except ValueError:
        # Encoding not supported by iconv
        return None

    class StreamWriter(codecs.StreamWriter):
        # Without this override the class inherits codecs.Codec.encode,
        # which raises NotImplementedError on every write.
        def encode(self, input, errors="strict"):
            return encode_func(input, errors)

    class StreamReader(codecs.StreamReader):
        # Same reasoning as StreamWriter.encode, for the read side.
        def decode(self, input, errors="strict"):
            return decode_func(input, errors)

    class IncrementalEncoder(codecs.IncrementalEncoder):
        def encode(self, input, final=False):
            return encode_func(input, self.errors)[0]

    class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
        def _buffer_decode(self, input, errors, final):
            # The base class keeps any bytes not reported as consumed and
            # prepends them to the next call.
            return decode_func(input, errors)

    return codecs.CodecInfo(
        name=encoding,
        encode=encode_func,
        decode=decode_func,
        streamreader=StreamReader,
        streamwriter=StreamWriter,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
    )
# Register ``lookup`` with the codecs module as a codec search function.
# This happens as a side effect of importing this module.
codecs.register(lookup)