Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the feast serve endpoint to be sync rather than async. #2119

Merged
merged 1 commit into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,17 @@ def init_command(project_directory, minimal: bool, template: str):
default=6566,
help="Specify a port for the server [default: 6566]",
)
@click.option(
"--no-access-log", is_flag=True, help="Disable the Uvicorn access log.",
)
@click.pass_context
def serve_command(ctx: click.Context, host: str, port: int):
def serve_command(ctx: click.Context, host: str, port: int, no_access_log: bool):
"""[Experimental] Start a the feature consumption server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))

store.serve(host, port)
store.serve(host, port, no_access_log)


@cli.command("serve_transformations")
Expand Down
17 changes: 12 additions & 5 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse

import feast
Expand All @@ -15,17 +16,21 @@ def get_app(store: "feast.FeatureStore"):

app = FastAPI()

async def get_body(request: Request):
return await request.body()

@app.post("/get-online-features")
async def get_online_features(request: Request):
def get_online_features(body=Depends(get_body)):
try:
# Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
body = await request.body()
request_proto = GetOnlineFeaturesRequest()
Parse(body, request_proto)

# Initialize parameters for FeatureStore.get_online_features(...) call
if request_proto.HasField("feature_service"):
features = store.get_feature_service(request_proto.feature_service)
features = store.get_feature_service(
request_proto.feature_service, allow_cache=True
)
else:
features = list(request_proto.features.val)

Expand Down Expand Up @@ -61,11 +66,13 @@ async def get_online_features(request: Request):
return app


def start_server(store: "feast.FeatureStore", host: str, port: int):
def start_server(
store: "feast.FeatureStore", host: str, port: int, no_access_log: bool
):
app = get_app(store)
click.echo(
"This is an "
+ click.style("experimental", fg="yellow", bold=True, underline=True)
+ " feature. It's intended for early testing and feedback, and could change without warnings in future releases."
)
uvicorn.run(app, host=host, port=port)
uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @nossrannug , thanks for your PR. But I'm confused, what's the benefit of moving to the synchronous interface if we'll continue to use uvicorn as our server and at the end of the day we'll have a single-threaded app.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you use some other server in your tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @pyalex,
We're not running a single-threaded app. FastAPI supports mixing async and sync methods(endpoints) as much as you want and it will take care of running it correctly. When a method is async it will be executed using the event loop, but when a method is synchronous it will be executed in a thread pool.

sync:
image

async:
image

21 changes: 14 additions & 7 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ def get_entity(self, name: str) -> Entity:
return self._registry.get_entity(name, self.project)

@log_exceptions_and_usage
def get_feature_service(self, name: str) -> FeatureService:
def get_feature_service(
self, name: str, allow_cache: bool = False
) -> FeatureService:
"""
Retrieves a feature service.

Expand All @@ -301,7 +303,7 @@ def get_feature_service(self, name: str) -> FeatureService:
Raises:
FeatureServiceNotFoundException: The feature service could not be found.
"""
return self._registry.get_feature_service(name, self.project)
return self._registry.get_feature_service(name, self.project, allow_cache)

@log_exceptions_and_usage
def get_feature_view(self, name: str) -> FeatureView:
Expand Down Expand Up @@ -369,14 +371,19 @@ def delete_feature_service(self, name: str):
"""
return self._registry.delete_feature_service(name, self.project)

def _get_features(self, features: Union[List[str], FeatureService],) -> List[str]:
def _get_features(
self, features: Union[List[str], FeatureService], allow_cache: bool = False,
) -> List[str]:
_features = features

if not _features:
raise ValueError("No features specified for retrieval")

_feature_refs = []
if isinstance(_features, FeatureService):
feature_service_from_registry = self.get_feature_service(_features.name)
feature_service_from_registry = self.get_feature_service(
_features.name, allow_cache
)
if feature_service_from_registry != _features:
warnings.warn(
"The FeatureService object that has been passed in as an argument is"
Expand Down Expand Up @@ -1040,7 +1047,7 @@ def get_online_features(
... )
>>> online_response_dict = online_response.to_dict()
"""
_feature_refs = self._get_features(features)
_feature_refs = self._get_features(features, allow_cache=True)
(
requested_feature_views,
requested_request_feature_views,
Expand Down Expand Up @@ -1455,12 +1462,12 @@ def _get_feature_views_to_use(
return views_to_use

@log_exceptions_and_usage
def serve(self, host: str, port: int) -> None:
def serve(self, host: str, port: int, no_access_log: bool) -> None:
"""Start the feature consumption server locally on a given port."""
if not flags_helper.enable_python_feature_server(self.config):
raise ExperimentalFeatureNotEnabled(flags.FLAG_PYTHON_FEATURE_SERVER_NAME)

feature_server.start_server(self, host, port)
feature_server.start_server(self, host, port, no_access_log)

@log_exceptions_and_usage
def get_feature_server_endpoint(self) -> Optional[str]:
Expand Down
46 changes: 25 additions & 21 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

Expand Down Expand Up @@ -101,7 +102,6 @@ class Registry:
cached_registry_proto: Optional[RegistryProto] = None
cached_registry_proto_created: Optional[datetime] = None
cached_registry_proto_ttl: timedelta
cache_being_updated: bool = False

def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
Expand All @@ -115,6 +115,8 @@ def __init__(
or where it will be created if it does not exist yet.
"""

self._refresh_lock = Lock()

if registry_config:
registry_store_type = registry_config.registry_store_type
registry_path = registry_config.path
Expand Down Expand Up @@ -325,6 +327,7 @@ def get_feature_service(
Args:
name: Name of feature service
project: Feast project that this feature service belongs to
allow_cache: Whether to allow returning this feature service from a cached registry

Returns:
Returns either the specified feature service, or raises an exception if
Expand All @@ -347,6 +350,7 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti
Args:
name: Name of entity
project: Feast project that this entity belongs to
allow_cache: Whether to allow returning this entity from a cached registry

Returns:
Returns either the specified entity, or raises an exception if
Expand Down Expand Up @@ -419,8 +423,8 @@ def list_on_demand_feature_views(
Retrieve a list of on demand feature views from the registry

Args:
allow_cache: Whether to allow returning on demand feature views from a cached registry
project: Filter on demand feature views based on project name
allow_cache: Whether to allow returning on demand feature views from a cached registry

Returns:
List of on demand feature views
Expand Down Expand Up @@ -733,31 +737,31 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto:

Returns: Returns a RegistryProto object which represents the state of the registry
"""
expired = (
self.cached_registry_proto is None
or self.cached_registry_proto_created is None
) or (
self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity
and (
datetime.now()
> (self.cached_registry_proto_created + self.cached_registry_proto_ttl)
with self._refresh_lock:
expired = (
self.cached_registry_proto is None
or self.cached_registry_proto_created is None
) or (
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
datetime.now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
)
)
)
)

if allow_cache and (not expired or self.cache_being_updated):
pyalex marked this conversation as resolved.
Show resolved Hide resolved
assert isinstance(self.cached_registry_proto, RegistryProto)
return self.cached_registry_proto
if allow_cache and not expired:
assert isinstance(self.cached_registry_proto, RegistryProto)
return self.cached_registry_proto

try:
self.cache_being_updated = True
registry_proto = self._registry_store.get_registry_proto()
self.cached_registry_proto = registry_proto
self.cached_registry_proto_created = datetime.now()
except Exception as e:
raise e
finally:
self.cache_being_updated = False
return registry_proto

return registry_proto

def _check_conflicting_feature_view_names(self, feature_view: BaseFeatureView):
name_to_fv_protos = self._existing_feature_view_names_to_fvs()
Expand Down