Commit
merge code from fastdeploy
kevincheng2 committed Jan 21, 2025
1 parent e80c2d3 commit f17e071
Showing 6 changed files with 269 additions and 46 deletions.
45 changes: 44 additions & 1 deletion llm/server/server/server/data/processor.py
@@ -143,6 +143,9 @@ def process_request(self, request, max_seq_len=None):
request["eos_token_ids"] = []
request["eos_token_ids"].extend(get_eos_token_id(self.tokenizer, self.config.generation_config))

if "stop_seqs" not in request or (isinstance(request["stop_seqs"], (list, tuple)) and len(request["stop_seqs"]) == 0):
self.update_stop_seq(request)

if "input_ids" not in request or \
(isinstance(request["input_ids"], (list, tuple)) and len(request["input_ids"]) == 0):
if "text" in request:
@@ -282,7 +285,7 @@ def _load_tokenizer(self):
"""
if self.config.use_hf_tokenizer:
from transformers import AutoTokenizer
- return AutoTokenizer.from_pretrained(self.config.model_dir, use_fast=False, vocab_file=os.path.join(self.config.model_dir, "sentencepiece.bpe.model"))
+ return AutoTokenizer.from_pretrained(self.config.model_dir, use_fast=False)
else:
from paddlenlp.transformers import AutoTokenizer
return AutoTokenizer.from_pretrained(self.config.model_dir)
@@ -334,3 +337,43 @@ def get_pad_id(self):
if isinstance(self.tokenizer, (LlamaTokenizer, Llama3Tokenizer)) and not self.tokenizer.pad_token_id:
return self.tokenizer.eos_token
return self.tokenizer.pad_token_id

def pad_batch_data(self, insts, pad_id=0, return_seq_len=False, return_array=True, pad_style="right"):
"""Pad the instances to the max sequence length in batch."""
if len(insts) == 0:
padded_insts = np.array([[]], dtype=np.int64) if return_array else [[]]
if return_seq_len:
seq_len = np.array([], dtype=np.int64) if return_array else []
return padded_insts, seq_len
return padded_insts

max_len = max(map(len, insts))
if pad_style == "left":
padded_insts = [[pad_id] * (max_len - len(inst)) + list(inst) for inst in insts]
else:
padded_insts = [list(inst) + [pad_id] * (max_len - len(inst)) for inst in insts]
if return_array:
padded_insts = np.array(padded_insts, dtype=np.int64).reshape([-1, max_len])

if return_seq_len:
seq_len = [len(inst) for inst in insts]
if return_array:
seq_len = np.array(seq_len, dtype=np.int64).reshape(-1, 1)
return padded_insts, seq_len
return padded_insts

def update_stop_seq(self, request):
"""
Update stop sequences from request.
"""
stop_seqs = []
for seq in request.get("stop_sequences", []):
if seq != self.tokenizer.eos_token_id:
stop_seqs.append(self.tokenizer.convert_tokens_to_ids(self.tokenizer.tokenize(seq)))
request["stop_seqs"], request["stop_seqs_len"] = self.pad_batch_data(
stop_seqs,
pad_id=-1,
return_seq_len=True,
return_array=False
)
data_processor_logger.debug(f"processed request: {request['stop_seqs'], request['stop_seqs_len']}")
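
For reference, a minimal standalone sketch of the padding behaviour introduced above, using made-up token ids; the empty-batch branch is omitted for brevity:

```python
import numpy as np

def pad_batch_data(insts, pad_id=0, return_seq_len=False, return_array=True, pad_style="right"):
    """Standalone copy of the left/right padding logic added above (empty-batch branch omitted)."""
    max_len = max(map(len, insts))
    if pad_style == "left":
        padded_insts = [[pad_id] * (max_len - len(inst)) + list(inst) for inst in insts]
    else:
        padded_insts = [list(inst) + [pad_id] * (max_len - len(inst)) for inst in insts]
    if return_array:
        padded_insts = np.array(padded_insts, dtype=np.int64).reshape([-1, max_len])
    if return_seq_len:
        seq_len = [len(inst) for inst in insts]
        if return_array:
            seq_len = np.array(seq_len, dtype=np.int64).reshape(-1, 1)
        return padded_insts, seq_len
    return padded_insts

# Two hypothetical tokenized stop sequences of different lengths,
# padded the same way update_stop_seq pads them (pad_id=-1, plain lists).
stop_seqs = [[1012, 198], [27, 437, 29, 198, 2]]
padded, lens = pad_batch_data(stop_seqs, pad_id=-1, return_seq_len=True, return_array=False)
print(padded)  # [[1012, 198, -1, -1, -1], [27, 437, 29, 198, 2]]
print(lens)    # [2, 5]
```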
29 changes: 29 additions & 0 deletions llm/server/server/server/engine/config.py
@@ -19,6 +19,7 @@

from paddlenlp.generation import GenerationConfig
from server.utils import model_server_logger
from dataclasses import dataclass


class Config:
@@ -203,6 +204,27 @@ def get_model_config(self):
model_config_json = json.load(open(self.model_config_path, 'r', encoding='utf-8'))
return model_config_json

def get_speculate_config(self):
"""
get speculate_decoding related config
Returns:
SpeculateConfig: the speculate related config
"""
speculate_config = SpeculateConfig()
model_cfg = self.get_model_config()
if model_cfg.get("speculate_method", "None") != "None":
speculate_config.speculate_method = str(model_cfg["speculate_method"])
speculate_config.speculate_max_draft_token_num = model_cfg[
"speculate_max_draft_token_num"]
speculate_config.speculate_max_ngram_size = model_cfg[
"speculate_max_ngram_size"]

if speculate_config.speculate_method not in ["None", "inference_with_reference"]:
model_server_logger.error(f"Unsupported speculate method: {speculate_config.speculate_method}")

return speculate_config

def read_from_config(self):
"""
reset model config from json file
@@ -234,3 +256,10 @@ def get_unique_name(self, name):

def __str__(self) -> str:
return json.dumps(self.__dict__, indent=4)


@dataclass
class SpeculateConfig:
speculate_method: str = "None"
speculate_max_draft_token_num: int = 1
speculate_max_ngram_size: int = 1
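
As a rough illustration of how the new config plumbing is consumed: the dataclass defaults mean speculative decoding stays off unless the model config carries the corresponding keys. The JSON values below are hypothetical.

```python
from dataclasses import dataclass

@dataclass
class SpeculateConfig:
    speculate_method: str = "None"
    speculate_max_draft_token_num: int = 1
    speculate_max_ngram_size: int = 1

# Hypothetical model config dict, as it might come out of the model's config.json.
model_cfg = {
    "speculate_method": "inference_with_reference",
    "speculate_max_draft_token_num": 5,
    "speculate_max_ngram_size": 2,
}

# Mirrors the branch in get_speculate_config: only override defaults
# when speculate_method is set to something other than "None".
speculate_config = SpeculateConfig()
if model_cfg.get("speculate_method", "None") != "None":
    speculate_config.speculate_method = str(model_cfg["speculate_method"])
    speculate_config.speculate_max_draft_token_num = model_cfg["speculate_max_draft_token_num"]
    speculate_config.speculate_max_ngram_size = model_cfg["speculate_max_ngram_size"]

print(speculate_config.speculate_method, speculate_config.speculate_max_draft_token_num)
# inference_with_reference 5
```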
103 changes: 101 additions & 2 deletions llm/server/server/server/engine/infer.py
@@ -30,6 +30,7 @@
from paddlenlp_ops import step_paddle
from server.data.processor import DataProcessor
from server.engine.config import Config
from paddlenlp.experimental.transformers import InferenceWithReferenceProposer
from server.utils import get_logger
from task_queue_manager import TaskQueueManager

@@ -47,12 +48,19 @@ def __init__(self, args):

self.config = Config()
self.model_cfg = self.config.get_model_config()
self.speculate_config = self.config.get_speculate_config()
self.is_speculate_decoding = self.speculate_config.speculate_method != "None"
self.format_print_configuration()

self.args.num_layers = self.get_value(self.model_cfg, ["num_hidden_layers", "num_layers"])
self.args.num_attention_heads = self.get_value(self.model_cfg, ["num_attention_heads", "n_head"])
self.args.hidden_size = self.model_cfg["hidden_size"]

self.reduce_dialogue_repetition = int(os.environ.get("REDUCE_DIALOGUE_REPETITION", 0))

self.max_stop_seqs_num = int(os.getenv("MAX_STOP_SEQS_NUM", 5))
self.stop_seqs_max_len = int(os.getenv("STOP_SEQS_MAX_LEN", 8))

self.nranks = dist.get_world_size()
self.init_dist_env()
self.rank = fleet.worker_index()
@@ -63,6 +71,17 @@ def __init__(self, args):
self.cache_kvs = {}
self.init_inputs()

if self.is_speculate_decoding:
logger.info(f'Using speculate decoding, method: {self.speculate_config.speculate_method}.')
if self.speculate_config.speculate_method == "inference_with_reference":
self.proposer = InferenceWithReferenceProposer(
self.speculate_config.speculate_max_draft_token_num,
self.speculate_config.speculate_max_ngram_size,
self.args.max_batch_size,
self.args.max_seq_len)
else:
self.proposer = None

self.infer_queue = TaskQueueManager(rank=self.rank, mp_num=self.nranks, port=self.config.infer_port)

model_rank_path = os.path.join(self.args.model_dir, f"rank_{self.rank}")
@@ -247,6 +266,31 @@ def init_inputs(self):
self.share_inputs['free_list_len'] = paddle.full(
shape=[1], fill_value=self.free_list_len, dtype="int32")

self.share_inputs['stop_seqs_len'] = paddle.full(shape=[self.max_stop_seqs_num,],
fill_value=0,
dtype="int32")
self.share_inputs['stop_seqs'] = paddle.full(shape=[self.max_stop_seqs_num, self.stop_seqs_max_len],
fill_value=-1,
dtype="int64")

if self.reduce_dialogue_repetition:
self.share_inputs["first_token_ids"] = paddle.full(
shape=[self.args.max_batch_size, 1], fill_value=-1, dtype="int64")
self.share_inputs["ori_seq_lens_encoder"] = paddle.full(
shape=[self.args.max_batch_size, 1], fill_value=0, dtype="int32")
# speculate decoding input
if self.is_speculate_decoding:
self.share_inputs["accept_tokens"] = paddle.full(
shape=[self.args.max_batch_size, self.speculate_config.speculate_max_draft_token_num + 1], fill_value=0, dtype="int64"
)
self.share_inputs["accept_num"] = paddle.full(shape=[self.args.max_batch_size], fill_value=0, dtype="int32")
self.share_inputs["draft_tokens"] = paddle.full(
shape=[self.args.max_batch_size, self.speculate_config.speculate_max_draft_token_num + 1], fill_value=0, dtype="int64"
)
self.share_inputs["actual_draft_token_num"] = paddle.full(
shape=[self.args.max_batch_size], fill_value=self.speculate_config.speculate_max_draft_token_num, dtype="int32"
)

def dy_input_preprocess(self, tasks):
"""
dynamic insertion
@@ -280,6 +324,10 @@ def dy_input_preprocess(self, tasks):
self.share_inputs['max_length'][idx:idx + 1] = max_dec_len
self.share_inputs['stop_flags'][idx:idx + 1] = False

if self.reduce_dialogue_repetition:
self.share_inputs['first_token_ids'][idx:idx + 1] = self.share_inputs['input_ids'][idx:idx + 1, :1]
self.share_inputs["ori_seq_lens_encoder"][idx:idx + 1] = length

if "infer_seed" in task:
self.share_inputs['infer_seed'][idx:idx + 1] = task['infer_seed']

@@ -289,10 +337,29 @@ def dy_input_preprocess(self, tasks):
self.share_inputs["block_tables"][idx:idx + 1, :encoder_block_num] = np.array(
task['block_tables'], dtype="int32")

if "stop_seqs_len" in task:
stop_seqs_num = len(task["stop_seqs_len"])
for i in range(stop_seqs_num, self.max_stop_seqs_num):
task["stop_seqs_len"].append(0)
self.share_inputs['stop_seqs_len'][:] = np.array(
task["stop_seqs_len"], dtype="int32")
self.share_inputs['stop_seqs'][:stop_seqs_num, :len(task['stop_seqs'][0])] = np.array(
task["stop_seqs"], dtype="int64")

if self.is_speculate_decoding:
self.share_inputs["draft_tokens"][idx:idx + 1] = np.zeros([self.speculate_config.speculate_max_draft_token_num + 1])
self.share_inputs["actual_draft_token_num"][idx:idx + 1] = np.array([self.speculate_config.speculate_max_draft_token_num])

def step_cuda(self, seq_lens_this_time):
"""
step cuda
"""
# whether speculate decoding
if self.is_speculate_decoding:
speculate_step_token_num = self.speculate_config.speculate_max_draft_token_num + 1
else:
speculate_step_token_num = 0

step_paddle(self.share_inputs['stop_flags'], seq_lens_this_time,
self.share_inputs['step_seq_lens_encoder'],
self.share_inputs['seq_lens_encoder'],
@@ -305,7 +372,8 @@ def step_cuda(self, seq_lens_this_time):
self.share_inputs['free_list'], self.share_inputs['free_list_len'],
self.share_inputs['input_ids'], self.share_inputs['pre_ids'],
self.share_inputs['step_idx'], self.share_inputs['next_tokens'],
- self.args.block_size, self.args.enc_dec_block_num, self.args.first_token_id)
+ self.args.block_size, self.args.enc_dec_block_num, self.args.first_token_id,
+ speculate_step_token_num)

def initialize_engine_ready_check_flag(self):
"""
@@ -430,6 +498,13 @@ def run(self):
time.sleep(0.001)
continue

if self.proposer is not None:
self.proposer.run(
self.share_inputs,
real_batch_size=seq_lens_this_time.shape[0],
seq_lens_this_time=seq_lens_this_time,
)

self.infer_engine.predictor.run()
self.share_inputs['infer_seed'].add_(infer_seed_increment)
self.share_inputs['infer_seed'][:] %= self.MAX_INFER_SEED
Expand Down Expand Up @@ -478,6 +553,30 @@ def _init_predictor(self):

config.enable_use_gpu(100, device_id)

pir_flag = int(os.environ.get("FLAGS_enable_pir_api", 0))
if pir_flag == 1:
config.enable_new_executor()
config.enable_new_ir()

# distributed config
if self.mp_degree > 1:
trainer_endpoints = fleet.worker_endpoints()
current_endpoint = trainer_endpoints[self.rank]
dist_config = config.dist_config()
dist_config.set_ranks(self.nranks, self.rank)
dist_config.set_endpoints(trainer_endpoints, current_endpoint)
dist_config.enable_dist_model(True)
if self.config.distributed_config_path:
dist_config.set_comm_init_config(self.config.distributed_config_path)
else:
raise Exception("Please set DISTRIBUTED_CONFIG env variable.")
logger.warning(
f"Use default distributed config, please set env DISTRIBUTED_CONFIG"
)
dist_config.set_comm_init_config(
os.path.join(Dir_Path + "/config", "rank_mapping_mp{}.csv".format(self.nranks)))

config.set_dist_config(dist_config)
self.predictor = paddle.inference.create_predictor(config)
self.input_names = self.predictor.get_input_names()
self.seq_lens_handle = self.predictor.get_input_handle('seq_lens_this_time')
@@ -513,7 +612,7 @@ def parse_args():
"""
parse args from command line
"""
- parser = argparse.ArgumentParser("Deploy LLM Inference")
+ parser = argparse.ArgumentParser("FastDeploy LLM Inference")
parser.add_argument('-m',
'--model_dir',
type=str,
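
As a shape-only sketch of how the speculative-decoding buffers allocated in init_inputs relate to the config values, and of the token count step_cuda forwards to step_paddle. numpy stands in for paddle here so the sketch runs without a GPU, and max_batch_size is a made-up value.

```python
import numpy as np

# Hypothetical values standing in for self.args / self.speculate_config.
max_batch_size = 8
speculate_max_draft_token_num = 5  # as read from the model config

# Mirrors the paddle.full(...) buffers created in init_inputs:
# one row per slot in the batch, draft_token_num + 1 columns per row.
accept_tokens = np.zeros((max_batch_size, speculate_max_draft_token_num + 1), dtype=np.int64)
accept_num = np.zeros((max_batch_size,), dtype=np.int32)
draft_tokens = np.zeros((max_batch_size, speculate_max_draft_token_num + 1), dtype=np.int64)
actual_draft_token_num = np.full((max_batch_size,), speculate_max_draft_token_num, dtype=np.int32)

# step_cuda passes draft_token_num + 1 when speculative decoding is on, 0 otherwise.
is_speculate_decoding = True
speculate_step_token_num = speculate_max_draft_token_num + 1 if is_speculate_decoding else 0
print(accept_tokens.shape, draft_tokens.shape, speculate_step_token_num)  # (8, 6) (8, 6) 6
```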
