Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added generic Feature store Creation for CLI #3618

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 20 additions & 89 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Optional
Expand All @@ -28,18 +25,15 @@
from pygments import formatters, highlight, lexers

from feast import utils
from feast.constants import (
DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
FEATURE_STORE_YAML_ENV_NAME,
)
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import load_repo_config
from feast.repo_operations import (
apply_total,
cli_check_repo,
create_feature_store,
generate_project_name,
init_repo,
plan,
Expand Down Expand Up @@ -172,10 +166,7 @@ def ui(
"""
Shows the Feast UI over the current directory
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
# Pass in the registry_dump method to get around a circular dependency
store.serve_ui(
host=host,
Expand All @@ -192,10 +183,7 @@ def endpoint(ctx: click.Context):
"""
Display feature server endpoints
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
endpoint = store.get_feature_server_endpoint()
if endpoint is not None:
_logger.info(
Expand All @@ -220,10 +208,7 @@ def data_source_describe(ctx: click.Context, name: str):
"""
Describe a data source
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
data_source = store.get_data_source(name)
Expand All @@ -244,10 +229,7 @@ def data_source_list(ctx: click.Context):
"""
List all data sources
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for datasource in store.list_data_sources():
table.append([datasource.name, datasource.__class__])
Expand All @@ -272,10 +254,7 @@ def entity_describe(ctx: click.Context, name: str):
"""
Describe an entity
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
entity = store.get_entity(name)
Expand All @@ -296,10 +275,7 @@ def entity_list(ctx: click.Context):
"""
List all entities
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for entity in store.list_entities():
table.append([entity.name, entity.description, entity.value_type])
Expand All @@ -324,10 +300,7 @@ def feature_service_describe(ctx: click.Context, name: str):
"""
Describe a feature service
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
feature_service = store.get_feature_service(name)
Expand All @@ -350,10 +323,7 @@ def feature_service_list(ctx: click.Context):
"""
List all feature services
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
feature_services = []
for feature_service in store.list_feature_services():
feature_names = []
Expand Down Expand Up @@ -383,10 +353,7 @@ def feature_view_describe(ctx: click.Context, name: str):
"""
Describe a feature view
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
feature_view = store.get_feature_view(name)
Expand All @@ -407,11 +374,7 @@ def feature_view_list(ctx: click.Context):
"""
List all feature views
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]

cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for feature_view in [
*store.list_feature_views(),
Expand Down Expand Up @@ -452,10 +415,7 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):
"""
[Experimental] Describe an on demand feature view
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

try:
on_demand_feature_view = store.get_on_demand_feature_view(name)
Expand All @@ -478,10 +438,7 @@ def on_demand_feature_view_list(ctx: click.Context):
"""
[Experimental] List all on demand feature views
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
table = []
for on_demand_feature_view in store.list_on_demand_feature_views():
table.append([on_demand_feature_view.name])
Expand Down Expand Up @@ -583,10 +540,8 @@ def materialize_command(

START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

store.materialize(
feature_views=None if not views else views,
start_date=utils.make_tzaware(parser.parse(start_ts)),
Expand All @@ -612,10 +567,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List

END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)
store.materialize_incremental(
feature_views=None if not views else views,
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
Expand Down Expand Up @@ -707,22 +659,7 @@ def serve_command(
no_feature_log: bool,
):
"""Start a feature server locally on a given port."""
repo = ctx.obj["CHDIR"]

# If we received a base64 encoded version of feature_store.yaml, use that
config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
if config_base64:
print("Received base64 encoded feature_store.yaml")
config_bytes = base64.b64decode(config_base64)
# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())
with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)
store = FeatureStore(repo_path=str(repo_path.resolve()))
else:
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

store.serve(host, port, type_, no_access_log, no_feature_log)

Expand All @@ -738,10 +675,7 @@ def serve_command(
@click.pass_context
def serve_transformations_command(ctx: click.Context, port: int):
"""[Experimental] Start a feature consumption server locally on a given port."""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

store.serve_transformations(port)

Expand Down Expand Up @@ -778,10 +712,7 @@ def validate(

START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)
store = create_feature_store(ctx)

feature_service = store.get_feature_service(name=feature_service)
reference = store.get_validation_reference(reference)
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import base64
import importlib
import json
import os
import random
import re
import sys
import tempfile
from importlib.abc import Loader
from importlib.machinery import ModuleSpec
from pathlib import Path
Expand All @@ -14,6 +16,7 @@

from feast import PushSource
from feast.batch_feature_view import BatchFeatureView
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.data_source import DataSource, KafkaSource, KinesisSource
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
from feast.entity import Entity
Expand Down Expand Up @@ -328,6 +331,27 @@ def log_infra_changes(
)


@log_exceptions_and_usage
def create_feature_store(
ctx: click.Context,
) -> FeatureStore:
repo = ctx.obj["CHDIR"]
# If we received a base64 encoded version of feature_store.yaml, use that
config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
if config_base64:
print("Received base64 encoded feature_store.yaml")
config_bytes = base64.b64decode(config_base64)
# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())
with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)
return FeatureStore(repo_path=str(repo_path.resolve()))
else:
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)


@log_exceptions_and_usage
def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):
os.chdir(repo_path)
Expand Down