Commit

Add detailed log for llm service
rainyfly committed Dec 14, 2023
1 parent 5cd3968 commit 73b5af2
Showing 4 changed files with 23 additions and 17 deletions.
10 changes: 8 additions & 2 deletions llm/fastdeploy_llm/config.py
@@ -25,20 +25,26 @@ def __init__(self, model_dir, decode_strategy="sampling", mp_num=None):
self.model_dir = model_dir
is_static, rank = check_model(model_dir)

+ self.log_home = os.getenv("LOG_HOME", ".")
+ fastdeploy_llm.utils.logging_util.warning_logger = Logger(
+ name="fastDeploy_llm_serving_warning",
+ log_file=os.path.join(self.log_home, "fastdeploy_llm_serving_warning.log"),
+ time_rotation=7,
+ level=logging.DEBUG)
if os.getenv("ENABLE_DEBUG_LOG", "0") == "1":
logger.info(
"Detect enviroment variable `ENABLE_DEBUG_LOG`, all the debug log information will output to fastdeploy_llm_serving.log."
)
fastdeploy_llm.utils.logging_util.logger = Logger(
log_file="fastdeploy_llm_serving.log",
log_file=os.path.join(self.log_home, "fastdeploy_llm_serving.log"),
time_rotation=7,
level=logging.DEBUG)
else:
logger.info(
"The logging level is set as INFO, if more information needed, please execute `export ENABLE_DEBUG_LOG=1` before launching service."
)
fastdeploy_llm.utils.logging_util.logger = Logger(
log_file="fastdeploy_llm_serving.log",
log_file=os.path.join(self.log_home, "fastdeploy_llm_serving.log"),
time_rotation=7,
level=logging.INFO)

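For reference, the log-path and log-level selection added here can be reproduced standalone. A minimal sketch of the same logic (this is not the FastDeploy Logger class itself; the printed values are only illustrative):

    import logging
    import os

    # Mirrors the selection above: LOG_HOME decides where the log files live,
    # ENABLE_DEBUG_LOG switches the main service log from INFO to DEBUG.
    log_home = os.getenv("LOG_HOME", ".")
    debug_enabled = os.getenv("ENABLE_DEBUG_LOG", "0") == "1"

    main_log = os.path.join(log_home, "fastdeploy_llm_serving.log")
    warning_log = os.path.join(log_home, "fastdeploy_llm_serving_warning.log")
    level = logging.DEBUG if debug_enabled else logging.INFO

    print("main log:", main_log, "level:", logging.getLevelName(level))
    print("warning log:", warning_log)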
26 changes: 12 additions & 14 deletions llm/fastdeploy_llm/serving/triton_model.py
@@ -21,7 +21,7 @@
import functools
from collections import defaultdict
from fastdeploy_llm.serving.serving_model import ServingModel
- from fastdeploy_llm.utils.logging_util import logger
+ from fastdeploy_llm.utils.logging_util import logger, warning_logger
from fastdeploy_llm.utils.logging_util import error_format, ErrorCode, ErrorType
from fastdeploy_llm.task import Task, BatchTask
import fastdeploy_llm as fdlm
@@ -33,8 +33,6 @@
pass


- tokens_all_dict = defaultdict(list)

def stream_call_back(call_back_task, token_tuple, index, is_last_token,
sender):
out = dict()
@@ -43,17 +41,17 @@ def stream_call_back(call_back_task, token_tuple, index, is_last_token,
out["token_ids"] = [token_tuple[0]]
out['send_idx'] = index
out["is_end"] = is_last_token
- tokens_all_dict[call_back_task.task_id].append(token_tuple[1])
out_tensor = pb_utils.Tensor(
"OUT", np.array(
[json.dumps(out)], dtype=np.object_))
if is_last_token:
logger.info("Model output for req_id: {} results_all: {}".format(call_back_task.task_id, ''.join(tokens_all_dict[call_back_task.task_id])))
all_token_ids = [t[0] for t in call_back_task.result.completion_tokens]
all_strs = "".join[t[1] for t in call_back_task.result.completion_tokens]
logger.info("Model output for req_id: {} results_all: {} tokens_all: {}".format(call_back_task.task_id, all_strs, all_token_ids))
sender[call_back_task.task_id].send(
pb_utils.InferenceResponse([out_tensor]),
flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
del sender[call_back_task.task_id]
- del tokens_all_dict[call_back_task.task_id]
else:
sender[call_back_task.task_id].send(
pb_utils.InferenceResponse([out_tensor]))
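The per-request token cache (tokens_all_dict) is removed; the final log line is now built from the tokens already stored on the task result. A minimal sketch of that aggregation, assuming completion_tokens is a list of (token_id, token_str) tuples (the sample data is made up):

    # Hypothetical sample data standing in for call_back_task.result.completion_tokens.
    completion_tokens = [(101, "Hello"), (102, ","), (103, " world")]

    all_token_ids = [t[0] for t in completion_tokens]    # [101, 102, 103]
    all_strs = "".join(t[1] for t in completion_tokens)  # "Hello, world"
    print("results_all: {} tokens_all: {}".format(all_strs, all_token_ids))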
@@ -81,7 +79,7 @@ def initialize(self, args):
enable decoupled transaction policy in model configuration to
serve this model""".format(args["model_name"])
error_msg = error_format.format(error_type.name, error_code.name, error_info)
- logger.error(error_msg)
+ warning_logger.error(error_msg)
raise pb_utils.TritonModelException(error_msg)

parameters = self.model_config["parameters"]
@@ -127,7 +125,7 @@ def execute(self, requests):
error_code = ErrorCode.C0000
error_info = "Cannot load json data from request, received data = {} error={}.".format(request_tensor, e)
error_msg = error_format.format(error_type.name, error_code.name, error_info)
- logger.error(error_msg)
+ warning_logger.error(error_msg)
error_res = pb_utils.InferenceResponse(
error=pb_utils.TritonError(error_msg))
res_sender = request.get_response_sender()
@@ -145,7 +143,7 @@ def execute(self, requests):
error_code = ErrorCode.C0001
error_info = "There's error while deserializing data from request, received data = {} error={}".format(data, e)
error_msg = error_format.format(error_type.name, error_code.name, error_info)
logger.error(error_msg)
warning_logger.error(error_msg)
error_res = pb_utils.InferenceResponse(
error=pb_utils.TritonError(error_msg))
res_sender = request.get_response_sender()
@@ -162,7 +160,7 @@ def execute(self, requests):
error_code = ErrorCode.C0001
error_info = "Task id conflict with {}.".format(task.task_id)
error_msg = error_format.format(error_type.name, error_code.name, error_info)
- logger.error(error_msg)
+ warning_logger.error(error_msg)
error_res = pb_utils.InferenceResponse(
error=pb_utils.TritonError(error_msg))
res_sender = request.get_response_sender()
@@ -179,7 +177,7 @@ def execute(self, requests):
error_code = ErrorCode.C0001
error_info = "There's error while checking task, task={} error={}".format(task, e)
error_msg = error_format.format(error_type.name, error_code.name, error_info)
logger.error(error_msg)
warning_logger.error(error_msg)
error_res = pb_utils.InferenceResponse(
error=pb_utils.TritonError(error_msg))
res_sender = request.get_response_sender()
@@ -194,7 +192,7 @@ def execute(self, requests):
error_code = ErrorCode.S0000
error_info = "The queue is full now(size={}), please wait for a while.".format(self.model.max_queue_num)
error_msg = error_format.format(error_type.name, error_code.name, error_info)
logger.error(error_msg)
warning_logger.error(error_msg)
error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(error_msg))
res_sender = request.get_response_sender()
res_sender.send(
@@ -227,7 +225,7 @@ def execute(self, requests):
error_code = ErrorCode.C0001
error_info = "There's error while inserting new request, task={} error={}".format(task, e)
error_msg = error_format.format(error_type.name, error_code.name, error_info)
logger.error(error_msg)
warning_logger.error(error_msg)
error_res = pb_utils.InferenceResponse(error=pb_utils.TritonError(error_msg))
res_sender = request.get_response_sender()
res_sender.send(
@@ -241,7 +239,7 @@ def finalize(self):
info_type = ErrorType.Server
info_code = ErrorCode.S0002
info_msg = error_format.format(info_type.name, info_code.name, "The triton server is going to terminate...")
- logger.info(info_msg)
+ warning_logger.info(info_msg)
self.model.stop()
os.system("""
bash -c 'pids=$(ps auxww | grep -E "triton_python_backend_stub|multiprocessing.resource_tracker|engine.py" | grep -v grep | awk '"'"'{print $2}'"'"');
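Across this file the error and shutdown messages move from logger to the new warning_logger, so they land in fastdeploy_llm_serving_warning.log as well as in the Triton error response. A minimal stdlib sketch of that two-logger split (names and handlers are illustrative, not the FastDeploy Logger implementation):

    import logging

    # Two separate loggers with their own files, mirroring the split above.
    logger = logging.getLogger("fastdeploy_llm_serving")
    logger.addHandler(logging.FileHandler("fastdeploy_llm_serving.log"))
    logger.setLevel(logging.INFO)

    warning_logger = logging.getLogger("fastDeploy_llm_serving_warning")
    warning_logger.addHandler(logging.FileHandler("fastdeploy_llm_serving_warning.log"))
    warning_logger.setLevel(logging.DEBUG)

    logger.info("routine progress goes to the main service log")
    warning_logger.error("errors and termination notices also land in the dedicated warning log")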
3 changes: 2 additions & 1 deletion llm/fastdeploy_llm/utils/launch_infer.py
@@ -46,7 +46,8 @@ def launch(device_ids, **kwargs: dict):
pd_cmd = "python3 {} {}".format(infer_script_path, ' '.join(args))
logger.info("Launch model with command: {}".format(pd_cmd))
logger.info("Model is initializing...")
- infer_logger = open('modelmatrix/log/infer.log', 'a')
+ log_home = os.getenv("LOG_HOME", ".")
+ infer_logger = open('{}/infer.log'.format(log_home), 'a')
p = subprocess.Popen(
pd_cmd,
shell=True,
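The inference subprocess log now follows LOG_HOME instead of the hard-coded modelmatrix/log path. A minimal sketch of the same redirection, using a placeholder command in place of the real infer script:

    import os
    import subprocess

    # Append the child process output to $LOG_HOME/infer.log (placeholder command).
    log_home = os.getenv("LOG_HOME", ".")
    with open(os.path.join(log_home, "infer.log"), "a") as infer_logger:
        p = subprocess.Popen("echo 'Model is initializing...'", shell=True,
                             stdout=infer_logger, stderr=infer_logger)
        p.wait()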
1 change: 1 addition & 0 deletions llm/fastdeploy_llm/utils/logging_util.py
@@ -173,3 +173,4 @@ def use_terminator(self, terminator: str) -> Generator[None, None, None]:


logger = Logger()
+ warning_logger = Logger(name="fastDeploy_llm_serving_warning")
