Add use_env_var flag to client #923

Merged · 11 commits · Dec 22, 2022
11 changes: 6 additions & 5 deletions docs/samples/feature_embedding.ipynb
@@ -45,6 +45,7 @@
 "outputs": [],
 "source": [
 "import json\n",
+"import os\n",
 "\n",
 "import pandas as pd\n",
 "from pyspark.sql import DataFrame\n",
@@ -102,7 +103,7 @@
 },
 "outputs": [],
 "source": [
-"RESOURCE_PREFIX = None # TODO fill the value\n",
+"RESOURCE_PREFIX = \"\" # TODO fill the value\n",
 "PROJECT_NAME = \"hotel_reviews_embedding\"\n",
 "\n",
 "REGISTRY_ENDPOINT = f\"https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1\"\n",
@@ -114,8 +115,8 @@
 "    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f\"https://{ctx.tags().get('browserHostName').get()}\"\n",
 "else:\n",
 "    # TODO fill the values.\n",
-"    DATABRICKS_WORKSPACE_TOKEN_VALUE = None\n",
-"    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = None\n",
+"    DATABRICKS_WORKSPACE_TOKEN_VALUE = os.environ.get(\"DATABRICKS_WORKSPACE_TOKEN_VALUE\")\n",
+"    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = os.environ.get(\"SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL\")\n",
 "\n",
 "# We'll need an authentication credential to access Azure resources and register features \n",
 "USE_CLI_AUTH = False # Set True to use interactive authentication\n",
@@ -146,7 +147,6 @@
 "    credential = AzureCliCredential(additionally_allowed_tenants=['*'],)\n",
 "elif AZURE_TENANT_ID and AZURE_CLIENT_ID and AZURE_CLIENT_SECRET:\n",
 "    # Use Environment variable secret\n",
-"    import os\n",
 "    from azure.identity import EnvironmentCredential\n",
 "    os.environ[\"AZURE_TENANT_ID\"] = AZURE_TENANT_ID\n",
 "    os.environ[\"AZURE_CLIENT_ID\"] = AZURE_CLIENT_ID\n",
@@ -315,6 +315,7 @@
 "client = FeathrClient(\n",
 "    config_path=config_path,\n",
 "    credential=credential,\n",
+"    use_env_vars=False,\n",
 ")"
 ]
 },
@@ -791,7 +792,7 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.10.4"
+"version": "3.10.8 (main, Nov 24 2022, 14:13:03) [GCC 11.2.0]"
 },
 "vscode": {
 "interpreter": {
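Note: taken together, the notebook changes above read the Databricks settings from os environment variables and pass the new use_env_vars=False flag so the client resolves everything else from the generated config file only. A minimal sketch of the same pattern outside the notebook, assuming the feathr package from this PR is installed; the config path and credential choice are illustrative, not taken from the diff:

import os

from azure.identity import DefaultAzureCredential
from feathr import FeathrClient

# Outside Databricks, the notebook expects both values to be exported first.
token = os.environ.get("DATABRICKS_WORKSPACE_TOKEN_VALUE")
host = os.environ.get("SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")
assert token and host, "export both Databricks variables before running"

client = FeathrClient(
    config_path="feathr_config.yaml",  # hypothetical path; the notebook generates its own
    credential=DefaultAzureCredential(),
    use_env_vars=False,  # the new flag: do not read Feathr config values from os.environ
)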
188 changes: 99 additions & 89 deletions feathr_project/feathr/client.py

Large diffs are not rendered by default.
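Note: since this diff is collapsed, the snippet below is a hypothetical sketch only. The use_env_vars parameter and the EnvConfigReader class (see the _envvariableutil.py diff further down) are grounded in this PR; the attribute and config key names are illustrative assumptions:

from feathr.utils._envvariableutil import EnvConfigReader


class FeathrClient:
    def __init__(self, config_path: str = "./feathr_config.yaml",
                 credential=None, use_env_vars: bool = True):
        # A single reader resolves every config key with a fixed precedence;
        # use_env_vars=False makes it skip os.environ and use only the yaml
        # file and Azure Key Vault.
        self.env_config = EnvConfigReader(config_path=config_path, use_env_vars=use_env_vars)
        self.project_name = self.env_config.get("project_config__project_name")
        self.credential = credential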

4 changes: 1 addition & 3 deletions feathr_project/feathr/registry/feature_registry.py
@@ -1,10 +1,8 @@
 from abc import ABC, abstractmethod
 from pathlib import Path
-
-from typing import Any, Dict, List, Optional, Tuple
+from typing import List, Tuple
 from feathr.definition.feature_derivations import DerivedFeature
 from feathr.definition.anchor import FeatureAnchor
-from feathr.utils._envvariableutil import _EnvVaraibleUtil
 
 class FeathrRegistry(ABC):
     """This is the abstract class for all the feature registries. All the feature registries should implement those interfaces.
188 changes: 108 additions & 80 deletions feathr_project/feathr/utils/_envvariableutil.py
@@ -1,91 +1,119 @@
 import os
 from pathlib import Path
 import yaml
 
 from loguru import logger
-from feathr.secrets.akv_client import AzureKeyVaultClient
 
 from azure.core.exceptions import ResourceNotFoundError
+from feathr.secrets.akv_client import AzureKeyVaultClient
 
-class _EnvVaraibleUtil(object):
-    def __init__(self, config_path):
-        self.config_path = config_path
-        # Set to none first to avoid invalid reference
-        self.akv_name = None
-        self.akv_name = self.get_environment_variable_with_default('secrets', 'azure_key_vault', 'name')
-        self.akv_client = AzureKeyVaultClient(self.akv_name) if self.akv_name else None
-
-    def get_environment_variable_with_default(self, *args):
-        """Gets the environment variable for the variable key.
-        Args:
-            *args: list of keys in feathr_config.yaml file
-        Return:
-            A environment variable for the variable key. It will retrieve the value of the environment variables in the following order:
-            If the key is set in the environment variable, Feathr will use the value of that environment variable
-            If it's not set in the environment, then a default is retrieved from the feathr_config.yaml file with the same config key.
-            If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault
-            If not found, an empty string will be returned with a warning error message.
-        """
-
-        # if envs exist, just return the existing env variable without reading the file
-        env_keyword = "__".join(args)
-        upper_env_keyword = env_keyword.upper()
-        # make it work for lower case and upper case.
-        env_variable = os.environ.get(
-            env_keyword, os.environ.get(upper_env_keyword))
-
-        # If the key is set in the environment variable, Feathr will use the value of that environment variable
-        if env_variable:
-            return env_variable
-
-        # If it's not set in the environment, then a default is retrieved from the feathr_config.yaml file with the same config key.
-        if os.path.exists(os.path.abspath(self.config_path)):
-            with open(os.path.abspath(self.config_path), 'r') as stream:
-                try:
-                    yaml_config = yaml.safe_load(stream)
-                    # concat all layers and check in environment variable
-                    yaml_layer = yaml_config
-
-                    # resolve one layer after another
-                    for arg in args:
-                        yaml_layer = yaml_layer[arg]
-                    return yaml_layer
-                except KeyError as exc:
-                    logger.info("{} not found in the config file.", env_keyword)
-                except yaml.YAMLError as exc:
-                    logger.warning(exc)
-
-        # If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault
-        if self.akv_name:
-            try:
-                return self.akv_client.get_feathr_akv_secret(env_keyword)
-            except ResourceNotFoundError:
-                # print out warning message if cannot find the env variable in all the resources
-                logger.warning('Environment variable {} not found in environment variable, default YAML config file, or key vault service.', env_keyword)
-        return None
-
-    def get_environment_variable(self, variable_key):
-        """Gets the environment variable for the variable key.
-
-        Args:
-            variable_key: environment variable key that is used to retrieve the environment variable
-        Return:
-            A environment variable for the variable key. It will retrieve the value of the environment variables in the following order:
-            If the key is set in the environment variable, Feathr will use the value of that environment variable
-            If it's not available in the environment variable file, Feathr will try to retrieve the value from key vault
-            If not found, an empty string will be returned with a warning error message.
-        """
-        env_var_value = os.environ.get(variable_key)
-
-        if env_var_value:
-            return env_var_value
-
-        # If it's not available in the environment variable file, Feathr will try to retrieve the value from key vault
-        logger.info(variable_key + ' is not set in the environment variables.')
-
-        if self.akv_name:
-            try:
-                return self.akv_client.get_feathr_akv_secret(variable_key)
-            except ResourceNotFoundError:
-                # print out warning message if cannot find the env variable in all the resources
-                logger.warning('Environment variable {} not found in environment variable or key vault service.', variable_key)
-        return None
+class EnvConfigReader(object):
+    """A utility class to read Feathr environment variables either from os environment variables,
+    the config yaml file or Azure Key Vault.
+    If a key is set in the environment variable, ConfigReader will return the value of that
+    environment variable unless `use_env_vars` is set to False.
+    """
+    akv_name: str = None  # Azure Key Vault name to use for retrieving config values.
+    yaml_config: dict = None  # YAML config file content.
+
+    def __init__(self, config_path: str, use_env_vars: bool = True):
+        """Initialize the utility class.
+
+        Args:
+            config_path: Config file path.
+            use_env_vars (optional): Whether to use os environment variables instead of config file. Defaults to True.
+        """
+        config_path = Path(config_path)
+        if config_path.is_file():
+            try:
+                self.yaml_config = yaml.safe_load(config_path.read_text())
+            except yaml.YAMLError as e:
+                logger.warning(e)
+
+        self.use_env_vars = use_env_vars
+
+        self.akv_name = self.get("secrets__azure_key_vault__name")
+        self.akv_client = AzureKeyVaultClient(self.akv_name) if self.akv_name else None
+
+    def get(self, key: str, default: str = None) -> str:
+        """Gets the Feathr config variable for the given key.
+        It will retrieve the value in the following order:
+        - From the environment variable if `use_env_vars == True` and the key is set in the os environment variables.
+        - From the config yaml file if the key exists.
+        - From the Azure Key Vault.
+        If the key is not found in any of the above, it will return `default`.
+
+        Args:
+            key: Config variable name. For example, `SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL`
+            default (optional): Default value to return if the key is not found. Defaults to None.
+
+        Returns:
+            Feathr client's config value.
+        """
+        conf_var = (
+            (self._get_variable_from_env(key) if self.use_env_vars else None) or
+            (self._get_variable_from_file(key) if self.yaml_config else None) or
+            (self._get_variable_from_akv(key) if self.akv_name else None) or
+            default
+        )
+
+        return conf_var
+
+    def get_from_env_or_akv(self, key: str) -> str:
+        """Gets the Feathr config variable for the given key. This function ignores the `use_env_vars`
+        attribute and forces a lookup in the os environment variables or Azure Key Vault.
+        It will retrieve the value in the following order:
+        - From the environment variable if the key is set in the os environment variables.
+        - From the Azure Key Vault.
+        If the key is not found in any of the above, it will return None.
+
+        Args:
+            key: Config variable name. For example, `ADLS_ACCOUNT`
+
+        Returns:
+            Feathr client's config value.
+        """
+        conf_var = (
+            self._get_variable_from_env(key) or
+            (self._get_variable_from_akv(key) if self.akv_name else None)
+        )
+
+        return conf_var
+
+    def _get_variable_from_env(self, key: str) -> str:
+        # make it work for lower case and upper case.
+        conf_var = os.environ.get(key.lower(), os.environ.get(key.upper()))
+
+        if conf_var is None:
+            logger.info(f"Config {key} is not set in the environment variables.")
+
+        return conf_var
+
+    def _get_variable_from_akv(self, key: str) -> str:
+        try:
+            # Azure Key Vault object name is case-insensitive.
+            # https://learn.microsoft.com/en-us/azure/key-vault/general/about-keys-secrets-certificates#vault-name-and-object-name
+            return self.akv_client.get_feathr_akv_secret(key)
+        except ResourceNotFoundError:
+            logger.warning(f"Resource {self.akv_name} not found")
+
+        return None
+
+    def _get_variable_from_file(self, key: str) -> str:
+        args = key.split("__")
+        try:
+            conf_var = self.yaml_config
+            for arg in args:
+                if conf_var is None:
+                    break
+                # make it work for lower case and upper case.
+                conf_var = conf_var.get(arg.lower(), conf_var.get(arg.upper()))
+
+            if conf_var is None:
+                logger.info(f"Config {key} is not found in the config file.")
+
+            return conf_var
+        except Exception as e:
+            logger.warning(e)
+
+        return None
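Note: the get() method above resolves a key through up to three sources in a fixed order: os environment variables (skipped when use_env_vars=False), the nested yaml config, then Azure Key Vault, with `default` as the last resort. A small usage sketch, assuming a feathr_config.yaml sits next to the script; the Redis key follows the section__subsection__field naming convention used throughout this PR:

import os

from feathr.utils._envvariableutil import EnvConfigReader

os.environ["ONLINE_STORE__REDIS__HOST"] = "myhost.redis.cache.windows.net"

# 1) The env var wins: _get_variable_from_env checks both lower and upper case.
reader = EnvConfigReader(config_path="feathr_config.yaml", use_env_vars=True)
print(reader.get("online_store__redis__host"))  # -> myhost.redis.cache.windows.net

# 2) With use_env_vars=False the same call skips os.environ and walks the
#    nested yaml keys online_store -> redis -> host instead.
reader_no_env = EnvConfigReader(config_path="feathr_config.yaml", use_env_vars=False)
print(reader_no_env.get("online_store__redis__host"))

# 3) A key missing everywhere falls back to the default argument.
print(reader.get("no_such__key", default="fallback"))  # -> fallback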
41 changes: 17 additions & 24 deletions feathr_project/feathr/utils/config.py
@@ -30,24 +30,22 @@
 }
 }
 
-
 # New databricks job cluster config
 DEFAULT_DATABRICKS_CLUSTER_CONFIG = {
     "spark_version": "11.2.x-scala2.12",
-    "node_type_id": "Standard_D3_v2",
-    "num_workers": 2,
+    "node_type_id": "Standard_F4s",  # May change this if out of quota
+    "num_workers": 1,
     "spark_conf": {
         "FEATHR_FILL_IN": "FEATHR_FILL_IN",
         # Exclude conflicting packages if use feathr <= v0.8.0:
         "spark.jars.excludes": "commons-logging:commons-logging,org.slf4j:slf4j-api,com.google.protobuf:protobuf-java,javax.xml.bind:jaxb-api",
     },
 }
 
-
 # New Azure Synapse spark pool config
 DEFAULT_AZURE_SYNAPSE_SPARK_POOL_CONFIG = {
     "executor_size": "Small",
-    "executor_num": 2,
+    "executor_num": 1,
 }

@@ -59,16 +57,9 @@ def generate_config(
     databricks_cluster_id: str = None,
     redis_password: str = None,
     adls_key: str = None,
-    use_env_vars: bool = True,
     **kwargs,
 ) -> str:
     """Generate a feathr config yaml file.
-    Note, `use_env_vars` argument gives an option to either use environment variables for generating the config file
-    or not. Feathr client will use environment variables anyway if they are set.
-
-    Keyword arguments follow the same naming convention as the feathr config. E.g. to set Databricks as the target
-    cluster, use `spark_config__spark_cluster="databricks"`.
-    See https://feathr-ai.github.io/feathr/quickstart_synapse.html#step-4-update-feathr-config for more details.
 
     Note:
         This utility function assumes Azure resources are deployed using the Azure Resource Manager (ARM) template,
@@ -78,14 +69,16 @@
     Args:
         resource_prefix: Resource name prefix used when deploying Feathr resources by using ARM template.
         project_name: Feathr project name.
-        cluster_name (optional): Databricks cluster or Azure Synapse spark pool name to use an existing one.
         output_filepath (optional): Output filepath.
-        use_env_vars (optional): Whether to use environment variables if they are set.
         databricks_workspace_token_value (optional): Databricks workspace token. If provided, the value will be stored
             as the environment variable.
         databricks_cluster_id (optional): Databricks cluster id to use an existing cluster.
         redis_password (optional): Redis password. If provided, the value will be stored as the environment variable.
         adls_key (optional): ADLS key. If provided, the value will be stored as the environment variable.
+        **kwargs: Keyword arguments to update the config. Keyword arguments follow the same naming convention as
+            the feathr config. E.g. to set Databricks as the target cluster,
+            use `spark_config__spark_cluster="databricks"`.
+            See https://feathr-ai.github.io/feathr/quickstart_synapse.html#step-4-update-feathr-config for more details.
 
     Returns:
         str: Generated config file path. This will be identical to `output_filepath` if provided.
@@ -100,6 +93,16 @@

     # Set configs
     config = deepcopy(DEFAULT_FEATHR_CONFIG)
+
+    # Maybe update configs with environment variables
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__SPARK_CLUSTER")
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__DEV_URL")
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__POOL_NAME")
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR")
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORK_DIR")
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")
+    _maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__CONFIG_TEMPLATE")
+
     config["project_config"]["project_name"] = project_name
     config["feature_registry"]["api_endpoint"] = f"https://{resource_prefix}webapp.azurewebsites.net/api/v1"
     config["online_store"]["redis"]["host"] = f"{resource_prefix}redis.redis.cache.windows.net"
@@ -124,16 +127,6 @@
         cluster_id=databricks_cluster_id,
     )
 
-    # Maybe update configs with environment variables
-    if use_env_vars:
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__SPARK_CLUSTER")
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__DEV_URL")
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__POOL_NAME")
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR")
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORK_DIR")
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")
-        _maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__CONFIG_TEMPLATE")
-
     # Verify config
     _verify_config(config)

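Note: with the use_env_vars parameter removed, generate_config now applies the environment-variable overrides unconditionally before filling in the ARM-derived values. A usage sketch; the resource prefix and the kwarg value are illustrative, while the signature, the env-override behavior, and the kwargs naming convention come from the diff:

import os

from feathr.utils.config import generate_config

# Values exported in the environment now always override the defaults in
# DEFAULT_FEATHR_CONFIG, with no flag to opt out at generation time.
os.environ["SPARK_CONFIG__SPARK_CLUSTER"] = "databricks"

config_path = generate_config(
    resource_prefix="feathrdemo",  # hypothetical ARM resource name prefix
    project_name="hotel_reviews_embedding",
    # kwargs follow the feathr config naming convention:
    spark_config__databricks__work_dir="dbfs:/hotel_reviews_embedding",
)
print(config_path)  # path to the generated feathr config yaml file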
5 changes: 1 addition & 4 deletions feathr_project/test/samples/test_notebooks.py
@@ -57,22 +57,19 @@ def test__nyc_taxi_demo(config_path, tmp_path):


 @pytest.mark.databricks
-def test__feature_embedding(config_path, tmp_path):
+def test__feature_embedding(tmp_path):
     notebook_name = "feature_embedding"
     output_notebook_path = str(tmp_path.joinpath(f"{notebook_name}.ipynb"))
 
     print(f"Running {notebook_name} notebook as {output_notebook_path}")
 
-    conf = yaml.safe_load(Path(config_path).read_text())
-
     pm.execute_notebook(
         input_path=NOTEBOOK_PATHS[notebook_name],
         output_path=output_notebook_path,
         # kernel_name="python3",
         parameters=dict(
             USE_CLI_AUTH=False,
             REGISTER_FEATURES=False,
-            SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL=conf["spark_config"]["databricks"]["workspace_instance_url"],
             CLEAN_UP=True,
         ),
     )
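Note: after this change the test no longer reads the workspace URL from a feathr config file; the notebook picks it up from the environment instead (see the feature_embedding.ipynb diff above). A sketch of running the test locally; the env-var names come from the notebook, while the pytest invocation is an assumption based on the databricks marker:

import os

import pytest

# The test relies on the same env vars the notebook reads when it is not
# running inside Databricks.
assert os.environ.get("DATABRICKS_WORKSPACE_TOKEN_VALUE")
assert os.environ.get("SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")

pytest.main(["-m", "databricks", "feathr_project/test/samples/test_notebooks.py"])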