From 01db8cce6f82d4c6e496041351fb6b56eb2645b0 Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 1 Nov 2023 01:14:06 +0700 Subject: [PATCH] feat: Add get online feature rpc to gprc server (#3815) Signed-off-by: Hai Nguyen --- protos/feast/serving/GrpcServer.proto | 11 +-- protos/feast/serving/ServingService.proto | 2 + sdk/python/feast/cli.py | 11 ++- sdk/python/feast/infra/contrib/grpc_server.py | 69 ++++++++++++++++--- sdk/python/feast/type_map.py | 17 +++-- 5 files changed, 90 insertions(+), 20 deletions(-) diff --git a/protos/feast/serving/GrpcServer.proto b/protos/feast/serving/GrpcServer.proto index cd0274c5c7..34edb4ebe9 100644 --- a/protos/feast/serving/GrpcServer.proto +++ b/protos/feast/serving/GrpcServer.proto @@ -1,5 +1,7 @@ syntax = "proto3"; +import "feast/serving/ServingService.proto"; + message PushRequest { map features = 1; string stream_feature_view = 2; @@ -8,7 +10,7 @@ message PushRequest { } message PushResponse { - bool status = 1; + bool status = 1; } message WriteToOnlineStoreRequest { @@ -18,10 +20,11 @@ message WriteToOnlineStoreRequest { } message WriteToOnlineStoreResponse { - bool status = 1; + bool status = 1; } service GrpcFeatureServer { - rpc Push (PushRequest) returns (PushResponse) {}; - rpc WriteToOnlineStore (WriteToOnlineStoreRequest) returns (WriteToOnlineStoreResponse); + rpc Push (PushRequest) returns (PushResponse) {}; + rpc WriteToOnlineStore (WriteToOnlineStoreRequest) returns (WriteToOnlineStoreResponse); + rpc GetOnlineFeatures (feast.serving.GetOnlineFeaturesRequest) returns (feast.serving.GetOnlineFeaturesResponse); } \ No newline at end of file diff --git a/protos/feast/serving/ServingService.proto b/protos/feast/serving/ServingService.proto index 0eef3cd883..154d850099 100644 --- a/protos/feast/serving/ServingService.proto +++ b/protos/feast/serving/ServingService.proto @@ -105,6 +105,8 @@ message GetOnlineFeaturesResponse { repeated FieldStatus statuses = 2; repeated google.protobuf.Timestamp event_timestamps = 3; } + + bool status = 3; } message GetOnlineFeaturesResponseMetadata { diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 3153f02e51..4cce8855f4 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -705,15 +705,24 @@ def serve_command( show_default=False, help="The maximum number of threads that can be used to execute the gRPC calls", ) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) @click.pass_context def listen_command( ctx: click.Context, address: str, max_workers: int, + registry_ttl_sec: int, ): """Start a gRPC feature server to ingest streaming features on given address""" store = create_feature_store(ctx) - server = get_grpc_server(address, store, max_workers) + server = get_grpc_server(address, store, max_workers, registry_ttl_sec) server.start() server.wait_for_termination() diff --git a/sdk/python/feast/infra/contrib/grpc_server.py b/sdk/python/feast/infra/contrib/grpc_server.py index 2017f1095b..27ac45e77c 100644 --- a/sdk/python/feast/infra/contrib/grpc_server.py +++ b/sdk/python/feast/infra/contrib/grpc_server.py @@ -1,12 +1,14 @@ import logging +import threading from concurrent import futures +from typing import Optional import grpc import pandas as pd from grpc_health.v1 import health, health_pb2_grpc from feast.data_source import PushMode -from feast.errors import PushSourceNotFoundException +from feast.errors import FeatureServiceNotFoundException, PushSourceNotFoundException from feast.feature_store import FeatureStore from feast.protos.feast.serving.GrpcServer_pb2 import ( PushResponse, @@ -16,6 +18,12 @@ GrpcFeatureServerServicer, add_GrpcFeatureServerServicer_to_server, ) +from feast.protos.feast.serving.ServingService_pb2 import ( + GetOnlineFeaturesRequest, + GetOnlineFeaturesResponse, +) + +logger = logging.getLogger(__name__) def parse(features): @@ -28,10 +36,16 @@ def parse(features): class GrpcFeatureServer(GrpcFeatureServerServicer): fs: FeatureStore - def __init__(self, fs: FeatureStore): + _shuting_down: bool = False + _active_timer: Optional[threading.Timer] = None + + def __init__(self, fs: FeatureStore, registry_ttl_sec: int = 5): self.fs = fs + self.registry_ttl_sec = registry_ttl_sec super().__init__() + self._async_refresh() + def Push(self, request, context): try: df = parse(request.features) @@ -53,19 +67,19 @@ def Push(self, request, context): to=to, ) except PushSourceNotFoundException as e: - logging.exception(str(e)) + logger.exception(str(e)) context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details(str(e)) return PushResponse(status=False) except Exception as e: - logging.exception(str(e)) + logger.exception(str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(str(e)) return PushResponse(status=False) return PushResponse(status=True) def WriteToOnlineStore(self, request, context): - logging.warning( + logger.warning( "write_to_online_store is deprecated. Please consider using Push instead" ) try: @@ -76,16 +90,55 @@ def WriteToOnlineStore(self, request, context): allow_registry_cache=request.allow_registry_cache, ) except Exception as e: - logging.exception(str(e)) + logger.exception(str(e)) context.set_code(grpc.StatusCode.INTERNAL) context.set_details(str(e)) return PushResponse(status=False) return WriteToOnlineStoreResponse(status=True) + def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context): + if request.HasField("feature_service"): + logger.info(f"Requesting feature service: {request.feature_service}") + try: + features = self.fs.get_feature_service( + request.feature_service, allow_cache=True + ) + except FeatureServiceNotFoundException as e: + logger.error(f"Feature service {request.feature_service} not found") + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(str(e)) + return GetOnlineFeaturesResponse() + else: + features = list(request.features.val) + + result = self.fs._get_online_features( + features, + request.entities, + request.full_feature_names, + ).proto + + return result + + def _async_refresh(self): + self.fs.refresh_registry() + if self._shuting_down: + return + self._active_timer = threading.Timer(self.registry_ttl_sec, self._async_refresh) + self._active_timer.start() -def get_grpc_server(address: str, fs: FeatureStore, max_workers: int): + +def get_grpc_server( + address: str, + fs: FeatureStore, + max_workers: int, + registry_ttl_sec: int, +): + logger.info(f"Initializing gRPC server on {address}") server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) - add_GrpcFeatureServerServicer_to_server(GrpcFeatureServer(fs), server) + add_GrpcFeatureServerServicer_to_server( + GrpcFeatureServer(fs, registry_ttl_sec=registry_ttl_sec), + server, + ) health_servicer = health.HealthServicer( experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers), diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 3f49069066..cdb65f886e 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -428,12 +428,15 @@ def _python_value_to_proto_value( for value in values ] if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: - return [ - ProtoValue(**{field_name: func(value)}) - if not pd.isnull(value) - else ProtoValue() - for value in values - ] + out = [] + for value in values: + if isinstance(value, ProtoValue): + out.append(value) + elif not pd.isnull(value): + out.append(ProtoValue(**{field_name: func(value)})) + else: + out.append(ProtoValue()) + return out raise Exception(f"Unsupported data type: ${str(type(values[0]))}") @@ -746,7 +749,7 @@ def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: "array": ValueType.UNIX_TIMESTAMP_LIST, } # TODO: Find better way of doing this. - if type(spark_type_as_str) != str or spark_type_as_str not in type_map: + if not isinstance(spark_type_as_str, str) or spark_type_as_str not in type_map: return ValueType.NULL return type_map[spark_type_as_str.lower()]