Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support to connections with Thrift over HTTP transport. #325

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 140 additions & 48 deletions pyhive/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import absolute_import
from __future__ import unicode_literals

import base64
import datetime
import re
from decimal import Decimal
Expand All @@ -28,6 +29,7 @@
import thrift.protocol.TBinaryProtocol
import thrift.transport.TSocket
import thrift.transport.TTransport
import thrift.transport.THttpClient

# PEP 249 module globals
apilevel = '2.0'
Expand Down Expand Up @@ -94,12 +96,24 @@ def connect(*args, **kwargs):
return Connection(*args, **kwargs)


# TODO
# Setting the Cookie in the headers should be implemented in the thrift library.
# We'll keep this here until that change is available in there.
class TCookieHttpClient(thrift.transport.THttpClient.THttpClient):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the meantime, this commit was merged into Thrift master branch, we just have to wait for a new Thrift release to get rid of this TCookieHttpClient:

apache/thrift@69642f3

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this fix has been included in the 0.13.0 Release

def flush(self):
super(TCookieHttpClient, self).flush()

if self.headers['Set-Cookie'] is not None:
self.setCustomHeaders(
{'Cookie': self.headers['Set-Cookie']})


class Connection(object):
"""Wraps a Thrift session"""

def __init__(self, host=None, port=None, username=None, database='default', auth=None,
configuration=None, kerberos_service_name=None, password=None,
thrift_transport=None):
thrift_transport=None, thrift_transport_protocol='binary'):
"""Connect to HiveServer2

:param host: What host HiveServer2 runs on
Expand All @@ -119,9 +133,6 @@ def __init__(self, host=None, port=None, username=None, database='default', auth
username = username or getpass.getuser()
configuration = configuration or {}

if (password is not None) != (auth in ('LDAP', 'CUSTOM')):
raise ValueError("Password should be set if and only if in LDAP or CUSTOM mode; "
"Remove password or use one of those modes")
if (kerberos_service_name is not None) != (auth == 'KERBEROS'):
raise ValueError("kerberos_service_name should be set if and only if in KERBEROS mode")
if thrift_transport is not None:
Expand All @@ -138,51 +149,28 @@ def __init__(self, host=None, port=None, username=None, database='default', auth

if thrift_transport is not None:
self._transport = thrift_transport
elif thrift_transport_protocol == 'binary':
self._transport = Connection. \
__create_binary_transport(host=host,
username=username,
password=password,
kerberos_service_name=kerberos_service_name,
port=port,
auth=auth)
elif thrift_transport_protocol == 'http':
self._transport = Connection.\
__create_http_transport(host=host,
username=username,
password=password,
kerberos_service_name=kerberos_service_name,
port=port,
auth=auth)
else:
if port is None:
port = 10000
if auth is None:
auth = 'NONE'
socket = thrift.transport.TSocket.TSocket(host, port)
if auth == 'NOSASL':
# NOSASL corresponds to hive.server2.authentication=NOSASL in hive-site.xml
self._transport = thrift.transport.TTransport.TBufferedTransport(socket)
elif auth in ('LDAP', 'KERBEROS', 'NONE', 'CUSTOM'):
# Defer import so package dependency is optional
import sasl
import thrift_sasl

if auth == 'KERBEROS':
# KERBEROS mode in hive.server2.authentication is GSSAPI in sasl library
sasl_auth = 'GSSAPI'
else:
sasl_auth = 'PLAIN'
if password is None:
# Password doesn't matter in NONE mode, just needs to be nonempty.
password = 'x'

def sasl_factory():
sasl_client = sasl.Client()
sasl_client.setAttr('host', host)
if sasl_auth == 'GSSAPI':
sasl_client.setAttr('service', kerberos_service_name)
elif sasl_auth == 'PLAIN':
sasl_client.setAttr('username', username)
sasl_client.setAttr('password', password)
else:
raise AssertionError
sasl_client.init()
return sasl_client
self._transport = thrift_sasl.TSaslClientTransport(sasl_factory, sasl_auth, socket)
else:
# All HS2 config options:
# https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2#SettingUpHiveServer2-Configuration
# PAM currently left to end user via thrift_transport option.
raise NotImplementedError(
"Only NONE, NOSASL, LDAP, KERBEROS, CUSTOM "
"authentication are supported, got {}".format(auth))

protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(self._transport)
raise ValueError("Invalid thrift_transport_protocol: {}".format(
thrift_transport_protocol))

protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocol(
self._transport)
self._client = TCLIService.Client(protocol)
# oldest version that still contains features we care about
# "V6 uses binary type for binary payload (was string) and uses columnar result set"
Expand All @@ -207,6 +195,110 @@ def sasl_factory():
self._transport.close()
raise

@staticmethod
def __create_http_transport(host, username, password, kerberos_service_name,
port=10001, auth='KERBEROS'):
if auth == 'KERBEROS':
import kerberos

__, krb_context = kerberos.authGSSClientInit(
service='{}@{}'.format(kerberos_service_name, host))
kerberos.authGSSClientClean(krb_context, '')
kerberos.authGSSClientStep(krb_context, '')
auth_header = kerberos.authGSSClientResponse(krb_context)

socket = TCookieHttpClient(uri_or_host=host,
port=port,
path='/')

socket.setCustomHeaders(
{'Authorization': 'Negotiate {}'.format(auth_header)})

elif auth == 'BASIC':
if username is None or password is None:
raise ValueError("BASIC authentication require username and password.")

socket = TCookieHttpClient(uri_or_host=host,
port=port,
path='/')

auth_credentials = '{}:{}'.format(username, password)\
.encode('UTF-8')
auth_credentials_base64 = base64.standard_b64encode(
auth_credentials).decode('UTF-8')

socket.setCustomHeaders(
{'Authorization': 'Basic {}'.format(auth_credentials_base64)})

elif auth == 'NONE':
socket = thrift.transport.THttpClient.THttpClient(host, port, '/')

else:
raise NotImplementedError(
"Only NONE, BASIC and KERBEROS authentication are supported "
"when using HTTP transport, got {}".format(auth))

return thrift.transport.TTransport.TBufferedTransport(socket)

@staticmethod
def __create_binary_transport(host, username, password,
kerberos_service_name, port, auth):

if port is None:
port = 10000
if auth is None:
auth = 'NONE'

if (password is not None) != (auth in ('LDAP', 'CUSTOM')):
raise ValueError(
"Password should be set if and only if in LDAP or CUSTOM mode; "
"Remove password or use one of those modes")

socket = thrift.transport.TSocket.TSocket(host, port)

if auth == 'NOSASL':
# NOSASL corresponds to hive.server2.authentication=NOSASL in hive-site.xml
transport = thrift.transport.TTransport.TBufferedTransport(socket)
elif auth in ('LDAP', 'KERBEROS', 'NONE', 'CUSTOM'):
# Defer import so package dependency is optional
import sasl
import thrift_sasl

if auth == 'KERBEROS':
# KERBEROS mode in hive.server2.authentication is GSSAPI in sasl library
sasl_auth = 'GSSAPI'
else:
sasl_auth = 'PLAIN'
if password is None:
# Password doesn't matter in NONE mode, just needs to be nonempty.
password = 'x'

def sasl_factory():
sasl_client = sasl.Client()
sasl_client.setAttr('host', host)
if sasl_auth == 'GSSAPI':
sasl_client.setAttr('service', kerberos_service_name)
elif sasl_auth == 'PLAIN':
sasl_client.setAttr('username', username)
sasl_client.setAttr('password', password)
else:
raise AssertionError
sasl_client.init()
return sasl_client

transport = thrift_sasl.TSaslClientTransport(sasl_factory,
sasl_auth,
socket)
else:
# All HS2 config options:
# https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2#SettingUpHiveServer2-Configuration
# PAM currently left to end user via thrift_transport option.
raise NotImplementedError(
"Only NONE, NOSASL, LDAP, KERBEROS, CUSTOM "
"authentication are supported, got {}".format(auth))

return transport

def __enter__(self):
"""Transport should already be opened by __init__"""
return self
Expand Down