Skip to content

Commit

Permalink
feat: Add get online feature rpc to gprc server (feast-dev#3815)
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
  • Loading branch information
sudohainguyen committed Oct 31, 2023
1 parent 0151961 commit 01db8cc
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 20 deletions.
11 changes: 7 additions & 4 deletions protos/feast/serving/GrpcServer.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
syntax = "proto3";

import "feast/serving/ServingService.proto";

message PushRequest {
map<string, string> features = 1;
string stream_feature_view = 2;
Expand All @@ -8,7 +10,7 @@ message PushRequest {
}

message PushResponse {
bool status = 1;
bool status = 1;
}

message WriteToOnlineStoreRequest {
Expand All @@ -18,10 +20,11 @@ message WriteToOnlineStoreRequest {
}

message WriteToOnlineStoreResponse {
bool status = 1;
bool status = 1;
}

service GrpcFeatureServer {
rpc Push (PushRequest) returns (PushResponse) {};
rpc WriteToOnlineStore (WriteToOnlineStoreRequest) returns (WriteToOnlineStoreResponse);
rpc Push (PushRequest) returns (PushResponse) {};
rpc WriteToOnlineStore (WriteToOnlineStoreRequest) returns (WriteToOnlineStoreResponse);
rpc GetOnlineFeatures (feast.serving.GetOnlineFeaturesRequest) returns (feast.serving.GetOnlineFeaturesResponse);
}
2 changes: 2 additions & 0 deletions protos/feast/serving/ServingService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ message GetOnlineFeaturesResponse {
repeated FieldStatus statuses = 2;
repeated google.protobuf.Timestamp event_timestamps = 3;
}

bool status = 3;
}

message GetOnlineFeaturesResponseMetadata {
Expand Down
11 changes: 10 additions & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,15 +705,24 @@ def serve_command(
show_default=False,
help="The maximum number of threads that can be used to execute the gRPC calls",
)
@click.option(
"--registry_ttl_sec",
"-r",
help="Number of seconds after which the registry is refreshed",
type=click.INT,
default=5,
show_default=True,
)
@click.pass_context
def listen_command(
ctx: click.Context,
address: str,
max_workers: int,
registry_ttl_sec: int,
):
"""Start a gRPC feature server to ingest streaming features on given address"""
store = create_feature_store(ctx)
server = get_grpc_server(address, store, max_workers)
server = get_grpc_server(address, store, max_workers, registry_ttl_sec)
server.start()
server.wait_for_termination()

Expand Down
69 changes: 61 additions & 8 deletions sdk/python/feast/infra/contrib/grpc_server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import threading
from concurrent import futures
from typing import Optional

import grpc
import pandas as pd
from grpc_health.v1 import health, health_pb2_grpc

from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.errors import FeatureServiceNotFoundException, PushSourceNotFoundException
from feast.feature_store import FeatureStore
from feast.protos.feast.serving.GrpcServer_pb2 import (
PushResponse,
Expand All @@ -16,6 +18,12 @@
GrpcFeatureServerServicer,
add_GrpcFeatureServerServicer_to_server,
)
from feast.protos.feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequest,
GetOnlineFeaturesResponse,
)

logger = logging.getLogger(__name__)


def parse(features):
Expand All @@ -28,10 +36,16 @@ def parse(features):
class GrpcFeatureServer(GrpcFeatureServerServicer):
fs: FeatureStore

def __init__(self, fs: FeatureStore):
_shuting_down: bool = False
_active_timer: Optional[threading.Timer] = None

def __init__(self, fs: FeatureStore, registry_ttl_sec: int = 5):
self.fs = fs
self.registry_ttl_sec = registry_ttl_sec
super().__init__()

self._async_refresh()

def Push(self, request, context):
try:
df = parse(request.features)
Expand All @@ -53,19 +67,19 @@ def Push(self, request, context):
to=to,
)
except PushSourceNotFoundException as e:
logging.exception(str(e))
logger.exception(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details(str(e))
return PushResponse(status=False)
except Exception as e:
logging.exception(str(e))
logger.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return PushResponse(status=True)

def WriteToOnlineStore(self, request, context):
logging.warning(
logger.warning(
"write_to_online_store is deprecated. Please consider using Push instead"
)
try:
Expand All @@ -76,16 +90,55 @@ def WriteToOnlineStore(self, request, context):
allow_registry_cache=request.allow_registry_cache,
)
except Exception as e:
logging.exception(str(e))
logger.exception(str(e))
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return PushResponse(status=False)
return WriteToOnlineStoreResponse(status=True)

def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context):
if request.HasField("feature_service"):
logger.info(f"Requesting feature service: {request.feature_service}")
try:
features = self.fs.get_feature_service(
request.feature_service, allow_cache=True
)
except FeatureServiceNotFoundException as e:
logger.error(f"Feature service {request.feature_service} not found")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return GetOnlineFeaturesResponse()
else:
features = list(request.features.val)

result = self.fs._get_online_features(
features,
request.entities,
request.full_feature_names,
).proto

return result

def _async_refresh(self):
self.fs.refresh_registry()
if self._shuting_down:
return
self._active_timer = threading.Timer(self.registry_ttl_sec, self._async_refresh)
self._active_timer.start()

def get_grpc_server(address: str, fs: FeatureStore, max_workers: int):

def get_grpc_server(
address: str,
fs: FeatureStore,
max_workers: int,
registry_ttl_sec: int,
):
logger.info(f"Initializing gRPC server on {address}")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
add_GrpcFeatureServerServicer_to_server(GrpcFeatureServer(fs), server)
add_GrpcFeatureServerServicer_to_server(
GrpcFeatureServer(fs, registry_ttl_sec=registry_ttl_sec),
server,
)
health_servicer = health.HealthServicer(
experimental_non_blocking=True,
experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
Expand Down
17 changes: 10 additions & 7 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,15 @@ def _python_value_to_proto_value(
for value in values
]
if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:
return [
ProtoValue(**{field_name: func(value)})
if not pd.isnull(value)
else ProtoValue()
for value in values
]
out = []
for value in values:
if isinstance(value, ProtoValue):
out.append(value)
elif not pd.isnull(value):
out.append(ProtoValue(**{field_name: func(value)}))
else:
out.append(ProtoValue())
return out

raise Exception(f"Unsupported data type: ${str(type(values[0]))}")

Expand Down Expand Up @@ -746,7 +749,7 @@ def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType:
"array<timestamp>": ValueType.UNIX_TIMESTAMP_LIST,
}
# TODO: Find better way of doing this.
if type(spark_type_as_str) != str or spark_type_as_str not in type_map:
if not isinstance(spark_type_as_str, str) or spark_type_as_str not in type_map:
return ValueType.NULL
return type_map[spark_type_as_str.lower()]

Expand Down

0 comments on commit 01db8cc

Please sign in to comment.