-
Notifications
You must be signed in to change notification settings - Fork 0
/
transmission.py
181 lines (136 loc) · 5.27 KB
/
transmission.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
A Python wrapper for Transmission's RPC interface.
Took it from transmission-fluid project and modified to fit put.io's needs.
Original source code:
https://github.com/edavis/transmission-fluid/blob/6a33bfcd595bf322b85505e99a419e2e95c57d1a/transmission/__init__.py
>>> from transmission import Transmission
>>> client = Transmission()
>>> client.call('torrent-get', ids=list(range(1, 11)), fields=['name'])
{'torrents': [
{'name': u'Elvis spotted in Florida.mov'},
{'name': u'Bigfoot sings the hits'},
# ...
{'name': u'a-song-of-ice-and-fire_final-chapter.txt'}
]}
"""
import json
import calendar
import datetime
import unicodedata
import requests
# Default value of the ``timeout`` argument of Transmission.__init__; it is
# forwarded unchanged to requests.post() on every RPC call.
DEFAULT_TIMEOUT = 10
# HTTP status that makes the client copy the session-id header from the
# response and resend the request.
_CSRF_ERROR_CODE = 409
# HTTP status that is translated into an Unauthorized exception.
_UNAUTHORIZED_ERROR_CODE = 401
# Name of the header read from a 409 response and sent on later requests.
_CSRF_HEADER = 'X-Transmission-Session-Id'
class TransmissionError(Exception):
    """
    Base class for errors raised by this module.
    """
class Unauthorized(TransmissionError):
    """
    Raised when the server answers a request with HTTP 401.
    """
class Transmission:
    """
    Small client for Transmission's JSON-over-HTTP RPC endpoint.

    Each instance keeps the session-id header handed out by the server and a
    request tag that is advanced after every successfully decoded response.
    """

    # Upper bound on how many times a single request is resent after a 409.
    # One retry is the normal case (first contact or an expired session id);
    # the bound keeps a server that always answers 409 from causing
    # unbounded recursion/looping.
    _MAX_SESSION_RETRIES = 3

    def __init__(self, host='localhost', port=9091, path='/transmission/rpc',
                 username=None, password=None, ssl=False, timeout=DEFAULT_TIMEOUT):
        """
        Initialize the Transmission client.

        The default host, port and path are all set to Transmission's
        default.

        :param host: host name or address of the RPC server.
        :param port: TCP port of the RPC server (formatted with ``%d``).
        :param path: URL path of the RPC endpoint.
        :param username: user name for HTTP authentication, if any.
        :param password: password for HTTP authentication, if any.
        :param ssl: use ``https`` instead of ``http`` when true.
        :param timeout: value forwarded to ``requests.post(timeout=...)``.
        """
        scheme = 'https' if ssl else 'http'
        self.url = "%s://%s:%d%s" % (scheme, host, port, path)
        self.headers = {}  # type: Dict[str, str]
        self.tag = 0
        self.timeout = timeout
        # Stays None (no authentication sent) unless a credential is given.
        self.auth = None  # type: Optional[Tuple[Optional[str], Optional[str]]]
        if username or password:
            self.auth = (username, password)

    def call(self, method, **kwargs):
        """
        Send request to Transmission's RPC interface.

        :param method: RPC method name, e.g. ``'torrent-get'``.
        :param kwargs: RPC arguments; underscores in the keyword names are
            replaced by dashes before sending.
        :returns: the ``arguments`` member of the response, or ``None`` when
            it is missing or empty.
        :raises Unauthorized: the server answered with HTTP 401.
        :raises TransmissionError: the RPC result was not ``'success'`` or the
            response tag did not match the request tag.
        :raises requests.HTTPError: any other HTTP error status.
        """
        response = self._do_request(method, **kwargs)
        return self._deserialize_response(response)

    def _do_request(self, method, **kwargs):
        """
        POST the serialized request and return the HTTP response object.

        A 409 answer carries the session id the server expects; it is stored
        in ``self.headers`` and the request is resent, at most
        ``_MAX_SESSION_RETRIES`` times. A 409 that persists, or one without
        the session-id header, is reported through ``raise_for_status()``
        like any other HTTP error.
        """
        body = json.dumps(self._format_request_body(method, **kwargs), cls=_TransmissionJSONEncoder).encode()
        response = None
        for _ in range(self._MAX_SESSION_RETRIES + 1):
            response = requests.post(self.url, data=body, headers=self.headers, auth=self.auth,
                                     timeout=self.timeout)
            if response.status_code != _CSRF_ERROR_CODE:
                break
            session_id = response.headers.get(_CSRF_HEADER)
            if session_id is None:
                # Nothing to retry with; let raise_for_status() report the 409.
                break
            self.headers[_CSRF_HEADER] = session_id
        if response.status_code == _UNAUTHORIZED_ERROR_CODE:
            raise Unauthorized("Check Username and Password")
        response.raise_for_status()
        return response

    def _format_request_body(self, method, **kwargs):
        """
        Create a request object to be serialized and sent to Transmission.
        """
        # As Python can't accept dashes in kwargs keys, replace any
        # underscores with them here.
        fixed = {key.replace('_', '-'): value for key, value in kwargs.items()}
        return {"method": method, "tag": self.tag, "arguments": fixed}

    def _deserialize_response(self, response):
        """
        Return the response generated by the request object, raising
        TransmissionError if there were any problems.
        """
        # Undecodable bytes are replaced and control characters dropped
        # before the text is handed to the JSON parser.
        text = response.content.decode(errors='replace')
        text = _remove_control_characters(text)
        doc = json.loads(text, cls=_TransmissionJSONDecoder)

        if doc['result'] != 'success':
            raise TransmissionError("Request failed: '%s'" % doc['result'])

        if doc['tag'] != self.tag:
            raise TransmissionError("Tag mismatch: (got %d, expected %d)" % (doc['tag'], self.tag))
        # Only a matching response advances the tag used by the next request.
        self.tag += 1

        if 'arguments' in doc:
            return doc['arguments'] or None
        return None
class _UTC(datetime.tzinfo):
    """
    tzinfo implementation with a zero UTC offset, no DST and the name 'UTC'.
    """

    _ZERO = datetime.timedelta(0)

    def utcoffset(self, dt):
        return self._ZERO

    def tzname(self, dt):
        return 'UTC'

    def dst(self, dt):
        return self._ZERO
def _epoch_to_datetime(value):
    """
    Convert a UNIX timestamp into a datetime that carries the _UTC tzinfo.
    """
    utc = _UTC()
    return datetime.datetime.fromtimestamp(value, tz=utc)
def _datetime_to_epoch(value):
    """
    Convert a date or datetime into an integer UNIX timestamp.

    A plain date is taken as midnight of that day. Any other type yields
    None.
    """
    if not isinstance(value, datetime.date):
        return None
    # datetime is a subclass of date; only plain dates need promoting.
    if not isinstance(value, datetime.datetime):
        value = datetime.datetime(value.year, value.month, value.day)
    return calendar.timegm(value.utctimetuple())
class _TransmissionJSONDecoder(json.JSONDecoder):
    """
    JSON decoder that turns the values of known timestamp keys into
    datetimes via _epoch_to_datetime.
    """

    # UNIX epochs to be turned into UTC datetimes
    TIMESTAMP_KEYS = frozenset((
        'activityDate',
        'addedDate',
        'dateCreated',
        'doneDate',
        'startDate',
        'lastAnnounceStartTime',
        'lastAnnounceTime',
        'lastScrapeStartTime',
        'lastScrapeTime',
        'nextAnnounceTime',
        'nextScrapeTime',
    ))

    def __init__(self, **kwargs):
        super().__init__(object_hook=self.object_hook, **kwargs)

    def object_hook(self, obj):
        """
        Convert timestamp values of a decoded JSON object in place and
        return that same object.
        """
        # Collect the names first so the dict is not written to while it is
        # being iterated.
        stamped = [name for name in obj if name in self.TIMESTAMP_KEYS]
        for name in stamped:
            obj[name] = _epoch_to_datetime(obj[name])
        return obj
class _TransmissionJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that serializes date and datetime objects through
    _datetime_to_epoch.
    """

    def default(self, value):
        """
        Return a JSON-serializable replacement for ``value``.

        :raises TypeError: ``value`` is not a date/datetime and therefore not
            something this encoder knows how to serialize.
        """
        # datetime is a subclass of date, so this'll catch both
        if isinstance(value, datetime.date):
            return _datetime_to_epoch(value)
        # Defer to the base class so unsupported types raise the standard
        # TypeError. Returning the value unchanged makes json try to encode
        # the very same object again and fail with the misleading
        # "Circular reference detected" ValueError.
        return super(_TransmissionJSONEncoder, self).default(value)
def _remove_control_characters(s):
    """
    Return ``s`` without the characters whose Unicode general category
    starts with "C".
    """
    kept = [ch for ch in s if not unicodedata.category(ch).startswith("C")]
    return "".join(kept)