[HOPSWORKS-1971] Add HSFS python support (#206)
SirOibaf authored Jan 16, 2021
1 parent b2a7e0b commit 9302c27

Showing 27 changed files with 1,014 additions and 200 deletions.
4 changes: 4 additions & 0 deletions python/hsfs/client/base.py
@@ -91,6 +91,7 @@ def _send_request(
         headers=None,
         data=None,
         stream=False,
+        files=None,
     ):
         """Send REST request to Hopsworks.
@@ -110,6 +111,8 @@ def _send_request(
         :type data: dict, optional
         :param stream: Set if response should be a stream, defaults to False
         :type stream: boolean, optional
+        :param files: dictionary for multipart encoding upload
+        :type files: dict, optional
         :raises RestAPIError: Raised when request wasn't correctly received, understood or accepted
         :return: Response json
         :rtype: dict
@@ -126,6 +129,7 @@ def _send_request(
             data=data,
             params=query_params,
             auth=self._auth,
+            files=files,
         )

         prepped = self._session.prepare_request(request)
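The new files argument is forwarded untouched to requests' multipart/form-data encoding: requests.Request accepts a files dict mapping form field names to (filename, content) tuples. A minimal usage sketch, assuming a client instance; the path and payload here are illustrative, not part of this diff:

    # Hypothetical call: posts one chunk as multipart form data, with the
    # form fields in `data` and the file part in `files`.
    _client._send_request(
        "POST",
        ["project", 119, "dataset", "upload", "Resources"],
        data={"flowChunkNumber": 1, "flowTotalChunks": 1},
        files={"file": ("fg_1", "col1,col2\n1,2\n")},
    )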
6 changes: 4 additions & 2 deletions python/hsfs/client/external.py
@@ -80,10 +80,12 @@ def __init__(
             os.makedirs(self._cert_folder, exist_ok=True)
             credentials = self._get_credentials(self._project_id)
             self._write_b64_cert_to_bytes(
-                str(credentials["kStore"]), path=self._get_jks_key_store_path(),
+                str(credentials["kStore"]),
+                path=self._get_jks_key_store_path(),
             )
             self._write_b64_cert_to_bytes(
-                str(credentials["tStore"]), path=self._get_jks_trust_store_path(),
+                str(credentials["tStore"]),
+                path=self._get_jks_trust_store_path(),
             )

             self._cert_key = str(credentials["password"])
6 changes: 4 additions & 2 deletions python/hsfs/constructor/on_demand_feature_group_alias.py
@@ -21,8 +21,10 @@

 class OnDemandFeatureGroupAlias:
     def __init__(self, on_demand_feature_group, alias):
-        self._on_demand_feature_group = feature_group.OnDemandFeatureGroup.from_response_json(
-            on_demand_feature_group
+        self._on_demand_feature_group = (
+            feature_group.OnDemandFeatureGroup.from_response_json(
+                on_demand_feature_group
+            )
         )
         self._alias = alias
72 changes: 47 additions & 25 deletions python/hsfs/constructor/query.py
@@ -15,6 +15,7 @@
 #

 import json
+import humps
 from typing import Optional, List, Union

 from hsfs import util, engine
@@ -25,21 +26,23 @@
 class Query:
     def __init__(
         self,
-        feature_store_name,
-        feature_store_id,
         left_feature_group,
         left_features,
-        left_featuregroup_start_time=None,
-        left_featuregroup_end_time=None,
+        feature_store_name=None,
+        feature_store_id=None,
+        left_feature_group_start_time=None,
+        left_feature_group_end_time=None,
+        joins=[],
+        filter=None,
     ):
         self._feature_store_name = feature_store_name
         self._feature_store_id = feature_store_id
         self._left_feature_group = left_feature_group
         self._left_features = util.parse_features(left_features)
-        self._left_featuregroup_start_time = left_featuregroup_start_time
-        self._left_featuregroup_end_time = left_featuregroup_end_time
-        self._joins = []
-        self._filter = None
+        self._left_feature_group_start_time = left_feature_group_start_time
+        self._left_feature_group_end_time = left_feature_group_end_time
+        self._joins = joins
+        self._filter = filter
         self._query_constructor_api = query_constructor_api.QueryConstructorApi()
         self._storage_connector_api = storage_connector_api.StorageConnectorApi(
             feature_store_id
@@ -155,13 +158,13 @@ def join(

     def as_of(self, wallclock_time):
         for join in self._joins:
-            join.query.left_featuregroup_end_time = wallclock_time
-        self.left_featuregroup_end_time = wallclock_time
+            join.query.left_feature_group_end_time = wallclock_time
+        self.left_feature_group_end_time = wallclock_time
         return self

     def pull_changes(self, wallclock_start_time, wallclock_end_time):
-        self.left_featuregroup_start_time = wallclock_start_time
-        self.left_featuregroup_end_time = wallclock_end_time
+        self.left_feature_group_start_time = wallclock_start_time
+        self.left_feature_group_end_time = wallclock_end_time
         return self

     def filter(self, f: Union[filter.Filter, filter.Logic]):
@@ -211,14 +214,32 @@ def json(self):

     def to_dict(self):
         return {
+            "featureStoreName": self._feature_store_name,
+            "featureStoreId": self._feature_store_id,
             "leftFeatureGroup": self._left_feature_group,
             "leftFeatures": self._left_features,
-            "leftFeatureGroupStartTime": self._left_featuregroup_start_time,
-            "leftFeatureGroupEndTime": self._left_featuregroup_end_time,
+            "leftFeatureGroupStartTime": self._left_feature_group_start_time,
+            "leftFeatureGroupEndTime": self._left_feature_group_end_time,
             "joins": self._joins,
             "filter": self._filter,
         }

+    @classmethod
+    def _hopsworks_json(cls, json_dict):
+        """
+        This method is used by the Hopsworks helper job.
+        It does not fully deserialize the message as the use case is to
+        send it straight back to Hopsworks to read the content of the query.
+
+        Args:
+            json_dict (str): a json string containing a query object
+
+        Returns:
+            A partially deserialized query object
+        """
+        json_decamelized = humps.decamelize(json_dict)
+        return cls(**json_decamelized)
+
     def to_string(self, online=False):
         fs_query_instance = self._query_constructor_api.construct_query(self)
         return fs_query_instance.query_online if online else fs_query_instance.query
@@ -232,24 +253,25 @@ def _register_on_demand(self, on_demand_fg_aliases):

         for on_demand_fg_alias in on_demand_fg_aliases:
             engine.get_instance().register_on_demand_temporary_table(
-                on_demand_fg_alias.on_demand_feature_group, on_demand_fg_alias.alias,
+                on_demand_fg_alias.on_demand_feature_group,
+                on_demand_fg_alias.alias,
             )

     @property
-    def left_featuregroup_start_time(self):
-        return self._left_featuregroup_start_time
+    def left_feature_group_start_time(self):
+        return self._left_feature_group_start_time

     @property
-    def left_featuregroup_end_time(self):
-        return self._left_featuregroup_start_time
+    def left_feature_group_end_time(self):
+        return self._left_feature_group_end_time

-    @left_featuregroup_start_time.setter
-    def left_featuregroup_start_time(self, left_featuregroup_start_time):
-        self._left_featuregroup_start_time = left_featuregroup_start_time
+    @left_feature_group_start_time.setter
+    def left_feature_group_start_time(self, left_feature_group_start_time):
+        self._left_feature_group_start_time = left_feature_group_start_time

-    @left_featuregroup_end_time.setter
-    def left_featuregroup_end_time(self, left_featuregroup_start_time):
-        self._left_featuregroup_end_time = left_featuregroup_start_time
+    @left_feature_group_end_time.setter
+    def left_feature_group_end_time(self, left_feature_group_end_time):
+        self._left_feature_group_end_time = left_feature_group_end_time

     def _register_hudi_tables(
         self, hudi_feature_groups, feature_store_id, feature_store_name, read_options
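With the renamed accessors, time travel stays symmetric across the outer query and its joins: as_of stamps the same end time on every sub-query, while pull_changes bounds the left feature group on both sides. A usage sketch, assuming two feature group objects with the usual select/join API; names and wallclock times are illustrative:

    # Snapshot the joined result as of a point in time.
    query = fg_a.select(["id", "amount"]).join(fg_b.select(["label"]))
    snapshot = query.as_of("20201230000000")

    # Read only the changes committed between two points in time.
    changes = fg_a.select_all().pull_changes("20201201000000", "20201230000000")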
68 changes: 68 additions & 0 deletions python/hsfs/core/dataset_api.py
@@ -0,0 +1,68 @@
+#
+# Copyright 2020 Logical Clocks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+
+from hsfs import client, util
+
+
+class DatasetApi:
+    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+
+    def upload(self, feature_group, path, dataframe):
+        # Convert the dataframe into CSV for upload
+        df_csv = dataframe.to_csv(index=False)
+        num_chunks = math.ceil(len(df_csv) / self.DEFAULT_FLOW_CHUNK_SIZE)
+
+        base_params = self._get_flow_base_params(feature_group, num_chunks, len(df_csv))
+
+        chunks = [
+            df_csv[i : i + self.DEFAULT_FLOW_CHUNK_SIZE]
+            for i in range(0, len(df_csv), self.DEFAULT_FLOW_CHUNK_SIZE)
+        ]
+
+        chunk_number = 1
+        for chunk in chunks:
+            query_params = base_params
+            query_params["flowCurrentChunkSize"] = len(chunk)
+            query_params["flowChunkNumber"] = chunk_number
+
+            self._upload_request(
+                query_params, path, util.feature_group_name(feature_group), chunk
+            )
+
+            chunk_number += 1
+
+    def _get_flow_base_params(self, feature_group, num_chunks, size):
+        # TODO(fabio): flow identifier is not unique
+        return {
+            "templateId": -1,
+            "flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
+            "flowTotalSize": size,
+            "flowIdentifier": util.feature_group_name(feature_group),
+            "flowFilename": util.feature_group_name(feature_group),
+            "flowRelativePath": util.feature_group_name(feature_group),
+            "flowTotalChunks": num_chunks,
+        }
+
+    def _upload_request(self, params, path, file_name, chunk):
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", "upload", path]
+
+        # Flow configuration params are sent as form data
+        _client._send_request(
+            "POST", path_params, data=params, files={"file": (file_name, chunk)}
+        )
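The upload follows the Flow.js chunking protocol: the CSV payload is cut into fixed 1 MiB slices and each slice is posted with its own chunk number and size plus the shared flow identifier. A standalone sketch of the same slicing arithmetic (not the DatasetApi class itself):

    import math

    CHUNK_SIZE = 1048576  # 1 MiB, mirrors DEFAULT_FLOW_CHUNK_SIZE

    payload = "x" * 2500000  # stand-in for a 2.5 MB CSV string
    num_chunks = math.ceil(len(payload) / CHUNK_SIZE)  # -> 3

    chunks = [payload[i : i + CHUNK_SIZE] for i in range(0, len(payload), CHUNK_SIZE)]
    assert [len(c) for c in chunks] == [1048576, 1048576, 402848]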
28 changes: 28 additions & 0 deletions python/hsfs/core/feature_group_api.py
@@ -16,6 +16,7 @@

 from hsfs import client
 from hsfs import feature_group, feature_group_commit
+from hsfs.core import ingestion_job


 class FeatureGroupApi:
@@ -221,3 +222,30 @@ def commit_details(self, feature_group_instance, limit):
         return feature_group_commit.FeatureGroupCommit.from_response_json(
             _client._send_request("GET", path_params, query_params, headers=headers),
         )
+
+    def ingestion(self, feature_group_instance, ingestion_conf):
+        """
+        Set up a Hopsworks job for dataframe ingestion.
+
+        Args:
+            feature_group_instance: FeatureGroup, required
+                metadata object of feature group.
+            ingestion_conf: the configuration for the ingestion job application
+        """
+
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "featurestores",
+            self._feature_store_id,
+            "featuregroups",
+            feature_group_instance.id,
+            "ingestion",
+        ]
+
+        headers = {"content-type": "application/json"}
+        return ingestion_job.IngestionJob.from_response_json(
+            _client._send_request(
+                "POST", path_params, headers=headers, data=ingestion_conf.json()
+            ),
+        )
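A hedged sketch of the call site, assuming FeatureGroupApi is constructed with the feature store id (as the use of self._feature_store_id suggests) and that ingestion_conf is any object exposing a .json() serializer; every name below is illustrative rather than confirmed by this diff:

    fg_api = FeatureGroupApi(67)      # hypothetical constructor argument
    job = fg_api.ingestion(fg, conf)  # fg: FeatureGroup, conf: exposes .json()

    job.data_path  # where the client should upload the dataframe
    job.job        # the Hopsworks job that will ingest it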
2 changes: 1 addition & 1 deletion python/hsfs/core/hudi_engine.py
@@ -71,7 +71,7 @@ def __init__(
         self._feature_store_id = feature_store_id
         self._feature_store_name = feature_store_name
         self._base_path = self._feature_group.location
-        self._table_name = feature_group.name + "_" + str(feature_group.version)
+        self._table_name = util.feature_group_name(feature_group)

         self._primary_key = ",".join(feature_group.primary_key)
         self._partition_key = (
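The deleted line documents exactly what the shared helper computes, so a plausible equivalent of util.feature_group_name (inferred from the removed expression, not copied from util.py) is:

    def feature_group_name(feature_group):
        # "<name>_<version>", e.g. "transactions_1"
        return feature_group.name + "_" + str(feature_group.version)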
46 changes: 46 additions & 0 deletions python/hsfs/core/ingestion_job.py
@@ -0,0 +1,46 @@
+#
+# Copyright 2020 Logical Clocks AB
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import humps
+from hsfs.core.job import Job
+
+
+class IngestionJob:
+    def __init__(
+        self,
+        data_path,
+        job,
+        href=None,
+        expand=None,
+        items=None,
+        count=None,
+        type=None,
+    ):
+        self._data_path = data_path
+        self._job = Job.from_response_json(job)
+
+    @classmethod
+    def from_response_json(cls, json_dict):
+        json_decamelized = humps.decamelize(json_dict)
+        return cls(**json_decamelized)
+
+    @property
+    def data_path(self):
+        return self._data_path
+
+    @property
+    def job(self):
+        return self._job
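from_response_json leans on humps to turn the camelCase REST payload into snake_case constructor kwargs; the unused href/expand/items/count/type parameters simply absorb envelope fields the class does not keep. A small sketch of the decamelization step, with an illustrative payload:

    import humps

    payload = {"dataPath": "/Projects/demo/Resources", "job": {"name": "ingest"}}
    humps.decamelize(payload)
    # {'data_path': '/Projects/demo/Resources', 'job': {'name': 'ingest'}}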