diff --git a/llm/fastdeploy_llm/config.py b/llm/fastdeploy_llm/config.py
index d78c98952f..47b6f05a61 100644
--- a/llm/fastdeploy_llm/config.py
+++ b/llm/fastdeploy_llm/config.py
@@ -25,12 +25,18 @@ def __init__(self, model_dir, decode_strategy="sampling", mp_num=None):
         self.model_dir = model_dir
         is_static, rank = check_model(model_dir)
 
+        self.log_home = os.getenv("LOG_HOME", ".")
+        fastdeploy_llm.utils.logging_util.warning_logger = Logger(
+            name="fastDeploy_llm_serving_warning",
+            log_file=os.path.join(self.log_home, "fastdeploy_llm_serving_warning.log"),
+            time_rotation=7,
+            level=logging.DEBUG)
         if os.getenv("ENABLE_DEBUG_LOG", "0") == "1":
             logger.info(
                 "Detect enviroment variable `ENABLE_DEBUG_LOG`, all the debug log information will output to fastdeploy_llm_serving.log."
             )
             fastdeploy_llm.utils.logging_util.logger = Logger(
-                log_file="fastdeploy_llm_serving.log",
+                log_file=os.path.join(self.log_home, "fastdeploy_llm_serving.log"),
                 time_rotation=7,
                 level=logging.DEBUG)
         else:
@@ -38,7 +44,7 @@ def __init__(self, model_dir, decode_strategy="sampling", mp_num=None):
                 "The logging level is set as INFO, if more information needed, please execute `export ENABLE_DEBUG_LOG=1` before launching service."
             )
             fastdeploy_llm.utils.logging_util.logger = Logger(
-                log_file="fastdeploy_llm_serving.log",
+                log_file=os.path.join(self.log_home, "fastdeploy_llm_serving.log"),
                 time_rotation=7,
                 level=logging.INFO)
 
diff --git a/llm/fastdeploy_llm/serving/triton_model.py b/llm/fastdeploy_llm/serving/triton_model.py
index d488f483eb..f76d061192 100644
--- a/llm/fastdeploy_llm/serving/triton_model.py
+++ b/llm/fastdeploy_llm/serving/triton_model.py
@@ -21,7 +21,7 @@
 import functools
 from collections import defaultdict
 from fastdeploy_llm.serving.serving_model import ServingModel
-from fastdeploy_llm.utils.logging_util import logger
+from fastdeploy_llm.utils.logging_util import logger, warning_logger
 from fastdeploy_llm.utils.logging_util import error_format, ErrorCode, ErrorType
 from fastdeploy_llm.task import Task, BatchTask
 import fastdeploy_llm as fdlm
@@ -33,8 +33,6 @@
     pass
 
 
-tokens_all_dict = defaultdict(list)
-
 def stream_call_back(call_back_task, token_tuple, index, is_last_token,
                      sender):
     out = dict()
@@ -43,17 +41,17 @@ def stream_call_back(call_back_task, token_tuple, index, is_last_token,
     out["token_ids"] = [token_tuple[0]]
     out['send_idx'] = index
     out["is_end"] = is_last_token
-    tokens_all_dict[call_back_task.task_id].append(token_tuple[1])
     out_tensor = pb_utils.Tensor(
         "OUT", np.array(
             [json.dumps(out)], dtype=np.object_))
     if is_last_token:
-        logger.info("Model output for req_id: {} results_all: {}".format(call_back_task.task_id, ''.join(tokens_all_dict[call_back_task.task_id])))
+        all_token_ids = [t[0] for t in call_back_task.result.completion_tokens]
+        all_strs = "".join([t[1] for t in call_back_task.result.completion_tokens])
+        logger.info("Model output for req_id: {} results_all: {} tokens_all: {}".format(call_back_task.task_id, all_strs, all_token_ids))
         sender[call_back_task.task_id].send(
             pb_utils.InferenceResponse([out_tensor]),
             flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
         del sender[call_back_task.task_id]
-        del tokens_all_dict[call_back_task.task_id]
     else:
         sender[call_back_task.task_id].send(
             pb_utils.InferenceResponse([out_tensor]))
@@ -81,7 +79,7 @@ def initialize(self, args):
                 enable decoupled transaction policy in model configuration to
                 serve this model""".format(args["model_name"])
             error_msg = error_format.format(error_type.name, error_code.name, error_info)
-            logger.error(error_msg)
+            warning_logger.error(error_msg)
             raise pb_utils.TritonModelException(error_msg)
 
         parameters = self.model_config["parameters"]
@@ -127,7 +125,7 @@ def execute(self, requests):
                 error_code = ErrorCode.C0000
                 error_info = "Cannot load json data from request, received data = {} error={}.".format(request_tensor, e)
                 error_msg = error_format.format(error_type.name, error_code.name, error_info)
-                logger.error(error_msg)
+                warning_logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
                     error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
@@ -145,7 +143,7 @@ def execute(self, requests):
                 error_code = ErrorCode.C0001
                 error_info = "There's error while deserializing data from request, received data = {} error={}".format(data, e)
                 error_msg = error_format.format(error_type.name, error_code.name, error_info)
-                logger.error(error_msg)
+                warning_logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
                     error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
@@ -162,7 +160,7 @@ def execute(self, requests):
                 error_code = ErrorCode.C0001
                 error_info = "Task id conflict with {}.".format(task.task_id)
                 error_msg = error_format.format(error_type.name, error_code.name, error_info)
-                logger.error(error_msg)
+                warning_logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
                     error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
@@ -179,7 +177,7 @@ def execute(self, requests):
                 error_code = ErrorCode.C0001
                 error_info = "There's error while checking task, task={} error={}".format(task, e)
                 error_msg = error_format.format(error_type.name, error_code.name, error_info)
-                logger.error(error_msg)
+                warning_logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(
                     error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
@@ -194,7 +192,7 @@ def execute(self, requests):
                 error_code = ErrorCode.S0000
                 error_info = "The queue is full now(size={}), please wait for a while.".format(self.model.max_queue_num)
                 error_msg = error_format.format(error_type.name, error_code.name, error_info)
-                logger.error(error_msg)
+                warning_logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
@@ -227,7 +225,7 @@ def execute(self, requests):
                 error_code = ErrorCode.C0001
                 error_info = "There's error while inserting new request, task={} error={}".format(task, e)
                 error_msg = error_format.format(error_type.name, error_code.name, error_info)
-                logger.error(error_msg)
+                warning_logger.error(error_msg)
                 error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(error_msg))
                 res_sender = request.get_response_sender()
                 res_sender.send(
@@ -241,7 +239,7 @@ def finalize(self):
         info_type = ErrorType.Server
         info_code = ErrorCode.S0002
         info_msg = error_format.format(info_type.name, info_code.name, "The triton server is going to terminating...")
-        logger.info(info_msg)
+        warning_logger.info(info_msg)
         self.model.stop()
         os.system("""
                     bash -c 'pids=$(ps auxww | grep -E "triton_python_backend_stub|multiprocessing.resource_tracker|engine.py" | grep -v grep | awk '"'"'{print $2}'"'"');
diff --git a/llm/fastdeploy_llm/utils/launch_infer.py b/llm/fastdeploy_llm/utils/launch_infer.py
index f4ff54d536..2f23aaed5b 100644
--- a/llm/fastdeploy_llm/utils/launch_infer.py
+++ b/llm/fastdeploy_llm/utils/launch_infer.py
@@ -46,7 +46,8 @@ def launch(device_ids, **kwargs: dict):
     pd_cmd = "python3 {} {}".format(infer_script_path, ' '.join(args))
     logger.info("Launch model with command: {}".format(pd_cmd))
     logger.info("Model is initializing...")
-    infer_logger = open('modelmatrix/log/infer.log', 'a')
+    log_home = os.getenv("LOG_HOME", ".")
+    infer_logger = open('{}/infer.log'.format(log_home), 'a')
     p = subprocess.Popen(
         pd_cmd,
         shell=True,
diff --git a/llm/fastdeploy_llm/utils/logging_util.py b/llm/fastdeploy_llm/utils/logging_util.py
index feb5986606..5bb569e612 100644
--- a/llm/fastdeploy_llm/utils/logging_util.py
+++ b/llm/fastdeploy_llm/utils/logging_util.py
@@ -173,3 +173,4 @@ def use_terminator(self, terminator: str) -> Generator[None, None, None]:
 
 
 logger = Logger()
+warning_logger = Logger(name="fastDeploy_llm_serving_warning")
\ No newline at end of file