From cd32055f117b5b867d1f116d9023e5c21f8d229c Mon Sep 17 00:00:00 2001
From: letonghan
Date: Wed, 24 Apr 2024 17:45:08 +0800
Subject: [PATCH] base classes of microservice

Signed-off-by: letonghan
---
Review notes: this is a revised version of the patch (fixes are marked by
inline comments). The `index` blob hashes below are carried over from the
original submission.

 GenAIComps/mega/async_loop.py   | 188 ++++++++++++++++++++++++++++++++
 GenAIComps/mega/base_service.py | 164 ++++++++++++++++++++++++++
 GenAIComps/mega/logger.py       |  77 ++++++++++++
 GenAIComps/mega/service.py      | 182 ++++++++++++++++++++++++++++++-
 GenAIComps/mega/utils.py        |  29 +++++
 5 files changed, 639 insertions(+), 1 deletion(-)
 create mode 100644 GenAIComps/mega/async_loop.py
 create mode 100644 GenAIComps/mega/base_service.py
 create mode 100644 GenAIComps/mega/logger.py
 create mode 100644 GenAIComps/mega/utils.py

diff --git a/GenAIComps/mega/async_loop.py b/GenAIComps/mega/async_loop.py
new file mode 100644
index 0000000000..206e185a2a
--- /dev/null
+++ b/GenAIComps/mega/async_loop.py
@@ -0,0 +1,188 @@
+import asyncio
+import signal
+import time
+from typing import Optional, Dict
+from logger import CustomLogger
+from utils import check_ports_availability
+
+
+# Define the signals that will be handled by the AsyncLoop class
+HANDLED_SIGNALS = (
+    signal.SIGINT,  # Unix signal 2. Sent by Ctrl+C.
+    signal.SIGTERM,  # Unix signal 15. Sent by `kill PID`.
+    # NOTE(review): the Python `signal` docs advise against catching synchronous
+    # faults such as SIGSEGV; kept for compatibility, consider dropping it.
+    signal.SIGSEGV,  # Unix signal 11. Caused by an invalid memory reference.
+)
+
+
+class AsyncLoop:
+    """
+    Async loop to run a microservice asynchronously.
+
+    It owns a fresh event loop, installs handlers for HANDLED_SIGNALS that request a stop, and builds the
+    server described by ``args``.
+    """
+
+    def __init__(self, args: Optional[Dict] = None) -> None:
+        """
+        Set up the logger, the event loop, the signal handlers and the server.
+
+        :param args: optional settings; the keys read are ``name``, ``protocol``, ``host``, ``port``,
+            ``quiet_error``, ``runtime_args``, ``uvicorn_kwargs`` and ``cors``. ``None`` means all defaults.
+        :raises RuntimeError: if the requested port is already in use (raised by async_setup).
+        """
+        # Fall back to an empty mapping so every `.get` below works when no args are given.
+        self.args = args if args is not None else {}
+        if self.args.get('name', None):
+            self.name = f'{self.args.get("name")}/{self.__class__.__name__}'
+        else:
+            self.name = self.__class__.__name__
+        self.protocol = self.args.get('protocol', 'http')
+        self.host = self.args.get('host', 'localhost')
+        self.port = self.args.get('port', 8080)
+        self.quiet_error = self.args.get('quiet_error', False)
+        self.logger = CustomLogger(self.name)
+        self._loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self._loop)
+        self.is_cancel = asyncio.Event()
+        self.logger.info('Setting signal handlers')
+
+        def _cancel(signum, frame):
+            """
+            Signal handler: log the signal and set is_cancel so that _wait_for_cancel can stop the server.
+            """
+            self.logger.info(f'Received signal {signum}')
+            self.is_cancel.set()
+
+        for sig in HANDLED_SIGNALS:
+            signal.signal(sig, _cancel)
+
+        self._start_time = time.time()
+        self._loop.run_until_complete(self.async_setup())
+
+    def run_forever(self):
+        """
+        Block the calling thread by running _loop_body on the event loop until it completes.
+        """
+        self._loop.run_until_complete(self._loop_body())
+
+    def teardown(self):
+        """
+        Run async_teardown(), then stop and close the event loop and log how long the loop lived.
+        """
+        self._loop.run_until_complete(self.async_teardown())
+        self._loop.stop()
+        self._loop.close()
+        self._stop_time = time.time()
+        self.logger.info(f"Async loop is tore down. Duration: {self._stop_time - self._start_time}")
+
+    def _get_server(self):
+        """
+        Build the server instance for the configured protocol.
+
+        Only HTTP is supported for now; gRPC is planned.
+
+        :return: an HTTPService configured with the protocol, host and port of this loop.
+        :raises ValueError: if the protocol is not supported.
+        """
+        if self.protocol.lower() != 'http':
+            # Fail here with a clear message instead of returning None and crashing later in async_setup.
+            raise ValueError(f'Unsupported protocol: {self.protocol!r}')
+
+        # HTTPService is defined in service.py of this package.
+        from service import HTTPService
+
+        # Copy so that the caller's dict is not mutated; tolerate a missing or None `runtime_args`.
+        runtime_args = dict(self.args.get('runtime_args') or {})
+        runtime_args['protocol'] = self.protocol
+        runtime_args['host'] = self.host
+        runtime_args['port'] = self.port
+        return HTTPService(
+            uvicorn_kwargs=self.args.get('uvicorn_kwargs', None),
+            runtime_args=runtime_args,
+            cors=self.args.get('cors', None),
+        )
+
+    async def async_setup(self):
+        """
+        Check that the port is free, then create and initialize the server.
+
+        :raises RuntimeError: if the port is not available.
+        """
+        if not check_ports_availability(self.host, self.port):
+            raise RuntimeError(f'port:{self.port}')
+
+        self.server = self._get_server()
+        await self.server.initialize_server()
+
+    async def async_run_forever(self):
+        """
+        Run the server until it exits.
+        """
+        await self.server.execute_server()
+
+    async def async_teardown(self):
+        """
+        Terminate the server.
+        """
+        await self.server.terminate_server()
+
+    async def _wait_for_cancel(self):
+        """
+        Wait until a stop is requested, then call async_teardown().
+
+        If the server has no `_should_exit` attribute this simply awaits is_cancel; otherwise it polls every
+        0.1 s until is_cancel is set or the server's `_should_exit` is true.
+        """
+        if isinstance(self.is_cancel, asyncio.Event) and not hasattr(
+            self.server, '_should_exit'
+        ):
+            await self.is_cancel.wait()
+        else:
+            while not self.is_cancel.is_set() and not getattr(
+                self.server, '_should_exit', False
+            ):
+                await asyncio.sleep(0.1)
+
+        await self.async_teardown()
+
+    async def _loop_body(self):
+        """
+        Run async_run_forever() and _wait_for_cancel() concurrently; log a warning on CancelledError.
+        """
+        try:
+            await asyncio.gather(self.async_run_forever(), self._wait_for_cancel())
+        except asyncio.CancelledError:
+            self.logger.warning('received terminate ctrl message from main process')
+
+    def __enter__(self):
+        """
+        Enter the context manager.
+
+        :return: this instance.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Log how the context ended and suppress the exception, if any.
+
+        A KeyboardInterrupt is logged at info level, any other Exception at error level (with a traceback
+        unless quiet_error is set). teardown() is NOT called here; callers must invoke it themselves to close
+        the event loop.
+
+        :return: always True, so no exception propagates out of the `with` block.
+        """
+        if exc_type == KeyboardInterrupt:
+            self.logger.info(f'{self!r} is interrupted by user')
+        elif exc_type and issubclass(exc_type, Exception):
+            # Always report the exception itself; only the hint depends on quiet_error.
+            message = f'{exc_val!r} during {self.run_forever!r}'
+            if not self.quiet_error:
+                message += '\n add "--quiet-error" to suppress the exception details'
+            self.logger.error(message, exc_info=not self.quiet_error)
+        else:
+            self.logger.info(f'{self!r} is ended')
+
+        return True
diff --git a/GenAIComps/mega/base_service.py b/GenAIComps/mega/base_service.py
new file mode 100644
index 0000000000..932a932893
--- /dev/null
+++ b/GenAIComps/mega/base_service.py
@@ -0,0 +1,164 @@
+import abc
+from logger import CustomLogger
+from types import SimpleNamespace
+from typing import Dict, Optional, TYPE_CHECKING
+
+
+__all__ = ['BaseService']
+
+
+class BaseService:
+    """
+    BaseService creates a HTTP/gRPC server as a microservice.
+
+    NOTE(review): the class does not derive from abc.ABC, so the @abc.abstractmethod markers below are
+    informational only and are not enforced at instantiation time.
+    """
+
+    def __init__(
+        self,
+        name: Optional[str] = 'Base service',
+        runtime_args: Optional[Dict] = None,
+        **kwargs,
+    ):
+        """
+        Initialize the BaseService with a name, runtime arguments, and any additional arguments.
+
+        :param name: name of the service, also used as the logger name.
+        :param runtime_args: dict (or object with attributes) of runtime settings such as `title`,
+            `description`, `host`, `port` and `protocol`.
+        :param kwargs: extra keyword arguments (currently unused).
+        """
+        self.name = name
+        self.runtime_args = runtime_args
+        self._process_runtime_args()
+        # `title` / `description` are optional: fall back to the service name and an empty description.
+        self.title = getattr(self.runtime_args, 'title', self.name)
+        self.description = getattr(self.runtime_args, 'description', '')
+        self.logger = CustomLogger(self.name)
+        self.server = None
+
+    def _process_runtime_args(self):
+        """
+        Normalize self.runtime_args (None, a dict, or an object with attributes) into a SimpleNamespace.
+        """
+        if self.runtime_args is None:
+            # vars() cannot be applied to a dict, so handle "no arguments" explicitly.
+            _runtime_args = {}
+        elif isinstance(self.runtime_args, dict):
+            _runtime_args = self.runtime_args
+        else:
+            _runtime_args = vars(self.runtime_args)
+        self.runtime_args = SimpleNamespace(**_runtime_args)
+
+    @property
+    def primary_port(self):
+        """
+        Gets the first port of the port list argument.
+        :return: The first port to be exposed
+        """
+        return (
+            self.runtime_args.port[0]
+            if isinstance(self.runtime_args.port, list)
+            else self.runtime_args.port
+        )
+
+    @property
+    def all_ports(self):
+        """
+        Gets all the list of ports from the runtime_args as a list.
+        :return: The lists of ports to be exposed
+        """
+        return (
+            self.runtime_args.port
+            if isinstance(self.runtime_args.port, list)
+            else [self.runtime_args.port]
+        )
+
+    @property
+    def protocols(self):
+        """
+        Gets all the list of protocols from the runtime_args as a list.
+        :return: The lists of protocols to be exposed
+        """
+        return (
+            self.runtime_args.protocol
+            if isinstance(self.runtime_args.protocol, list)
+            else [self.runtime_args.protocol]
+        )
+
+    @property
+    def host_address(self):
+        """
+        Gets the host from the runtime_args
+        :return: The host where to bind the gateway
+        """
+        return self.runtime_args.host or '127.0.0.1'
+
+    @abc.abstractmethod
+    async def initialize_server(self):
+        """
+        Abstract method to setup the server. This should be implemented in the child class.
+        """
+        ...
+
+    @abc.abstractmethod
+    async def execute_server(self):
+        """
+        Abstract method to run the server indefinitely. This should be implemented in the child class.
+        """
+        ...
+
+    @abc.abstractmethod
+    async def terminate_server(self):
+        """
+        Abstract method to shutdown the server and free other allocated resources, e.g, health check service, etc.
+        This should be implemented in the child class.
+        """
+        ...
+
+    @staticmethod
+    def check_server_readiness(
+        ctrl_address: str,
+        protocol: Optional[str] = 'http',
+        **kwargs,
+    ) -> bool:
+        """
+        Check if server status is ready.
+        :param ctrl_address: the address where the control request needs to be sent.
+        :param protocol: protocol of the service.
+        :param kwargs: extra keyword arguments, forwarded to the protocol-specific check.
+        :return: True if status is ready else False.
+        """
+        # Imported lazily: service.py itself imports this module.
+        from service import HTTPService
+
+        res = False
+        if protocol is None or protocol == 'http':
+            res = HTTPService.check_server_readiness(ctrl_address, **kwargs)
+        return res
+
+    @staticmethod
+    async def async_check_server_readiness(
+        ctrl_address: str,
+        protocol: Optional[str] = 'grpc',
+        **kwargs,
+    ) -> bool:
+        """
+        Asynchronously check if server status is ready.
+
+        NOTE(review): the default protocol is 'grpc', for which no check exists yet, so callers must pass
+        protocol='http' (or None) to get a real answer.
+
+        :param ctrl_address: the address where the control request needs to be sent.
+        :param protocol: protocol of the service.
+        :param kwargs: extra keyword arguments, forwarded to the protocol-specific check.
+        :return: True if status is ready else False.
+        """
+        # A real (lazy) import is required: under `if TYPE_CHECKING:` the name is undefined at runtime.
+        from service import HTTPService
+
+        res = False
+        if protocol is None or protocol == 'http':
+            res = await HTTPService.async_check_server_readiness(ctrl_address, **kwargs)
+        return res
diff --git a/GenAIComps/mega/logger.py b/GenAIComps/mega/logger.py
new file mode 100644
index 0000000000..3502c69a61
--- /dev/null
+++ b/GenAIComps/mega/logger.py
@@ -0,0 +1,77 @@
+import logging
+import functools
+from typing import Callable
+
+
+class CustomLogger:
+    """
+    A custom logger class that adds additional logging levels.
+    """
+
+    def __init__(self, name: str = None):
+        """
+        Initialize the logger with a name and custom levels.
+
+        :param name: name of the underlying `logging` logger; defaults to 'GenAIComps'.
+        """
+        name = 'GenAIComps' if not name else name
+        self.logger = logging.getLogger(name)
+
+        # Define custom log levels
+        log_config = {
+            'DEBUG': 10,
+            'INFO': 20,
+            'TRAIN': 21,
+            'EVAL': 22,
+            'WARNING': 30,
+            'ERROR': 40,
+            'CRITICAL': 50,
+            'EXCEPTION': 100,
+        }
+
+        # Add custom levels to logger
+        for key, level in log_config.items():
+            logging.addLevelName(level, key)
+            if key == 'EXCEPTION':
+                self.__dict__[key.lower()] = self.logger.exception
+            else:
+                self.__dict__[key.lower()] = functools.partial(self.log_message, level)
+
+        # Set up log format and handler
+        self.format = logging.Formatter(fmt='[%(asctime)-15s] [%(levelname)8s] - %(message)s')
+        if self.logger.handlers:
+            # logging.getLogger returns the same object per name; reuse its handler so that creating a
+            # second CustomLogger with the same name does not duplicate every log line.
+            self.handler = self.logger.handlers[0]
+        else:
+            self.handler = logging.StreamHandler()
+            self.handler.setFormatter(self.format)
+            self.logger.addHandler(self.handler)
+
+        # Set log level and keep records away from the root logger's handlers
+        self.logger.setLevel(logging.INFO)
+        self.logger.propagate = False
+
+    def log_message(self, log_level: int, msg: str, *args, **kwargs):
+        """
+        Log a message at a given level.
+
+        :param log_level: The numeric level at which to log the message.
+        :param msg: The message to log.
+        :param args: %-style arguments merged into msg by `logging`.
+        :param kwargs: keyword arguments accepted by `logging.Logger.log`, e.g. `exc_info`.
+        """
+        self.logger.log(log_level, msg, *args, **kwargs)
+
+    # Define type hints for pylint check
+    debug: Callable[..., None]
+    info: Callable[..., None]
+    train: Callable[..., None]
+    eval: Callable[..., None]
+    warning: Callable[..., None]
+    error: Callable[..., None]
+    critical: Callable[..., None]
+    exception: Callable[..., None]
+
+
+logger = CustomLogger()
diff --git a/GenAIComps/mega/service.py b/GenAIComps/mega/service.py
index 2b5791a2b7..b6f2cafa76 100644
--- a/GenAIComps/mega/service.py
+++ b/GenAIComps/mega/service.py
@@ -1 +1,181 @@
-# microservice base class
+import asyncio
+from functools import partial
+from typing import Optional
+from base_service import BaseService
+from fastapi import FastAPI
+from uvicorn import Config, Server
+
+
+class HTTPService(BaseService):
+    """
+    FastAPI HTTP service based on BaseService class.
+    """
+
+    def __init__(
+        self,
+        uvicorn_kwargs: Optional[dict] = None,
+        cors: Optional[bool] = False,
+        **kwargs,
+    ):
+        """
+        Initialize the HTTPService
+        :param uvicorn_kwargs: Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server
+        :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
+        :param kwargs: keyword args
+        """
+        super().__init__(**kwargs)
+        self.uvicorn_kwargs = uvicorn_kwargs or {}
+        self.cors = cors
+        self._app = self._create_app()
+
+    @property
+    def app(self):
+        """
+        Get the default base API app for Server
+        :return: Return a FastAPI app for the default HTTPGateway
+        """
+        return self._app
+
+    def _create_app(self):
+        """
+        Create a FastAPI application.
+        :return: a FastAPI application.
+        """
+        app = FastAPI(title=self.title, description=self.description)
+
+        if self.cors:
+            from fastapi.middleware.cors import CORSMiddleware
+
+            app.add_middleware(
+                CORSMiddleware,
+                allow_origins=['*'],
+                allow_credentials=True,
+                allow_methods=['*'],
+                allow_headers=['*'],
+            )
+            self.logger.info('CORS is enabled.')
+
+        @app.get(
+            path='/v1/health_check',
+            summary='Get the status of GenAI microservice',
+            tags=['Debug'],
+        )
+        async def _health_check():
+            """
+            Get the health status of this GenAI microservice.
+            """
+            return {
+                'Service Title': self.title,
+                'Service Description': self.description
+            }
+
+        return app
+
+    async def initialize_server(self):
+        """
+        Create the uvicorn server for self.app and start it up (without entering its main loop).
+        """
+        self.logger.info('Setting up HTTP server')
+
+        class UviServer(Server):
+            """
+            The uvicorn server.
+            """
+
+            async def setup_server(self, sockets=None):
+                """
+                Setup uvicorn server.
+                :param sockets: sockets of server.
+                """
+                config = self.config
+                if not config.loaded:
+                    config.load()
+                self.lifespan = config.lifespan_class(config)
+                await self.startup(sockets=sockets)
+                if self.should_exit:
+                    return
+
+            async def start_server(self, **kwargs):
+                """
+                Start the server.
+                :param kwargs: keyword arguments
+                """
+                await self.main_loop()
+
+        # A `log_level` given in uvicorn_kwargs overrides the default instead of raising a duplicate
+        # keyword-argument TypeError.
+        uvicorn_kwargs = {'log_level': 'info', **self.uvicorn_kwargs}
+        self.server = UviServer(
+            config=Config(
+                app=self.app,
+                host=self.host_address,
+                port=self.primary_port,
+                **uvicorn_kwargs,
+            )
+        )
+        self.logger.info(f'Uvicorn server setup on port {self.primary_port}')
+        await self.server.setup_server()
+        self.logger.info('HTTP server setup successful')
+
+    async def execute_server(self):
+        """
+        Run the HTTP server indefinitely.
+        """
+        await self.server.start_server()
+
+    async def terminate_server(self):
+        """
+        Terminate the HTTP server and free resources allocated when setting up the server.
+        """
+        self.logger.info('Initiating server termination')
+        # BaseService defines no `shutdown` coroutine, so only the uvicorn server is shut down here.
+        self.server.should_exit = True
+        await self.server.shutdown()
+        self.logger.info('Server termination completed')
+
+    @staticmethod
+    def check_server_readiness(
+        ctrl_address: str,
+        timeout: float = 1.0,
+        logger=None,
+        **kwargs
+    ) -> bool:
+        """
+        Check if server status is ready.
+        :param ctrl_address: the address where the control request needs to be sent
+        :param timeout: timeout of the health check in seconds
+        :param logger: Customized Logger to be used
+        :param kwargs: extra keyword arguments
+        :return: True if status is ready else False.
+        """
+        import urllib.request
+        from http import HTTPStatus
+
+        try:
+            # The context manager closes the connection once the status has been read.
+            with urllib.request.urlopen(url=f'http://{ctrl_address}', timeout=timeout) as conn:
+                return conn.status == HTTPStatus.OK
+        except Exception as exc:
+            if logger:
+                logger.info(f'Exception: {exc}')
+
+        return False
+
+    @staticmethod
+    async def async_check_server_readiness(
+        ctrl_address: str,
+        timeout: float = 1.0,
+        logger=None,
+        **kwargs
+    ) -> bool:
+        """
+        Asynchronously check if server status is ready.
+        :param ctrl_address: the address where the control request needs to be sent
+        :param timeout: timeout of the health check in seconds
+        :param logger: Customized Logger to be used
+        :param kwargs: extra keyword arguments
+        :return: True if status is ready else False.
+        """
+        # urlopen blocks, so run the synchronous check in the default executor instead of stalling the loop.
+        check = partial(HTTPService.check_server_readiness, ctrl_address, timeout, logger=logger)
+        return await asyncio.get_running_loop().run_in_executor(None, check)
diff --git a/GenAIComps/mega/utils.py b/GenAIComps/mega/utils.py
new file mode 100644
index 0000000000..39baa014cf
--- /dev/null
+++ b/GenAIComps/mega/utils.py
@@ -0,0 +1,29 @@
+from socket import AF_INET, SOCK_STREAM, socket
+from typing import List, Union
+
+
+def _is_port_free(host: str, port: int) -> bool:
+    """
+    Check if a given port on a host is free.
+
+    :param host: The host to check.
+    :param port: The port to check.
+    :return: True if the port is free, False otherwise.
+    """
+    with socket(AF_INET, SOCK_STREAM) as session:
+        # connect_ex returns 0 only when something accepted the connection, i.e. the port is in use.
+        return session.connect_ex((host, port)) != 0
+
+
+def check_ports_availability(host: Union[str, List[str]], port: Union[int, List[int]]) -> bool:
+    """
+    Check if one or more ports on one or more hosts are free.
+
+    :param host: The host(s) to check.
+    :param port: The port(s) to check.
+    :return: True if all ports on all hosts are free, False otherwise.
+    """
+    hosts = [host] if isinstance(host, str) else host
+    ports = [port] if isinstance(port, int) else port
+
+    return all(_is_port_free(h, p) for h in hosts for p in ports)