Skip to content

Commit

Permalink
feat: Add materialize and materialize-incremental rest endpoints (fea…
Browse files Browse the repository at this point in the history
…st-dev#3761)

* resolve feast-dev#3760

Signed-off-by: snowron <snowronark@gmail.com>

* format feature_server.py

Signed-off-by: snowron <snowronark@gmail.com>

---------

Signed-off-by: snowron <snowronark@gmail.com>
  • Loading branch information
snowron committed Sep 12, 2023
1 parent 709c709 commit fa600fe
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import json
import traceback
import warnings
from typing import List, Optional

import gunicorn.app.base
import pandas as pd
from dateutil import parser
from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse
from pydantic import BaseModel

import feast
from feast import proto_json
from feast import proto_json, utils
from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
Expand All @@ -31,6 +33,17 @@ class PushFeaturesRequest(BaseModel):
to: str = "online"


class MaterializeRequest(BaseModel):
start_ts: str
end_ts: str
feature_views: Optional[List[str]] = None


class MaterializeIncrementalRequest(BaseModel):
end_ts: str
feature_views: Optional[List[str]] = None


def get_app(store: "feast.FeatureStore"):
proto_json.patch()

Expand Down Expand Up @@ -134,6 +147,34 @@ def write_to_online_store(body=Depends(get_body)):
def health():
return Response(status_code=status.HTTP_200_OK)

@app.post("/materialize")
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

@app.post("/materialize-incremental")
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

return app


Expand Down

0 comments on commit fa600fe

Please sign in to comment.