Skip to content

Commit

Permalink
[Append] Hopsworks client to work with Hive Engine (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
moritzmeister committed Dec 21, 2020
1 parent 110b613 commit cafe316
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 26 deletions.
15 changes: 11 additions & 4 deletions python/hsfs/client/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self._project_id = str(project_info["projectId"])

self._cert_key = None
self._cert_folder_base = None

if engine == "hive":
# On external Spark clients (Databricks, Spark Cluster),
Expand All @@ -80,11 +81,11 @@ def __init__(
credentials = self._get_credentials(self._project_id)
self._write_b64_cert_to_bytes(
str(credentials["kStore"]),
path=os.path.join(self._cert_folder, "keyStore.jks"),
path=self._get_jks_key_store_path(),
)
self._write_b64_cert_to_bytes(
str(credentials["tStore"]),
path=os.path.join(self._cert_folder, "trustStore.jks"),
path=self._get_jks_trust_store_path(),
)

self._cert_key = str(credentials["password"])
Expand All @@ -99,8 +100,8 @@ def _close(self):
return

# Clean up only on AWS
self._cleanup_file(os.path.join(self._cert_folder, "keyStore.jks"))
self._cleanup_file(os.path.join(self._cert_folder, "trustStore.jks"))
self._cleanup_file(self._get_jks_key_store_path())
self._cleanup_file(self._get_jks_trust_store_path())
self._cleanup_file(os.path.join(self._cert_folder, "material_passwd"))

try:
Expand All @@ -114,6 +115,12 @@ def _close(self):
pass
self._connected = False

def _get_jks_trust_store_path(self):
return os.path.join(self._cert_folder, "trustStore.jks")

def _get_jks_key_store_path(self):
return os.path.join(self._cert_folder, "keyStore.jks")

def _get_secret(self, secrets_store, secret_key=None, api_key_file=None):
"""Returns secret value from the AWS Secrets Manager or Parameter Store.
Expand Down
8 changes: 5 additions & 3 deletions python/hsfs/client/hopsworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ class Client(base.Client):
HDFS_USER = "HDFS_USER"
T_CERTIFICATE = "t_certificate"
K_CERTIFICATE = "k_certificate"
TRUSTSTORE_SUFFIX = "__tstore.key"
KEYSTORE_SUFFIX = "__kstore.key"
TRUSTSTORE_SUFFIX = "__tstore.jks"
KEYSTORE_SUFFIX = "__kstore.jks"
PEM_CA_CHAIN = "ca_chain.pem"

def __init__(self):
"""Initializes a client being run from a job/notebook directly on Hopsworks."""
self._base_url = self._get_hopsworks_rest_endpoint()
self._host, self._port = self._get_host_port_pair()

self._cert_key = util.get_cert_pw()
trust_store_path = self._get_trust_store_path()
hostname_verification = (
os.environ[self.REQUESTS_VERIFY]
Expand Down Expand Up @@ -76,7 +78,7 @@ def _write_ca_chain(self, ca_chain_path):
"""
Converts JKS trustore file into PEM to be compatible with Python libraries
"""
keystore_pw = util.get_cert_pw()
keystore_pw = self._cert_key
keystore_ca_cert = self._convert_jks_to_pem(
self._get_jks_key_store_path(), keystore_pw
)
Expand Down
8 changes: 1 addition & 7 deletions python/hsfs/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,7 @@ def connect(self):
client.init("hopsworks")

# init engine
engine.init(
self._engine,
self._host,
self._cert_folder,
self._project,
client.get_instance()._cert_key,
)
engine.init(self._engine)

self._feature_store_api = feature_store_api.FeatureStoreApi()
self._project_api = project_api.ProjectApi()
Expand Down
4 changes: 2 additions & 2 deletions python/hsfs/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
_engine = None


def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None):
def init(engine_type):
global _engine
if not _engine:
if engine_type == "spark":
Expand All @@ -34,7 +34,7 @@ def init(engine_type, host=None, cert_folder=None, project=None, cert_key=None):
"missing in HSFS installation. Install with `pip install "
"hsfs[hive]`."
)
_engine = hive.Engine(host, cert_folder, project, cert_key)
_engine = hive.Engine()


def get_instance():
Expand Down
17 changes: 8 additions & 9 deletions python/hsfs/engine/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
# limitations under the License.
#

import os
import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine

from hsfs import client


class Engine:
def __init__(self, host, cert_folder, project, cert_key):
self._host = host
self._cert_folder = os.path.join(cert_folder, host, project)
self._cert_key = cert_key
def __init__(self):
    """Create a stateless Hive engine.

    The engine holds no connection state of its own: host, JKS
    trust/key store paths and the certificate password are read from
    the hsfs client singleton when a connection is actually created
    (see ``_create_hive_connection``), so no per-instance setup is
    required here.
    """
    pass

def sql(self, sql_query, feature_store, online_conn, dataframe_type):
if not online_conn:
Expand Down Expand Up @@ -70,14 +69,14 @@ def set_job_group(self, group_id, description):

def _create_hive_connection(self, feature_store):
    """Open a PyHive connection against the given feature store database.

    All connection parameters (host, JKS trust/key store paths, and the
    certificate password) are obtained from the hsfs client singleton,
    authenticating with two-way TLS certificates.
    """
    # Resolve the singleton once instead of calling get_instance() per kwarg.
    hsfs_client = client.get_instance()
    return hive.Connection(
        host=hsfs_client._host,
        port=9085,
        # database needs to be set every time, 'default' doesn't work in pyhive
        database=feature_store,
        auth="CERTIFICATES",
        truststore=hsfs_client._get_jks_trust_store_path(),
        keystore=hsfs_client._get_jks_key_store_path(),
        keystore_password=hsfs_client._cert_key,
    )

def _create_mysql_connection(self, online_conn):
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_cert_pw():
Returns:
Certificate password
"""
hadoop_user_name = "hadoop_user_name"
hadoop_user_name = "HADOOP_USER_NAME"
crypto_material_password = "material_passwd"
material_directory = "MATERIAL_DIRECTORY"
password_suffix = "__cert.key"
Expand Down

0 comments on commit cafe316

Please sign in to comment.