Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Initial commit targetting grpc registry server #4458

Merged
merged 10 commits into from
Aug 30, 2024
56 changes: 52 additions & 4 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,69 @@
from typing import Any, List, Set
import importlib
import json
import logging
from typing import Any, List, Optional, Set

from colorama import Fore, Style
from fastapi import status as HttpStatusCode
from grpc import StatusCode as GrpcStatusCode

from feast.field import Field

logger = logging.getLogger(__name__)


class FeastError(Exception):
pass

def rpc_status_code(self) -> GrpcStatusCode:
def grpc_status_code(self) -> GrpcStatusCode:
return GrpcStatusCode.INTERNAL

def http_status_code(self) -> int:
return HttpStatusCode.HTTP_500_INTERNAL_SERVER_ERROR

def __str__(self) -> str:
if hasattr(self, "__overridden_message__"):
return str(getattr(self, "__overridden_message__"))
return super().__str__()

def __repr__(self) -> str:
if hasattr(self, "__overridden_message__"):
return f"{type(self).__name__}('{getattr(self,'__overridden_message__')}')"
return super().__repr__()

def to_error_detail(self) -> str:
"""
Returns a JSON representation of the error for serialization purposes.

Returns:
str: a string representation of a JSON document including `module`, `class` and `message` fields.
"""

m = {
"module": f"{type(self).__module__}",
"class": f"{type(self).__name__}",
"message": f"{str(self)}",
}
return json.dumps(m)

@staticmethod
def from_error_detail(detail: str) -> Optional["FeastError"]:
try:
m = json.loads(detail)
if all(f in m for f in ["module", "class", "message"]):
module_name = m["module"]
class_name = m["class"]
message = m["message"]
module = importlib.import_module(module_name)
class_reference = getattr(module, class_name)

instance = class_reference(message)
setattr(instance, "__overridden_message__", message)
return instance
except Exception as e:
logger.warning(f"Invalid error detail: {detail}: {e}")
return None


class DataSourceNotFoundException(FeastError):
def __init__(self, path):
Expand All @@ -41,7 +89,7 @@ def __init__(self, ds_name: str):
class FeastObjectNotFoundException(FeastError):
pass

def rpc_status_code(self) -> GrpcStatusCode:
def grpc_status_code(self) -> GrpcStatusCode:
return GrpcStatusCode.NOT_FOUND

def http_status_code(self) -> int:
Expand Down Expand Up @@ -443,7 +491,7 @@ class FeastPermissionError(FeastError, PermissionError):
def __init__(self, details: str):
super().__init__(f"Permission error:\n{details}")

def rpc_status_code(self) -> GrpcStatusCode:
def grpc_status_code(self) -> GrpcStatusCode:
return GrpcStatusCode.PERMISSION_DENIED

def http_status_code(self) -> int:
Expand Down
48 changes: 48 additions & 0 deletions sdk/python/feast/grpc_error_interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import grpc

from feast.errors import FeastError


def exception_wrapper(behavior, request, context):
try:
return behavior(request, context)
except grpc.RpcError as e:
context.abort(e.code(), e.details())
except FeastError as e:
context.abort(
e.grpc_status_code(),
e.to_error_detail(),
)


class ErrorInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
handler = continuation(handler_call_details)
if handler is None:
return None

if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.unary_stream:
return grpc.unary_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_unary:
return grpc.stream_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_stream:
return grpc.stream_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
return handler
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import grpc

from feast.errors import FeastError
from feast.permissions.auth_model import AuthConfig
from feast.permissions.client.auth_client_manager_factory import get_auth_token

Expand All @@ -20,26 +21,31 @@ def __init__(self, auth_type: AuthConfig):
def intercept_unary_unary(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_unary_stream(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return self._handle_call(continuation, client_call_details, request_iterator)

def _handle_call(self, continuation, client_call_details, request_iterator):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
result = continuation(client_call_details, request_iterator)
if result.exception() is not None:
mapped_error = FeastError.from_error_detail(result.exception().details())
if mapped_error is not None:
raise mapped_error
return result

def _append_auth_header_metadata(self, client_call_details):
logger.debug(
Expand Down
22 changes: 0 additions & 22 deletions sdk/python/feast/permissions/server/grpc.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,17 @@
import asyncio
import logging
from typing import Optional

import grpc

from feast.permissions.auth.auth_manager import (
get_auth_manager,
)
from feast.permissions.security_manager import get_security_manager
from feast.permissions.server.utils import (
AuthManagerType,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def grpc_interceptors(
auth_type: AuthManagerType,
) -> Optional[list[grpc.ServerInterceptor]]:
"""
A list of the authorization interceptors.

Args:
auth_type: The type of authorization manager, from the feature store configuration.

Returns:
list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`.
"""
if auth_type == AuthManagerType.NONE:
return None

return [AuthInterceptor()]


class AuthInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
sm = get_security_manager()
Expand Down
26 changes: 23 additions & 3 deletions sdk/python/feast/registry_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from concurrent import futures
from datetime import datetime, timezone
from typing import Union, cast
from typing import Optional, Union, cast

import grpc
from google.protobuf.empty_pb2 import Empty
Expand All @@ -13,6 +13,7 @@
from feast.errors import FeatureViewNotFoundException
from feast.feast_object import FeastObject
from feast.feature_view import FeatureView
from feast.grpc_error_interceptor import ErrorInterceptor
from feast.infra.infra_object import Infra
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
Expand All @@ -23,8 +24,9 @@
assert_permissions_to_update,
permitted_resources,
)
from feast.permissions.server.grpc import grpc_interceptors
from feast.permissions.server.grpc import AuthInterceptor
from feast.permissions.server.utils import (
AuthManagerType,
ServerType,
init_auth_manager,
init_security_manager,
Expand Down Expand Up @@ -645,7 +647,7 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr

server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=grpc_interceptors(auth_manager_type),
interceptors=_grpc_interceptors(auth_manager_type),
)
RegistryServer_pb2_grpc.add_RegistryServerServicer_to_server(
RegistryServer(store.registry), server
Expand All @@ -668,3 +670,21 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr
server.wait_for_termination()
else:
return server


def _grpc_interceptors(
auth_type: AuthManagerType,
) -> Optional[list[grpc.ServerInterceptor]]:
"""
A list of the interceptors for the registry server.

Args:
auth_type: The type of authorization manager, from the feature store configuration.

Returns:
list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`.
"""
if auth_type == AuthManagerType.NONE:
return [ErrorInterceptor()]

return [AuthInterceptor(), ErrorInterceptor()]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from feast import (
FeatureStore,
)
from feast.errors import (
EntityNotFoundException,
FeastPermissionError,
FeatureViewNotFoundException,
)
from feast.permissions.permission import Permission
from feast.registry_server import start_server
from feast.wait import wait_retry_backoff # noqa: E402
Expand Down Expand Up @@ -70,7 +75,9 @@ def test_registry_apis(
print(f"Running for\n:{auth_config}")
remote_feature_store = get_remote_registry_store(server_port, feature_store)
permissions = _test_list_permissions(remote_feature_store, applied_permissions)
_test_get_entity(remote_feature_store, applied_permissions)
_test_list_entities(remote_feature_store, applied_permissions)
_test_get_fv(remote_feature_store, applied_permissions)
_test_list_fvs(remote_feature_store, applied_permissions)

if _permissions_exist_in_permission_list(
Expand Down Expand Up @@ -118,6 +125,20 @@ def _test_get_historical_features(client_fs: FeatureStore):
assertpy.assert_that(training_df).is_not_none()


def _test_get_entity(client_fs: FeatureStore, permissions: list[Permission]):
if not _is_auth_enabled(client_fs) or _is_permission_enabled(
client_fs, permissions, read_entities_perm
):
entity = client_fs.get_entity("driver")
assertpy.assert_that(entity).is_not_none()
assertpy.assert_that(entity.name).is_equal_to("driver")
else:
with pytest.raises(FeastPermissionError):
client_fs.get_entity("driver")
with pytest.raises(EntityNotFoundException):
client_fs.get_entity("invalid-name")


def _test_list_entities(client_fs: FeatureStore, permissions: list[Permission]):
entities = client_fs.list_entities()

Expand Down Expand Up @@ -188,6 +209,20 @@ def _is_auth_enabled(client_fs: FeatureStore) -> bool:
return client_fs.config.auth_config.type != "no_auth"


def _test_get_fv(client_fs: FeatureStore, permissions: list[Permission]):
if not _is_auth_enabled(client_fs) or _is_permission_enabled(
client_fs, permissions, read_fv_perm
):
fv = client_fs.get_feature_view("driver_hourly_stats")
assertpy.assert_that(fv).is_not_none()
assertpy.assert_that(fv.name).is_equal_to("driver_hourly_stats")
else:
with pytest.raises(FeastPermissionError):
client_fs.get_feature_view("driver_hourly_stats")
with pytest.raises(FeatureViewNotFoundException):
client_fs.get_feature_view("invalid-name")


def _test_list_fvs(client_fs: FeatureStore, permissions: list[Permission]):
if _is_auth_enabled(client_fs) and _permissions_exist_in_permission_list(
[invalid_list_entities_perm], permissions
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/tests/unit/test_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import re

import assertpy

import feast.errors as errors


def test_error_error_detail():
e = errors.FeatureViewNotFoundException("abc")

d = e.to_error_detail()

assertpy.assert_that(d).is_not_none()
assertpy.assert_that(d).contains('"module": "feast.errors"')
assertpy.assert_that(d).contains('"class": "FeatureViewNotFoundException"')
assertpy.assert_that(re.search(r"abc", d)).is_true()

converted_e = errors.FeastError.from_error_detail(d)
assertpy.assert_that(converted_e).is_not_none()
assertpy.assert_that(str(converted_e)).is_equal_to(str(e))
assertpy.assert_that(repr(converted_e)).is_equal_to(repr(e))


def test_invalid_error_error_detail():
e = errors.FeastError.from_error_detail("invalid")
assertpy.assert_that(e).is_none()
Loading