Decouple build feature code #838

Merged: 8 commits, Nov 23, 2022
54 changes: 36 additions & 18 deletions feathr_project/feathr/client.py
@@ -23,7 +23,6 @@
from feathr.definition.settings import ObservationSettings
from feathr.definition.sink import Sink
from feathr.protobuf.featureValue_pb2 import FeatureValue
from feathr.registry.feature_registry import default_registry_client
from feathr.spark_provider._databricks_submission import _FeathrDatabricksJobLauncher
from feathr.spark_provider._localspark_submission import _FeathrLocalSparkJobLauncher
from feathr.spark_provider._synapse_submission import _FeathrSynapseJobLauncher
@@ -34,8 +33,14 @@
from feathr.utils.feature_printer import FeaturePrinter
from feathr.utils.spark_job_params import FeatureGenerationJobParams, FeatureJoinJobParams
from feathr.definition.source import InputContext
from azure.identity import DefaultAzureCredential
from jinja2 import Template
from loguru import logger
from feathr.definition.config_helper import FeathrConfigHelper
from pyhocon import ConfigFactory
from feathr.registry._feathr_registry_client import _FeatureRegistry
from feathr.registry._feature_registry_purview import _PurviewRegistry
from feathr.version import get_version

class FeathrClient(object):
"""Feathr client.

@@ -170,10 +175,24 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir

self.secret_names = []

# initialize registry
self.registry = default_registry_client(self.project_name, config_path=config_path, credential=self.credential)
# initialize config helper
self.config_helper = FeathrConfigHelper()

logger.info(f"Feathr Client {get_version()} initialized successfully")
# initialize registry
self.registry = None
registry_endpoint = self.envutils.get_environment_variable_with_default("feature_registry", "api_endpoint")
azure_purview_name = self.envutils.get_environment_variable_with_default('feature_registry', 'purview', 'purview_name')
if registry_endpoint:
self.registry = _FeatureRegistry(self.project_name, endpoint=registry_endpoint, project_tags=project_registry_tag, credential=credential)
elif azure_purview_name:
registry_delimiter = self.envutils.get_environment_variable_with_default('feature_registry', 'purview', 'delimiter')
# initialize the registry no matter whether we set purview name or not, given some of the methods are used there.
self.registry = _PurviewRegistry(self.project_name, azure_purview_name, registry_delimiter, project_registry_tag, config_path = config_path, credential=credential)
else:
# no registry configured
logger.info("Feathr registry is not configured. Consider setting the Feathr registry component for richer feature store experience.")

logger.info(f"Feathr client {get_version()} initialized successfully.")

def _check_required_environment_variables_exist(self):
"""Checks if the required environment variables(form feathr_config.yaml) is set.
@@ -197,7 +216,7 @@ def register_features(self, from_context: bool = True):
if from_context:
# make sure those items are in `self`
if 'anchor_list' in dir(self) and 'derived_feature_list' in dir(self):
self.registry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.registry.register_features(self.local_workspace_dir, from_context=from_context, anchor_list=self.anchor_list, derived_feature_list=self.derived_feature_list)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to register features")
@@ -224,9 +243,8 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
else:
source_names[anchor.source.name] = anchor.source

preprocessingPyudfManager = _PreprocessingPyudfManager()
_PreprocessingPyudfManager.build_anchor_preprocessing_metadata(anchor_list, self.local_workspace_dir)
self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.anchor_list = anchor_list
self.derived_feature_list = derived_feature_list

@@ -470,7 +488,7 @@ def get_offline_features(self,
# otherwise users will be confused on what are the available features
# in build_features it will assign anchor_list and derived_feature_list variable, hence we are checking if those two variables exist to make sure the above condition is met
if 'anchor_list' in dir(self) and 'derived_feature_list' in dir(self):
self.registry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features")

@@ -678,7 +696,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
# otherwise users will be confused on what are the available features
# in build_features it will assign anchor_list and derived_feature_list variable, hence we are checking if those two variables exist to make sure the above condition is met
if 'anchor_list' in dir(self) and 'derived_feature_list' in dir(self):
self.registry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
self.config_helper.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to materialize the features")

@@ -772,7 +790,7 @@ def _get_s3_config_str(self):
# keys can't be only accessed through environment
access_key = self.envutils.get_environment_variable('S3_ACCESS_KEY')
secret_key = self.envutils.get_environment_variable('S3_SECRET_KEY')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
S3_ENDPOINT: {S3_ENDPOINT}
S3_ACCESS_KEY: "{S3_ACCESS_KEY}"
@@ -787,7 +805,7 @@ def _get_adls_config_str(self):
# if ADLS Account is set in the feathr_config, then we need other environment variables
# keys can't be only accessed through environment
key = self.envutils.get_environment_variable('ADLS_KEY')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
ADLS_ACCOUNT: {ADLS_ACCOUNT}
ADLS_KEY: "{ADLS_KEY}"
@@ -801,7 +819,7 @@ def _get_blob_config_str(self):
# if BLOB Account is set in the feathr_config, then we need other environment variables
# keys can't be only accessed through environment
key = self.envutils.get_environment_variable('BLOB_KEY')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
BLOB_ACCOUNT: {BLOB_ACCOUNT}
BLOB_KEY: "{BLOB_KEY}"
@@ -817,7 +835,7 @@ def _get_sql_config_str(self):
driver = self.envutils.get_environment_variable('JDBC_DRIVER')
auth_flag = self.envutils.get_environment_variable('JDBC_AUTH_FLAG')
token = self.envutils.get_environment_variable('JDBC_TOKEN')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
JDBC_TABLE: {JDBC_TABLE}
JDBC_USER: {JDBC_USER}
@@ -834,7 +852,7 @@ def _get_monitoring_config_str(self):
user = self.envutils.get_environment_variable_with_default('monitoring', 'database', 'sql', 'user')
password = self.envutils.get_environment_variable('MONITORING_DATABASE_SQL_PASSWORD')
if url:
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
MONITORING_DATABASE_SQL_URL: "{url}"
MONITORING_DATABASE_SQL_USER: {user}
@@ -852,7 +870,7 @@ def _get_snowflake_config_str(self):
sf_role = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'role')
sf_warehouse = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'warehouse')
sf_password = self.envutils.get_environment_variable('JDBC_SF_PASSWORD')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
JDBC_SF_URL: {JDBC_SF_URL}
JDBC_SF_USER: {JDBC_SF_USER}
@@ -866,7 +884,7 @@ def _get_kafka_config_str(self):
"""Construct the Kafka config string. The endpoint, access key, secret key, and other parameters can be set via
environment variables."""
sasl = self.envutils.get_environment_variable('KAFKA_SASL_JAAS_CONFIG')
# HOCCON format will be parsed by the Feathr job
# HOCON format will be parsed by the Feathr job
config_str = """
KAFKA_SASL_JAAS_CONFIG: "{sasl}"
""".format(sasl=sasl)
@@ -899,4 +917,4 @@ def _reshape_config_str(self, config_str:str):
if self.spark_runtime == 'local':
return "'{" + config_str + "}'"
else:
return config_str
return config_str
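Taken together, the client.py changes make the registry optional and configuration-driven: if feature_registry.api_endpoint is set, the client builds the REST-based _FeatureRegistry; otherwise, if feature_registry.purview.purview_name is set, it builds _PurviewRegistry; with neither, self.registry stays None and only an informational message is logged. HOCON feature-config generation is no longer a registry responsibility and now goes through the new FeathrConfigHelper. Below is a minimal sketch of the selection logic, with a hypothetical get_setting callable standing in for self.envutils.get_environment_variable_with_default (the function name and example URL are illustrative only):

```python
from typing import Callable, Optional

def choose_registry_backend(get_setting: Callable[..., Optional[str]]) -> Optional[str]:
    """Mirror the branching in FeathrClient.__init__: report which registry class would be built."""
    registry_endpoint = get_setting("feature_registry", "api_endpoint")
    azure_purview_name = get_setting("feature_registry", "purview", "purview_name")
    if registry_endpoint:
        return "_FeatureRegistry"   # REST registry endpoint configured
    if azure_purview_name:
        return "_PurviewRegistry"   # Azure Purview account configured
    return None                     # no registry configured; build/join/materialize still work

# Illustrative settings: only the REST endpoint is present.
settings = {("feature_registry", "api_endpoint"): "https://example-feathr-registry/api/v1"}
print(choose_registry_backend(lambda *keys: settings.get(keys)))  # -> "_FeatureRegistry"
```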
193 changes: 193 additions & 0 deletions feathr_project/feathr/definition/config_helper.py
@@ -0,0 +1,193 @@
from feathr.definition.dtype import *
from feathr.registry.registry_utils import *
from feathr.utils._file_utils import write_to_file
from feathr.definition.anchor import FeatureAnchor
from feathr.constants import *
from feathr.definition.feature import Feature, FeatureType,FeatureBase
from feathr.definition.feature_derivations import DerivedFeature
from feathr.definition.repo_definitions import RepoDefinitions
from feathr.definition.source import HdfsSource, InputContext, JdbcSource, Source
from feathr.definition.transformation import (ExpressionTransformation, Transformation,
WindowAggTransformation)
from feathr.definition.typed_key import TypedKey
from feathr.registry.feature_registry import FeathrRegistry
from pathlib import Path
from jinja2 import Template
from typing import List
import importlib
import os
import sys

class FeathrConfigHelper(object):
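    """Helper that turns feature definitions, collected either from Python files in a
    workspace or from the current build context (anchor and derived-feature lists),
    into auto-generated HOCON feature config files."""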
def __init__(self) -> None:
pass
def _get_py_files(self, path: Path) -> List[Path]:
"""Get all Python files under path recursively, excluding __init__.py"""
py_files = []
for item in path.glob('**/*.py'):
if "__init__.py" != item.name:
py_files.append(item)
return py_files

def _convert_to_module_path(self, path: Path, workspace_path: Path) -> str:
"""Convert a Python file path to its module path so that we can import it later"""
prefix = os.path.commonprefix(
[path.resolve(), workspace_path.resolve()])
resolved_path = str(path.resolve())
module_path = resolved_path[len(prefix): -len(".py")]
# Convert features under nested folder to module name
# e.g. /path/to/pyfile will become path.to.pyfile
return (
module_path
.lstrip('/')
.replace("/", ".")
)

def _extract_features_from_context(self, anchor_list, derived_feature_list, result_path: Path) -> RepoDefinitions:
"""Collect feature definitions from the context instead of python files"""
definitions = RepoDefinitions(
sources=set(),
features=set(),
transformations=set(),
feature_anchors=set(),
derived_features=set()
)
for derived_feature in derived_feature_list:
if isinstance(derived_feature, DerivedFeature):
definitions.derived_features.add(derived_feature)
definitions.transformations.add(
vars(derived_feature)["transform"])
else:
raise RuntimeError(f"Please make sure you pass a list of `DerivedFeature` objects to the `derived_feature_list` argument. {str(type(derived_feature))} is detected.")

for anchor in anchor_list:
# obj is `FeatureAnchor`
definitions.feature_anchors.add(anchor)
# add the source section of this `FeatureAnchor` object
definitions.sources.add(vars(anchor)['source'])
for feature in vars(anchor)['features']:
# get the transformation object from `Feature` or `DerivedFeature`
if isinstance(feature, Feature):
# feature is of type `Feature`
definitions.features.add(feature)
definitions.transformations.add(vars(feature)["transform"])
else:

raise RuntimeError(f"Please make sure you pass a list of `Feature` objects. {str(type(feature))} is detected.")

return definitions

def _extract_features(self, workspace_path: Path) -> RepoDefinitions:
"""Collect feature definitions from the python file, convert them into feature config and save them locally"""
os.chdir(workspace_path)
# Add workspace path to system path so that we can load features defined in Python via import_module
sys.path.append(str(workspace_path))
definitions = RepoDefinitions(
sources=set(),
features=set(),
transformations=set(),
feature_anchors=set(),
derived_features=set()
)
for py_file in self._get_py_files(workspace_path):
module_path = self._convert_to_module_path(py_file, workspace_path)
module = importlib.import_module(module_path)
for attr_name in dir(module):
obj = getattr(module, attr_name)
if isinstance(obj, Source):
definitions.sources.add(obj)
elif isinstance(obj, Feature):
definitions.features.add(obj)
elif isinstance(obj, DerivedFeature):
definitions.derived_features.add(obj)
elif isinstance(obj, FeatureAnchor):
definitions.feature_anchors.add(obj)
elif isinstance(obj, Transformation):
definitions.transformations.add(obj)
return definitions

def save_to_feature_config(self, workspace_path: Path, config_save_dir: Path):
"""Save feature definition within the workspace into HOCON feature config files"""
repo_definitions = self._extract_features(workspace_path)
self._save_request_feature_config(repo_definitions, config_save_dir)
self._save_anchored_feature_config(repo_definitions, config_save_dir)
self._save_derived_feature_config(repo_definitions, config_save_dir)

def save_to_feature_config_from_context(self, anchor_list, derived_feature_list, local_workspace_dir: Path):
"""Save feature definition within the workspace into HOCON feature config files from current context, rather than reading from python files"""
repo_definitions = self._extract_features_from_context(
anchor_list, derived_feature_list, local_workspace_dir)
self._save_request_feature_config(repo_definitions, local_workspace_dir)
self._save_anchored_feature_config(repo_definitions, local_workspace_dir)
self._save_derived_feature_config(repo_definitions, local_workspace_dir)

def _save_request_feature_config(self, repo_definitions: RepoDefinitions, local_workspace_dir="./"):
config_file_name = "feature_conf/auto_generated_request_features.conf"
tm = Template(
"""
// THIS FILE IS AUTO GENERATED. PLEASE DO NOT EDIT.
anchors: {
{% for anchor in feature_anchors %}
{% if anchor.source.name == "PASSTHROUGH" %}
{{anchor.to_feature_config()}}
{% endif %}
{% endfor %}
}
"""
)

request_feature_configs = tm.render(
feature_anchors=repo_definitions.feature_anchors)
config_file_path = os.path.join(local_workspace_dir, config_file_name)
write_to_file(content=request_feature_configs,
full_file_name=config_file_path)

def _save_anchored_feature_config(self, repo_definitions: RepoDefinitions, local_workspace_dir="./"):
config_file_name = "feature_conf/auto_generated_anchored_features.conf"
tm = Template(
"""
// THIS FILE IS AUTO GENERATED. PLEASE DO NOT EDIT.
anchors: {
{% for anchor in feature_anchors %}
{% if not anchor.source.name == "PASSTHROUGH" %}
{{anchor.to_feature_config()}}
{% endif %}
{% endfor %}
}

sources: {
{% for source in sources%}
{% if not source.name == "PASSTHROUGH" %}
{{source.to_feature_config()}}
{% endif %}
{% endfor %}
}
"""
)
anchored_feature_configs = tm.render(feature_anchors=repo_definitions.feature_anchors,
sources=repo_definitions.sources)
config_file_path = os.path.join(local_workspace_dir, config_file_name)
write_to_file(content=anchored_feature_configs,
full_file_name=config_file_path)

def _save_derived_feature_config(self, repo_definitions: RepoDefinitions, local_workspace_dir="./"):
config_file_name = "feature_conf/auto_generated_derived_features.conf"
tm = Template(
"""
anchors: {}
derivations: {
{% for derived_feature in derived_features %}
{{derived_feature.to_feature_config()}}
{% endfor %}
}
"""
)
derived_feature_configs = tm.render(
derived_features=repo_definitions.derived_features)
config_file_path = os.path.join(local_workspace_dir, config_file_name)
write_to_file(content=derived_feature_configs,
full_file_name=config_file_path)
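FeathrConfigHelper therefore owns the whole context-to-HOCON path that FeathrClient.build_features, get_offline_features, materialize_features and register_features now rely on, writing feature_conf/auto_generated_*.conf into the local workspace. The workspace-scanning path (_extract_features) imports each .py file via importlib, and the path-to-module conversion it depends on can be shown in isolation. The following is a self-contained sketch of the same commonprefix/lstrip/replace steps as _convert_to_module_path, using made-up POSIX-style paths:

```python
import os
from pathlib import Path

def to_module_path(py_file: Path, workspace: Path) -> str:
    """Turn a .py file under `workspace` into the dotted module path expected by
    importlib.import_module, following _convert_to_module_path above."""
    prefix = os.path.commonprefix([str(py_file.resolve()), str(workspace.resolve())])
    module_path = str(py_file.resolve())[len(prefix):-len(".py")]
    # e.g. "/features/nyc_taxi/agg_features.py" -> "features.nyc_taxi.agg_features"
    return module_path.lstrip("/").replace("/", ".")

# Illustrative paths only; any nested package layout under the workspace behaves the same.
print(to_module_path(Path("/workspace/features/nyc_taxi/agg_features.py"), Path("/workspace")))
# -> "features.nyc_taxi.agg_features"
```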
