Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Initial commit targetting grpc registry server #4458

Merged
merged 10 commits into from
Aug 30, 2024
29 changes: 28 additions & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, List, Set
from typing import Any, List, Optional, Set

from colorama import Fore, Style

Expand Down Expand Up @@ -419,3 +419,30 @@ def __init__(self, query: str):
class ZeroColumnQueryResult(Exception):
def __init__(self, query: str):
super().__init__(f"This query returned zero columns:\n{query}")


def to_error_detail(error: Exception) -> str:
import json

m = {
"module": f"{type(error).__module__}",
"class": f"{type(error).__name__}",
"message": f"{str(error)}",
}
return json.dumps(m)


def from_error_detail(detail: str) -> Optional[Exception]:
import importlib
import json

m = json.loads(detail)
if all(f in m for f in ["module", "class", "message"]):
module_name = m["module"]
class_name = m["class"]
message = m["message"]
module = importlib.import_module(module_name)
ClassReference = getattr(module, class_name)
instance = ClassReference(message)
return instance
return None
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import grpc

from feast.errors import from_error_detail
from feast.permissions.auth_model import AuthConfig
from feast.permissions.client.auth_client_manager_factory import get_auth_token

Expand All @@ -20,26 +21,31 @@ def __init__(self, auth_type: AuthConfig):
def intercept_unary_unary(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_unary_stream(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
return self._handle_call(continuation, client_call_details, request_iterator)

def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return self._handle_call(continuation, client_call_details, request_iterator)

def _handle_call(self, continuation, client_call_details, request_iterator):
client_call_details = self._append_auth_header_metadata(client_call_details)
return continuation(client_call_details, request_iterator)
result = continuation(client_call_details, request_iterator)
if result.exception() is not None:
mapped_error = from_error_detail(result.exception().details())
if mapped_error is not None:
raise mapped_error
return result

def _append_auth_header_metadata(self, client_call_details):
logger.debug(
Expand Down
55 changes: 54 additions & 1 deletion sdk/python/feast/permissions/server/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import grpc

from feast.errors import FeastObjectNotFoundException, to_error_detail
from feast.permissions.auth.auth_manager import (
get_auth_manager,
)
Expand Down Expand Up @@ -31,7 +32,7 @@ def grpc_interceptors(
if auth_type == AuthManagerType.NONE:
return None

return [AuthInterceptor()]
return [AuthInterceptor(), ErrorInterceptor()]
dmartinol marked this conversation as resolved.
Show resolved Hide resolved


class AuthInterceptor(grpc.ServerInterceptor):
Expand All @@ -52,3 +53,55 @@ def intercept_service(self, continuation, handler_call_details):
sm.set_current_user(current_user)

return continuation(handler_call_details)


class ErrorInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
def exception_wrapper(behavior, request, context):
try:
return behavior(request, context)
except grpc.RpcError as e:
context.abort(e.code(), e.details())
except Exception as e:
context.abort(
_error_to_status_code(e),
to_error_detail(e),
)

handler = continuation(handler_call_details)
if handler is None:
return None

if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.unary_stream:
return grpc.unary_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.unary_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_unary:
return grpc.stream_unary_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_unary, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
elif handler.stream_stream:
return grpc.stream_stream_rpc_method_handler(
lambda req, ctx: exception_wrapper(handler.stream_stream, req, ctx),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
return handler


def _error_to_status_code(error: Exception) -> grpc.StatusCode:
if isinstance(error, FeastObjectNotFoundException):
return grpc.StatusCode.NOT_FOUND
if isinstance(error, FeastObjectNotFoundException):
dmartinol marked this conversation as resolved.
Show resolved Hide resolved
return grpc.StatusCode.PERMISSION_DENIED
return grpc.StatusCode.INTERNAL
Loading