-
Notifications
You must be signed in to change notification settings - Fork 552
/
presto.py
330 lines (275 loc) · 13.3 KB
/
presto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""DB-API implementation backed by Presto
See http://www.python.org/dev/peps/pep-0249/
Many docstrings in this file are based on the PEP, which is in the public domain.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
from builtins import object
from pyhive import common
from pyhive.common import DBAPITypeObject
# Make all exceptions visible in this module per DB-API
from pyhive.exc import * # noqa
import base64
import getpass
import logging
import requests
from requests.auth import HTTPBasicAuth
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
import os
try: # Python 3
import urllib.parse as urlparse
except ImportError: # Python 2
import urlparse
# PEP 249 module globals
# DB-API specification level implemented by this module.
apilevel = '2.0'
threadsafety = 2  # Threads may share the module and connections.
paramstyle = 'pyformat'  # Python extended format codes, e.g. ...WHERE name=%(name)s
# Module logger; Cursor.execute logs each SQL statement at INFO and its headers at DEBUG.
_logger = logging.getLogger(__name__)
# Shared escaper used by Cursor.execute to quote ``parameters`` before %-substitution.
_escaper = common.ParamEscaper()
def connect(*args, **kwargs):
    """Create a connection to the database.

    Every positional and keyword argument is forwarded unchanged to
    :py:class:`Connection`; see that class for their meaning.

    :returns: a :py:class:`Connection` object.
    """
    connection = Connection(*args, **kwargs)
    return connection
class Connection(object):
    """Lightweight, stateless factory for cursors.

    Presto has no notion of a persistent connection, so this object only
    remembers the arguments it was created with and replays them into every
    :py:class:`Cursor` it hands out; the cursors do all the real work.
    """

    def __init__(self, *args, **kwargs):
        # Stored verbatim; cursor() forwards them to each new Cursor.
        self._args, self._kwargs = args, kwargs

    def close(self):
        """Presto does not have anything to close"""
        # TODO cancel outstanding queries?
        return None

    def commit(self):
        """Presto does not support transactions"""
        return None

    def cursor(self):
        """Return a new :py:class:`Cursor` object using the connection."""
        args, kwargs = self._args, self._kwargs
        return Cursor(*args, **kwargs)

    def rollback(self):
        """Always fails: Presto has no transactions to roll back."""
        raise NotSupportedError("Presto does not have transactions")  # pragma: no cover
class Cursor(common.DBAPICursor):
    """These objects represent a database cursor, which is used to manage the context of a fetch
    operation.

    Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately
    visible by other cursors or connections.
    """

    def __init__(self, host, port='8080', username=None, catalog='hive',
                 schema='default', poll_interval=1, source='pyhive', session_props=None,
                 protocol='http', password=None, requests_session=None, requests_kwargs=None,
                 KerberosRemoteServiceName=None, KerberosPrincipal=None,
                 KerberosConfigPath=None, KerberosKeytabPath=None,
                 KerberosCredentialCachePath=None, KerberosUseCanonicalHostname=None):
        """
        :param host: hostname to connect to, e.g. ``presto.example.com``
        :param port: int -- port, defaults to 8080
        :param username: string -- defaults to system user name
        :param catalog: string -- defaults to ``hive``
        :param schema: string -- defaults to ``default``
        :param poll_interval: int -- how often to ask the Presto REST interface for a progress
            update, defaults to a second
        :param source: string -- arbitrary identifier (shows up in the Presto monitoring page)
        :param session_props: dict -- session properties sent with every query. The dict is
            stored by reference and updated in place when the server sets or clears a property.
        :param protocol: string -- network protocol, valid options are ``http`` and ``https``.
            defaults to ``http``
        :param password: string -- Deprecated. Defaults to ``None``.
            Using BasicAuth, requires ``https``. The user name sent with it is the effective
            user name (``username``, or the system user name when ``username`` is absent).
            Prefer ``requests_kwargs={'auth': HTTPBasicAuth(username, password)}``.
            May not be specified with ``requests_kwargs['auth']``.
        :param requests_session: a ``requests.Session`` object for advanced usage. If absent, this
            class will use the default requests behavior of making a new session per HTTP request.
            Caller is responsible for closing session.
        :param requests_kwargs: Additional ``**kwargs`` to pass to requests. May not contain
            ``method``, ``url``, ``data`` or ``headers``, which this class supplies itself.
            The caller's dict is copied, never modified.
        :param KerberosRemoteServiceName: string -- Presto coordinator Kerberos service name.
            This parameter is required for Kerberos authentication. When given, Kerberos
            authentication replaces any ``requests_kwargs['auth']``.
        :param KerberosPrincipal: string -- The principal to use when authenticating to
            the Presto coordinator.
        :param KerberosConfigPath: string -- Kerberos configuration file.
            (default: /etc/krb5.conf). Sets the process-wide ``KRB5_CONFIG`` environment variable.
        :param KerberosKeytabPath: string -- Kerberos keytab file. Sets the process-wide
            ``KRB5_CLIENT_KTNAME`` environment variable.
        :param KerberosCredentialCachePath: string -- Kerberos credential cache. Sets the
            process-wide ``KRB5CCNAME`` environment variable.
        :param KerberosUseCanonicalHostname: boolean or string -- Use the canonical hostname of the
            Presto coordinator for the Kerberos service principal by first resolving the
            hostname to an IP address and then doing a reverse DNS lookup for that IP address.
            This is enabled by default. Pass ``False`` (or the string ``'false'``, in any case)
            to use ``host`` exactly as given.
        :raises ValueError: on an unsupported ``protocol``, a reserved key in
            ``requests_kwargs``, ``password`` combined with ``requests_kwargs['auth']``, or
            ``password`` used without ``https``.
        """
        super(Cursor, self).__init__(poll_interval)
        # Config
        self._host = host
        self._port = port
        self._username = username or getpass.getuser()
        self._catalog = catalog
        self._schema = schema
        self._arraysize = 1
        self._poll_interval = poll_interval
        self._source = source
        self._session_props = session_props if session_props is not None else {}

        if protocol not in ('http', 'https'):
            raise ValueError("Protocol must be http/https, was {!r}".format(protocol))
        self._protocol = protocol

        self._requests_session = requests_session or requests

        # Copy so that adding ``auth`` below never mutates the caller's dict.
        requests_kwargs = dict(requests_kwargs) if requests_kwargs is not None else {}
        use_kerberos = KerberosRemoteServiceName is not None

        if not use_kerberos and password is not None and 'auth' in requests_kwargs:
            raise ValueError("Cannot use both password and requests_kwargs authentication")
        # This class passes these itself (e.g. execute() sends url, data and headers), so an
        # override would clash. Checked for Kerberos connections too, which previously skipped it.
        for k in ('method', 'url', 'data', 'headers'):
            if k in requests_kwargs:
                raise ValueError("Cannot override requests argument {}".format(k))

        if use_kerberos:
            # Accept a real boolean as documented, as well as the legacy string form.
            if isinstance(KerberosUseCanonicalHostname, bool):
                use_canonical_hostname = KerberosUseCanonicalHostname
            else:
                use_canonical_hostname = not (
                    KerberosUseCanonicalHostname is not None
                    and KerberosUseCanonicalHostname.lower() == 'false'
                )
            # Without canonicalization, the principal is built from ``host`` as given.
            hostname_override = None if use_canonical_hostname else host
            # NOTE: these are process-wide environment variables, not per-cursor settings.
            if KerberosConfigPath is not None:
                os.environ['KRB5_CONFIG'] = KerberosConfigPath
            if KerberosKeytabPath is not None:
                os.environ['KRB5_CLIENT_KTNAME'] = KerberosKeytabPath
            if KerberosCredentialCachePath is not None:
                os.environ['KRB5CCNAME'] = KerberosCredentialCachePath
            requests_kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
                                                       principal=KerberosPrincipal,
                                                       service=KerberosRemoteServiceName,
                                                       hostname_override=hostname_override)
        elif password is not None:
            # Use the effective user name (the one sent as X-Presto-User), not the raw
            # ``username`` argument, which may be None.
            requests_kwargs['auth'] = HTTPBasicAuth(self._username, password)
            if protocol != 'https':
                raise ValueError("Protocol must be https when passing a password")
        self._requests_kwargs = requests_kwargs

        self._reset_state()

    def _reset_state(self):
        """Reset state about the previous query in preparation for running another query"""
        super(Cursor, self)._reset_state()
        self._nextUri = None
        self._columns = None

    @property
    def description(self):
        """This read-only attribute is a sequence of 7-item sequences.

        Each of these sequences contains information describing one result column:

        - name
        - type_code
        - display_size (None in current implementation)
        - internal_size (None in current implementation)
        - precision (None in current implementation)
        - scale (None in current implementation)
        - null_ok (always True in current implementation)

        The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
        section below.

        Blocks, fetching further responses, until the columns are known or the query is over.
        Returns ``None`` if no columns are available.
        """
        # Sleep until we're done or we got the columns
        self._fetch_while(
            lambda: self._columns is None and
            self._state not in (self._STATE_NONE, self._STATE_FINISHED)
        )
        if self._columns is None:
            return None
        return [
            # name, type_code, display_size, internal_size, precision, scale, null_ok
            (col['name'], col['type'], None, None, None, None, True)
            for col in self._columns
        ]

    def execute(self, operation, parameters=None):
        """Prepare and execute a database operation (query or command).

        ``parameters``, when given, are escaped and substituted into ``operation`` with
        ``%`` (``pyformat`` paramstyle). Return values are not defined.
        """
        headers = {
            'X-Presto-Catalog': self._catalog,
            'X-Presto-Schema': self._schema,
            'X-Presto-Source': self._source,
            'X-Presto-User': self._username,
        }

        if self._session_props:
            headers['X-Presto-Session'] = ','.join(
                '{}={}'.format(propname, propval)
                for propname, propval in self._session_props.items()
            )

        # Prepare statement
        if parameters is None:
            sql = operation
        else:
            sql = operation % _escaper.escape_args(parameters)

        self._reset_state()

        self._state = self._STATE_RUNNING
        url = urlparse.urlunparse((
            self._protocol,
            '{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))
        _logger.info('%s', sql)
        _logger.debug("Headers: %s", headers)
        response = self._requests_session.post(
            url, data=sql.encode('utf-8'), headers=headers, **self._requests_kwargs)
        self._process_response(response)

    def cancel(self):
        """Cancel the running query by issuing DELETE on its next URI.

        Does nothing if the query has already finished.

        :raises ProgrammingError: when no query has been started
        :raises OperationalError: when the server does not answer with HTTP 204
        """
        if self._state == self._STATE_NONE:
            raise ProgrammingError("No query yet")
        if self._nextUri is None:
            assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
            return

        response = self._requests_session.delete(self._nextUri, **self._requests_kwargs)
        if response.status_code != requests.codes.no_content:
            fmt = "Unexpected status code after cancel {}\n{}"
            raise OperationalError(fmt.format(response.status_code, response.content))

        self._state = self._STATE_FINISHED
        self._nextUri = None

    def poll(self):
        """Poll for and return the raw status data provided by the Presto REST API.

        :returns: dict -- JSON status information or ``None`` if the query is done
        :raises: ``ProgrammingError`` when no query has been started

        .. note::
            This is not a part of DB-API.
        """
        if self._state == self._STATE_NONE:
            raise ProgrammingError("No query yet")

        if self._nextUri is None:
            assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
            return None

        response = self._requests_session.get(self._nextUri, **self._requests_kwargs)
        self._process_response(response)
        return response.json()

    def _fetch_more(self):
        """Fetch the next URI and update state"""
        self._process_response(self._requests_session.get(self._nextUri, **self._requests_kwargs))

    def _decode_binary(self, rows):
        """Decode base64 ``varbinary`` values in ``rows`` in place.

        As of Presto 0.69, binary data is returned as the varbinary type in base64 format.
        Each row must support item assignment (rows are the raw JSON ``data`` lists).
        """
        # Find the binary columns once instead of re-scanning the schema per row.
        binary_indexes = [i for i, col in enumerate(self.description) if col[1] == 'varbinary']
        if not binary_indexes:
            return
        for row in rows:
            for i in binary_indexes:
                if row[i] is not None:
                    row[i] = base64.b64decode(row[i])

    def _process_response(self, response):
        """Given the JSON response from Presto's REST API, update the internal state with the next
        URI and any data from the response

        :raises OperationalError: on a non-200 HTTP status
        :raises DatabaseError: when the response carries an ``error`` entry
        """
        # TODO handle HTTP 503
        if response.status_code != requests.codes.ok:
            fmt = "Unexpected status code {}\n{}"
            raise OperationalError(fmt.format(response.status_code, response.content))

        response_json = response.json()
        _logger.debug("Got response %s", response_json)
        assert self._state == self._STATE_RUNNING, "Should be running if processing response"
        self._nextUri = response_json.get('nextUri')
        # Keep columns already received if this page omits them, so ``description`` and
        # varbinary decoding of later pages do not lose the result schema.
        if 'columns' in response_json:
            self._columns = response_json['columns']
        # Mirror server-side session changes into the props sent with subsequent queries.
        if 'X-Presto-Clear-Session' in response.headers:
            propname = response.headers['X-Presto-Clear-Session']
            self._session_props.pop(propname, None)
        if 'X-Presto-Set-Session' in response.headers:
            propname, propval = response.headers['X-Presto-Set-Session'].split('=', 1)
            self._session_props[propname] = propval
        if 'data' in response_json:
            assert self._columns
            new_data = response_json['data']
            self._decode_binary(new_data)
            self._data += map(tuple, new_data)
        if 'nextUri' not in response_json:
            self._state = self._STATE_FINISHED
        if 'error' in response_json:
            raise DatabaseError(response_json['error'])
#
# Type Objects and Constructors
#
# Module-level DB-API type objects. A column's ``type_code`` in Cursor.description (the raw
# Presto type name string, e.g. 'bigint') can be interpreted by comparing it to these.
# See types in presto-main/src/main/java/com/facebook/presto/tuple/TupleInfo.java
FIXED_INT_64 = DBAPITypeObject(['bigint'])
# NOTE(review): despite its name this matches Presto's ``varchar`` (text) type, not
# ``varbinary``; presumably the name is kept for backward compatibility.
VARIABLE_BINARY = DBAPITypeObject(['varchar'])
DOUBLE = DBAPITypeObject(['double'])
BOOLEAN = DBAPITypeObject(['boolean'])