Add Health and Status endpoints to grpc spec #1387
I was thinking about this issue when working on #1447, as that would also have the same problem (i.e. not having these basic endpoints).

Thinking it through, I realised that even if the microservice sets up a GRPC or KAFKA (or any other) server, the expectation would be that these basic endpoints are still consumable through REST (i.e. Prometheus won't be consuming them through GRPC).

When trying to solve this, the first idea was to leverage the already existing functionality in the python wrapper. I implemented this, and it successfully ran the KAFKA server together with a REST server (on port 8000) which exposed the basic endpoints:

```python
def get_rest_basic_endpoints(user_model):
    app = Flask(__name__, static_url_path="")
    CORS(app)
    _set_flask_app_configs(app)

    if hasattr(user_model, "model_error_handler"):
        logger.info("Registering the custom error handler...")
        app.register_blueprint(user_model.model_error_handler)

    @app.errorhandler(SeldonMicroserviceException)
    def handle_invalid_usage(error):
        response = jsonify(error.to_dict())
        logger.error("%s", error.to_dict())
        response.status_code = error.status_code
        return response

    @app.route("/seldon.json", methods=["GET"])
    def openAPI():
        return send_from_directory("", "openapi/seldon.json")

    @app.route("/health/ping", methods=["GET"])
    def HealthPing():
        """
        Lightweight endpoint to check the liveness of the REST endpoint
        """
        return "pong"

    @app.route("/health/status", methods=["GET"])
    def HealthStatus():
        logger.debug("REST Health Status Request")
        response = seldon_core.seldon_methods.health_status(user_model)
        logger.debug("REST Health Status Response: %s", response)
        return jsonify(response)

    @app.route("/metadata", methods=["GET"])
    def Metadata():
        logger.debug("REST Metadata Request")
        response = seldon_core.seldon_methods.metadata(user_model)
        logger.debug("REST Metadata Response: %s", response)
        return jsonify(response)

    return app
```

Then this new secondary server was started by adding a new code block to the wrapper's entrypoint:

```python
if hasattr(user_object, "custom_service") and callable(
    getattr(user_object, "custom_service")
):
    server2_func = user_object.custom_service
elif args.api_type != "REST":

    def rest_prediction_server():
        options = {
            "bind": "%s:%s" % ("0.0.0.0", 8000),
            "access_logfile": "-",
            "loglevel": "info",
            "timeout": 5000,
            "reload": "true",
            "workers": args.workers,
            "max_requests": args.max_requests,
            "max_requests_jitter": args.max_requests_jitter,
        }
        app = seldon_microservice.get_rest_basic_endpoints(user_object)
        StandaloneApplication(app, user_object, options=options).run()
        logger.info("REST gunicorn microservice running on port %i", 8000)

    server2_func = rest_prediction_server
else:
    server2_func = None
```

This would then successfully start the GRPC or KAFKA server and concurrently also start the REST service. The main problem with this approach is that we would now have two processes which, although they point to the same class, are no longer able to share the state of the object.
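To illustrate the state-sharing problem concretely, here is a minimal standalone sketch (the `Model` class and `predict_in_child` function are illustrative, not Seldon code): a mutation made in the child process happens on that process's own copy of the object, so the parent never observes it.

```python
import multiprocessing as mp


class Model:
    def __init__(self):
        self.count = 0


def predict_in_child(model):
    # This runs in the child process, so it mutates the child's
    # copy of the object, not the parent's instance.
    model.count += 1


if __name__ == "__main__":
    model = Model()
    p = mp.Process(target=predict_in_child, args=(model,))
    p.start()
    p.join()
    print(model.count)  # still 0 in the parent process
```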
As a quick-fix we could decide that these basic endpoints don't depend on shared state. The other alternative is to think about how this data could be passed through in a different way, such as through multiprocessing.Array or even Redis. What are your thoughts?
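As a minimal sketch of that alternative (using `multiprocessing.Value`, a sibling of `multiprocessing.Array`, for a single counter; the names here are illustrative):

```python
import multiprocessing as mp


def bump(counter):
    # The counter lives in shared memory, so this update made in
    # the child process is visible to the parent.
    with counter.get_lock():
        counter.value += 1


if __name__ == "__main__":
    counter = mp.Value("i", 0)  # shared signed int, initialised to 0
    p = mp.Process(target=bump, args=(counter,))
    p.start()
    p.join()
    print(counter.value)  # 1
```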
As a small update, it seems the multiprocessing thread-safe object approach may not be a bad idea after all. I was able to implement metadata with a multiprocessing.Manager dictionary, which seems to work pretty well. The implementation required the changes outlined below.

First I had to modify the function that starts the servers:

```python
def start_servers(target1: Callable, target2: Callable) -> None:
    """
    Start servers

    Parameters
    ----------
    target1
       Main flask process
    target2
       Auxiliary flask process
    """
    manager = mp.Manager()
    metadata_dict = manager.dict()
    p2 = mp.Process(target=target2, args=(metadata_dict,))
    p2.daemon = True
    p2.start()
    target1(metadata_dict)
    p2.join()
```

Then I had to ensure that the server functions actually assign this thread-safe dictionary. This was done as follows. For the auxiliary REST server:

```python
elif args.api_type != "REST":

    def basic_rest_prediction_server(metadata_dict):
        options = {
            "bind": "%s:%s" % ("0.0.0.0", 8000),
            "access_logfile": "-",
            "loglevel": "info",
            "timeout": 5000,
            "reload": "true",
            "workers": args.workers,
            "max_requests": args.max_requests,
            "max_requests_jitter": args.max_requests_jitter,
        }
        user_object.metadata_dict = metadata_dict
        app = seldon_microservice.get_rest_basic_endpoints(user_object)
        StandaloneApplication(app, user_object, options=options).run()
        logger.info("REST gunicorn microservice running on port %i", port)

    server2_func = basic_rest_prediction_server
```

And for the GRPC / Kafka servers:

```python
elif args.api_type == "GRPC":

    def grpc_prediction_server(metadata_dict):
        user_object.metadata_dict = metadata_dict
        if args.tracing:
            from grpc_opentracing import open_tracing_server_interceptor

            logger.info("Adding tracer")
            interceptor = open_tracing_server_interceptor(tracer)
        else:
            interceptor = None
        server = seldon_microservice.get_grpc_server(
            user_object, annotations=annotations, trace_interceptor=interceptor
        )
        try:
            user_object.load()
        except (NotImplementedError, AttributeError):
            pass
        server.add_insecure_port(f"{host}:{port}")
        server.start()
        logger.info("GRPC microservice Running on port %i", port)
        while True:
            time.sleep(1000)

    server1_func = grpc_prediction_server

elif args.api_type == "KAFKA":

    def kafka_prediction_server(metadata_dict):
        if args.tracing:
            from grpc_opentracing import open_tracing_server_interceptor

            logger.info("Adding tracer")
            interceptor = open_tracing_server_interceptor(tracer)
        else:
            interceptor = None
        user_object.metadata_dict = metadata_dict
        kafka_worker = seldon_microservice.get_kafka_worker(
            user_object,
            log_level=args.log_level,
            tracing=interceptor,
            host=host,
            port=port,
        )
        try:
            user_object.load()
        except (NotImplementedError, AttributeError):
            pass
        kafka_worker.execute_from_commandline()
```

What this means is that the user no longer needs to implement a metadata() function; instead they only have to read and write the assigned metadata_dict, as in this example model:

```python
class StreamingModel:
    def __init__(self):
        print("INITIALIZING STREAMINGMODEL")

    def predict(self, data, names=[], meta=[]):
        print(f"Inside predict: data [{data}] names [{names}] meta [{meta}]")
        if "count" not in self.metadata_dict:
            self.metadata_dict["count"] = 0
        else:
            self.metadata_dict["count"] += 1
        return data

    # This function would not be necessary once we implement it on our side
    def metadata(self):
        return dict(self.metadata_dict)
```
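The pattern can be exercised standalone as in the sketch below (the `predict_loop` function stands in for the model's `predict`; names are illustrative): updates to the manager-backed dict made in a worker process are visible to the parent.

```python
import multiprocessing as mp


def predict_loop(metadata_dict, n):
    # Stand-in for predict(): update the shared, manager-backed
    # metadata dict from a worker process n times.
    for _ in range(n):
        metadata_dict["count"] = metadata_dict.get("count", 0) + 1


if __name__ == "__main__":
    manager = mp.Manager()
    metadata_dict = manager.dict()
    p = mp.Process(target=predict_loop, args=(metadata_dict, 5))
    p.start()
    p.join()
    print(dict(metadata_dict))  # {'count': 5}
```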
One important thing to note, however, is that this would require a similar approach in the executor, especially for the metadata endpoint (which is currently being explored through #1362): when an external system wants to query metadata information for the model, it may want to query the metadata of the deployment/pod as a whole, as opposed to querying the metadata of a single container inside the pod. Something worth thinking about when implementing this approach.
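For what that executor-side view might look like, here is a hypothetical sketch (the function and field names are made up for illustration and are not the actual executor API): it merges the per-container /metadata responses into one deployment-level response.

```python
def aggregate_metadata(deployment_name, container_metadata):
    """Merge per-container /metadata responses into a single
    deployment-level view (illustrative only).

    container_metadata maps container name -> metadata dict,
    e.g. as returned by each wrapper's /metadata endpoint.
    """
    return {
        "name": deployment_name,
        "models": dict(container_metadata),
    }


# Hypothetical per-container responses:
agg = aggregate_metadata(
    "mydeployment",
    {"classifier": {"count": 42}, "transformer": {"count": 7}},
)
print(agg["models"]["classifier"]["count"])  # 42
```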
Will be available in the V2 dataplane in the grpc spec.
These are present in the python wrapper API but we need to consolidate the grpc and REST APIs.
Also connected to #1362