-
Notifications
You must be signed in to change notification settings - Fork 550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds support to connections with Thrift over HTTP transport. #325
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
from __future__ import absolute_import | ||
from __future__ import unicode_literals | ||
|
||
import base64 | ||
import datetime | ||
import re | ||
from decimal import Decimal | ||
|
@@ -28,6 +29,7 @@ | |
import thrift.protocol.TBinaryProtocol | ||
import thrift.transport.TSocket | ||
import thrift.transport.TTransport | ||
import thrift.transport.THttpClient | ||
|
||
# PEP 249 module globals | ||
apilevel = '2.0' | ||
|
@@ -94,12 +96,24 @@ | |
return Connection(*args, **kwargs) | ||
|
||
|
||
# TODO | ||
# Setting the Cookie in the headers should be implemented in the thrift library. | ||
# We'll keep this here until that change is available in there. | ||
class TCookieHttpClient(thrift.transport.THttpClient.THttpClient): | ||
def flush(self): | ||
super(TCookieHttpClient, self).flush() | ||
|
||
if 'Set-Cookie' in self.headers: | ||
self.setCustomHeaders( | ||
{'Cookie': self.headers['Set-Cookie']}) | ||
|
||
|
||
class Connection(object): | ||
"""Wraps a Thrift session""" | ||
|
||
def __init__(self, host=None, port=None, username=None, database='default', auth=None, | ||
configuration=None, kerberos_service_name=None, password=None, | ||
thrift_transport=None): | ||
thrift_transport=None, thrift_transport_protocol='binary', http_path=None): | ||
"""Connect to HiveServer2 | ||
|
||
:param host: What host HiveServer2 runs on | ||
|
@@ -119,9 +133,6 @@ | |
username = username or getpass.getuser() | ||
configuration = configuration or {} | ||
|
||
if (password is not None) != (auth in ('LDAP', 'CUSTOM')): | ||
raise ValueError("Password should be set if and only if in LDAP or CUSTOM mode; " | ||
"Remove password or use one of those modes") | ||
if (kerberos_service_name is not None) != (auth == 'KERBEROS'): | ||
raise ValueError("kerberos_service_name should be set if and only if in KERBEROS mode") | ||
if thrift_transport is not None: | ||
|
@@ -138,51 +149,29 @@ | |
|
||
if thrift_transport is not None: | ||
self._transport = thrift_transport | ||
elif thrift_transport_protocol == 'binary': | ||
self._transport = Connection. \ | ||
create_binary_transport(host=host, | ||
port=port, | ||
username=username, | ||
password=password, | ||
kerberos_service_name=kerberos_service_name, | ||
auth=auth) | ||
elif thrift_transport_protocol == 'http': | ||
self._transport = Connection.\ | ||
create_http_transport(host=host, | ||
username=username, | ||
port=port, | ||
http_path=http_path, | ||
password=password, | ||
kerberos_service_name=kerberos_service_name, | ||
auth=auth) | ||
else: | ||
if port is None: | ||
port = 10000 | ||
if auth is None: | ||
auth = 'NONE' | ||
socket = thrift.transport.TSocket.TSocket(host, port) | ||
if auth == 'NOSASL': | ||
# NOSASL corresponds to hive.server2.authentication=NOSASL in hive-site.xml | ||
self._transport = thrift.transport.TTransport.TBufferedTransport(socket) | ||
elif auth in ('LDAP', 'KERBEROS', 'NONE', 'CUSTOM'): | ||
# Defer import so package dependency is optional | ||
import sasl | ||
import thrift_sasl | ||
|
||
if auth == 'KERBEROS': | ||
# KERBEROS mode in hive.server2.authentication is GSSAPI in sasl library | ||
sasl_auth = 'GSSAPI' | ||
else: | ||
sasl_auth = 'PLAIN' | ||
if password is None: | ||
# Password doesn't matter in NONE mode, just needs to be nonempty. | ||
password = 'x' | ||
|
||
def sasl_factory(): | ||
sasl_client = sasl.Client() | ||
sasl_client.setAttr('host', host) | ||
if sasl_auth == 'GSSAPI': | ||
sasl_client.setAttr('service', kerberos_service_name) | ||
elif sasl_auth == 'PLAIN': | ||
sasl_client.setAttr('username', username) | ||
sasl_client.setAttr('password', password) | ||
else: | ||
raise AssertionError | ||
sasl_client.init() | ||
return sasl_client | ||
self._transport = thrift_sasl.TSaslClientTransport(sasl_factory, sasl_auth, socket) | ||
else: | ||
# All HS2 config options: | ||
# https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2#SettingUpHiveServer2-Configuration | ||
# PAM currently left to end user via thrift_transport option. | ||
raise NotImplementedError( | ||
"Only NONE, NOSASL, LDAP, KERBEROS, CUSTOM " | ||
"authentication are supported, got {}".format(auth)) | ||
|
||
protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(self._transport) | ||
raise ValueError("Invalid thrift_transport_protocol: {}".format( | ||
thrift_transport_protocol)) | ||
|
||
protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol( | ||
self._transport) | ||
self._client = TCLIService.Client(protocol) | ||
# oldest version that still contains features we care about | ||
# "V6 uses binary type for binary payload (was string) and uses columnar result set" | ||
|
@@ -207,6 +196,109 @@ | |
self._transport.close() | ||
raise | ||
|
||
@staticmethod | ||
def create_http_transport(host, port, http_path, username, password, | ||
kerberos_service_name, auth): | ||
if port is None: | ||
port = 10001 | ||
if auth is None: | ||
auth = 'NONE' | ||
if http_path is None: | ||
http_path = '/' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could be better to default this to |
||
|
||
socket = TCookieHttpClient('http://{}:{}{}'.format(host, port, http_path)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great if we could also pass through a |
||
|
||
if auth == 'KERBEROS': | ||
import kerberos | ||
|
||
__, krb_context = kerberos.authGSSClientInit( | ||
service='{}@{}'.format(kerberos_service_name, host)) | ||
kerberos.authGSSClientClean(krb_context, '') | ||
kerberos.authGSSClientStep(krb_context, '') | ||
auth_header = kerberos.authGSSClientResponse(krb_context) | ||
|
||
socket.setCustomHeaders( | ||
{'Authorization': 'Negotiate {}'.format(auth_header)}) | ||
|
||
elif auth in ('BASIC', 'NOSASL', 'NONE'): | ||
if auth == 'BASIC' and password is None: | ||
raise ValueError("BASIC authentication requires password.") | ||
|
||
auth_credentials = '{}:{}'.format(username, password)\ | ||
.encode('UTF-8') | ||
auth_credentials_base64 = base64.standard_b64encode( | ||
auth_credentials).decode('UTF-8') | ||
|
||
# we're using the Authorization header for auth NONE or NOSASL because that's where | ||
# Hive gets the username when doAs is enabled | ||
socket.setCustomHeaders( | ||
{'Authorization': 'Basic {}'.format(auth_credentials_base64)}) | ||
|
||
else: | ||
raise NotImplementedError( | ||
"Only NONE, NOSASL, BASIC and KERBEROS authentication are supported " | ||
"when using HTTP transport, got {}".format(auth)) | ||
|
||
return thrift.transport.TTransport.TBufferedTransport(socket) | ||
|
||
@staticmethod | ||
def create_binary_transport(host, port, username, password, kerberos_service_name, auth): | ||
|
||
if port is None: | ||
port = 10000 | ||
if auth is None: | ||
auth = 'NONE' | ||
|
||
if (password is not None) != (auth in ('LDAP', 'CUSTOM')): | ||
raise ValueError( | ||
"Password should be set if and only if in LDAP or CUSTOM mode; " | ||
"Remove password or use one of those modes") | ||
|
||
socket = thrift.transport.TSocket.TSocket(host, port) | ||
|
||
if auth == 'NOSASL': | ||
# NOSASL corresponds to hive.server2.authentication=NOSASL in hive-site.xml | ||
transport = thrift.transport.TTransport.TBufferedTransport(socket) | ||
elif auth in ('LDAP', 'KERBEROS', 'NONE', 'CUSTOM'): | ||
# Defer import so package dependency is optional | ||
import sasl | ||
import thrift_sasl | ||
|
||
if auth == 'KERBEROS': | ||
# KERBEROS mode in hive.server2.authentication is GSSAPI in sasl library | ||
sasl_auth = 'GSSAPI' | ||
else: | ||
sasl_auth = 'PLAIN' | ||
if password is None: | ||
# Password doesn't matter in NONE mode, just needs to be nonempty. | ||
password = 'x' | ||
|
||
def sasl_factory(): | ||
sasl_client = sasl.Client() | ||
sasl_client.setAttr('host', host) | ||
if sasl_auth == 'GSSAPI': | ||
sasl_client.setAttr('service', kerberos_service_name) | ||
elif sasl_auth == 'PLAIN': | ||
sasl_client.setAttr('username', username) | ||
sasl_client.setAttr('password', password) | ||
else: | ||
raise AssertionError | ||
sasl_client.init() | ||
return sasl_client | ||
|
||
transport = thrift_sasl.TSaslClientTransport(sasl_factory, | ||
sasl_auth, | ||
socket) | ||
else: | ||
# All HS2 config options: | ||
# https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2#SettingUpHiveServer2-Configuration | ||
# PAM currently left to end user via thrift_transport option. | ||
raise NotImplementedError( | ||
"Only NONE, NOSASL, LDAP, KERBEROS, CUSTOM " | ||
"authentication are supported, got {}".format(auth)) | ||
|
||
return transport | ||
|
||
def __enter__(self): | ||
"""Transport should already be opened by __init__""" | ||
return self | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the meantime, this commit was merged into Thrift master branch, we just have to wait for a new Thrift release to get rid of this TCookieHttpClient:
apache/thrift@69642f3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this fix has been included in the 0.13.0 Release