Skip to content

Commit

Permalink
fix: Added Online Store REST client errors handler (feast-dev#4488)
Browse files Browse the repository at this point in the history
* Added Online Store rest client errors handler

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>

* Added Online Store rest client errors handler
- Small refactor to from_error_detail and FeastErrors
- Fixed tests

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>

* Added Online Store rest client errors handler
- Fixed linter

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>

---------

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
  • Loading branch information
tmihalac authored Sep 5, 2024
1 parent 2bd03fa commit 2118719
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 171 deletions.
5 changes: 4 additions & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def from_error_detail(detail: str) -> Optional["FeastError"]:
module = importlib.import_module(module_name)
class_reference = getattr(module, class_name)

instance = class_reference(message)
instance = class_reference.__new__(class_reference)
setattr(instance, "__overridden_message__", message)
return instance
except Exception as e:
Expand Down Expand Up @@ -451,6 +451,9 @@ class PushSourceNotFoundException(FeastError):
def __init__(self, push_source_name: str):
super().__init__(f"Unable to find push source '{push_source_name}'.")

def http_status_code(self) -> int:
return HttpStatusCode.HTTP_422_UNPROCESSABLE_ENTITY


class ReadOnlyRegistryException(FeastError):
def __init__(self):
Expand Down
278 changes: 129 additions & 149 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import pandas as pd
import psutil
from dateutil import parser
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
from fastapi import Depends, FastAPI, Request, Response, status
from fastapi.logger import logger
from fastapi.responses import JSONResponse
from google.protobuf.json_format import MessageToDict
from prometheus_client import Gauge, start_http_server
from pydantic import BaseModel
Expand All @@ -19,7 +20,10 @@
from feast import proto_json, utils
from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL
from feast.data_source import PushMode
from feast.errors import FeatureViewNotFoundException, PushSourceNotFoundException
from feast.errors import (
FeastError,
FeatureViewNotFoundException,
)
from feast.permissions.action import WRITE, AuthzedAction
from feast.permissions.security_manager import assert_permissions
from feast.permissions.server.rest import inject_user_details
Expand Down Expand Up @@ -101,187 +105,163 @@ async def lifespan(app: FastAPI):
async def get_body(request: Request):
return await request.body()

# TODO RBAC: complete the dependencies for the other endpoints
@app.post(
"/get-online-features",
dependencies=[Depends(inject_user_details)],
)
def get_online_features(body=Depends(get_body)):
try:
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
feature_service = store.get_feature_service(
body["feature_service"], allow_cache=True
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
feature_service = store.get_feature_service(
body["feature_service"], allow_cache=True
)
assert_permissions(
resource=feature_service, actions=[AuthzedAction.READ_ONLINE]
)
features = feature_service
else:
features = body["features"]
all_feature_views, all_on_demand_feature_views = (
utils._get_feature_views_to_use(
store.registry,
store.project,
features,
allow_cache=True,
hide_dummy_entity=False,
)
)
for feature_view in all_feature_views:
assert_permissions(
resource=feature_service, actions=[AuthzedAction.READ_ONLINE]
resource=feature_view, actions=[AuthzedAction.READ_ONLINE]
)
features = feature_service
else:
features = body["features"]
all_feature_views, all_on_demand_feature_views = (
utils._get_feature_views_to_use(
store.registry,
store.project,
features,
allow_cache=True,
hide_dummy_entity=False,
)
for od_feature_view in all_on_demand_feature_views:
assert_permissions(
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
)
for feature_view in all_feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.READ_ONLINE]
)
for od_feature_view in all_on_demand_feature_views:
assert_permissions(
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
)

response_proto = store.get_online_features(
features=features,
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto

# Convert the Protobuf object to JSON and return it
return MessageToDict(
response_proto, preserving_proto_field_name=True, float_precision=18
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

response_proto = store.get_online_features(
features=features,
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto

# Convert the Protobuf object to JSON and return it
return MessageToDict(
response_proto, preserving_proto_field_name=True, float_precision=18
)

@app.post("/push", dependencies=[Depends(inject_user_details)])
def push(body=Depends(get_body)):
try:
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
actions = []
if request.to == "offline":
to = PushMode.OFFLINE
actions = [AuthzedAction.WRITE_OFFLINE]
elif request.to == "online":
to = PushMode.ONLINE
actions = [AuthzedAction.WRITE_ONLINE]
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
actions = WRITE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
)

from feast.data_source import PushSource
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
actions = []
if request.to == "offline":
to = PushMode.OFFLINE
actions = [AuthzedAction.WRITE_OFFLINE]
elif request.to == "online":
to = PushMode.ONLINE
actions = [AuthzedAction.WRITE_ONLINE]
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
actions = WRITE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
)

all_fvs = store.list_feature_views(
allow_cache=request.allow_registry_cache
) + store.list_stream_feature_views(
allow_cache=request.allow_registry_cache
from feast.data_source import PushSource

all_fvs = store.list_feature_views(
allow_cache=request.allow_registry_cache
) + store.list_stream_feature_views(allow_cache=request.allow_registry_cache)
fvs_with_push_sources = {
fv
for fv in all_fvs
if (
fv.stream_source is not None
and isinstance(fv.stream_source, PushSource)
and fv.stream_source.name == request.push_source_name
)
fvs_with_push_sources = {
fv
for fv in all_fvs
if (
fv.stream_source is not None
and isinstance(fv.stream_source, PushSource)
and fv.stream_source.name == request.push_source_name
)
}
}

for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)
for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)

store.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)
except PushSourceNotFoundException as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=422, detail=str(e))
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))
store.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)

@app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)])
def write_to_online_store(body=Depends(get_body)):
request = WriteToFeatureStoreRequest(**json.loads(body))
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
try:
request = WriteToFeatureStoreRequest(**json.loads(body))
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
try:
feature_view = store.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = store.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)

assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
feature_view = store.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
store.write_to_online_store(
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
except FeatureViewNotFoundException:
feature_view = store.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

assert_permissions(resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE])
store.write_to_online_store(
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
)

@app.get("/health")
def health():
return Response(status_code=status.HTTP_200_OK)

@app.post("/materialize", dependencies=[Depends(inject_user_details)])
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
request = MaterializeRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
)

@app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)])
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
request = MaterializeIncrementalRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)

@app.exception_handler(Exception)
async def rest_exception_handler(request: Request, exc: Exception):
# Print the original exception on the server side
logger.exception(traceback.format_exc())

if isinstance(exc, FeastError):
return JSONResponse(
status_code=exc.http_status_code(),
content=exc.to_error_detail(),
)
else:
return JSONResponse(
status_code=500,
content=str(exc),
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

return app

Expand Down
18 changes: 12 additions & 6 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import requests
from pydantic import StrictStr

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.online_store import OnlineStore
from feast.permissions.client.http_auth_requests_wrapper import (
get_http_auth_requests_session,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.rest_error_handler import rest_error_handling_decorator
from feast.type_map import python_values_to_proto_values
from feast.value_type import ValueType

Expand Down Expand Up @@ -72,9 +71,7 @@ def online_read(
req_body = self._construct_online_read_api_json_request(
entity_keys, table, requested_features
)
response = get_http_auth_requests_session(config.auth_config).post(
f"{config.online_store.path}/get-online-features", data=req_body
)
response = get_remote_online_features(config=config, req_body=req_body)
if response.status_code == 200:
logger.debug("Able to retrieve the online features from feature server.")
response_json = json.loads(response.text)
Expand Down Expand Up @@ -167,3 +164,12 @@ def teardown(
entities: Sequence[Entity],
):
pass


@rest_error_handling_decorator
def get_remote_online_features(
session: requests.Session, config: RepoConfig, req_body: str
) -> requests.Response:
return session.post(
f"{config.online_store.path}/get-online-features", data=req_body
)
Loading

0 comments on commit 2118719

Please sign in to comment.