Dev/llama3 (vllm-project#7)
* llama support

* flash_attention

* sharded

* expand

* fix: remove redundant info

* change main

* llama and opt model supported

---------

Co-authored-by: Shao Siyang FYP PDCL <shaosy@scsehg.cm.cluster>
Co-authored-by: lairuiqi <lrq619@outlook.com>
Co-authored-by: LaiRuiqi <58351056+lrq619@users.noreply.github.com>
4 people committed Oct 8, 2024
1 parent ec6642c commit 40f0d62
Showing 11 changed files with 674 additions and 105 deletions.
8 changes: 4 additions & 4 deletions main.py
@@ -16,11 +16,12 @@ def main():
enforce_eager=True,
# load_format="auto",
# tensor_parallel_size=2,
# liquid_gpu_range = [0,1,2,3],
liquid_gpu_range = [0,1,2,3],
liquid_gpu_space = 32,
liquid_driver_gpu_id = 0,
liquid_total_num_shards = 4,
# gpu_memory_utilization=0.8,

)
sampling_params = SamplingParams(temperature=0, min_tokens=128, max_tokens=128)
request_num = 1
@@ -37,8 +38,7 @@ def main():
llm.do_liquid(liquid_request)
liquid_request = LiquidRequest(LiquidType.LIQUID_2_1)
llm.do_liquid(liquid_request)
# liquid_request = LiquidRequest(LiquidType.LIQUID_1_2)
# llm.do_liquid(liquid_request)



output = llm.generate(inputs, sampling_params=sampling_params)
@@ -53,4 +53,4 @@ def main():
main()
# torch.cuda.memory._dump_snapshot(f"./torch_mem_dump.pickle")
# torch.cuda.memory._record_memory_history(enabled=None)
# print(f"dumped finished!")
# print(f"dumped finished!")
58 changes: 58 additions & 0 deletions vanilla.py
@@ -0,0 +1,58 @@

from vllm import LLM, SamplingParams
from vllm.liquid.request import LiquidRequest, LiquidType
# from vllm import EngineArgs, LLMEngine
import asyncio
import torch

import os

model = "meta-llama/Meta-Llama-3-8B"
# model = "facebook/opt-6.7b"
# model_path = os.path.join("./models", model)

def main():
llm = LLM(
model,
enforce_eager=True,
# load_format="auto",
tensor_parallel_size=2,
# liquid_gpu_range = [0,1,2,3],
# liquid_gpu_space = 32,
# liquid_driver_gpu_id = 0,
# liquid_total_num_shards = 4,
gpu_memory_utilization=0.8,
)
sampling_params = SamplingParams(temperature=0, min_tokens=128, max_tokens=128)
request_num = 1
word = "what is LLM?"
prompt = word
inputs = [prompt for _ in range(request_num)]

# for i in range(1):
# print(f"i: {i}")
# liquid_request = LiquidRequest(LiquidType.LIQUID_1_2)
# llm.do_liquid(liquid_request)
# # liquid_request = LiquidRequest(LiquidType.LIQUID_2_4)
# # llm.do_liquid(liquid_request)
# # liquid_request = LiquidRequest(LiquidType.LIQUID_4_2)
# # llm.do_liquid(liquid_request)
# liquid_request = LiquidRequest(LiquidType.LIQUID_2_1)
# llm.do_liquid(liquid_request)

# print("liquid done")


output = llm.generate(inputs, sampling_params=sampling_params)
print(f"output: {output[0].outputs[0].text}")





if __name__ == '__main__':
# torch.cuda.memory._record_memory_history(context="all", stacks="all")
main()
# torch.cuda.memory._dump_snapshot(f"./torch_mem_dump.pickle")
# torch.cuda.memory._record_memory_history(enabled=None)
# print(f"dumped finished!")
4 changes: 2 additions & 2 deletions vllm/attention/backends/flash_attn_liquid.py
@@ -265,15 +265,15 @@ def __init__(

def delete_shard(self, shard_id: int):
assert shard_id in self.shard_ids
self.num_heads -= self.num_kv_heads_per_shard
self.num_heads -= self.num_heads_per_shard
self.num_kv_heads -= self.num_kv_heads_per_shard

index = self.shard_ids.index(shard_id)
self.shard_ids.pop(index)

def append_shard(self, shard_id: int):
assert shard_id not in self.shard_ids
self.num_heads += self.num_kv_heads_per_shard
self.num_heads += self.num_heads_per_shard
self.num_kv_heads += self.num_kv_heads_per_shard
self.shard_ids.append(shard_id)

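The two-line fix above matters for grouped-query-attention models such as Llama 3, where each shard carries more query heads than KV heads: subtracting num_kv_heads_per_shard from num_heads under-counted the query heads that leave with a shard. A small worked example (head counts assumed to match a Llama-3-8B-style config) shows the difference:

# Editor's sketch with assumed numbers: 32 query heads and 8 KV heads split over
# 4 shards, i.e. 8 query heads and 2 KV heads per shard.
num_heads, num_kv_heads = 32, 8
total_num_shards = 4
num_heads_per_shard = num_heads // total_num_shards        # 8
num_kv_heads_per_shard = num_kv_heads // total_num_shards  # 2

# delete_shard() should leave 24 query heads and 6 KV heads.
num_heads -= num_heads_per_shard        # fixed code: 32 - 8 = 24
num_kv_heads -= num_kv_heads_per_shard  # 8 - 2 = 6
# The pre-fix code did `num_heads -= num_kv_heads_per_shard`, leaving 30 query
# heads and putting the attention shapes out of sync with the remaining shards.
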
8 changes: 5 additions & 3 deletions vllm/engine/llm_engine.py
@@ -221,7 +221,8 @@ def __init__(
self.liquid_config = liquid_config
self.liquid_request_queue: Queue[LiquidRequest] = Queue()
self.execution_lock: threading.Lock = threading.Lock()
self.auto_scaler = AutoScaler(liquid_config=liquid_config)
if liquid_config is not None:
self.auto_scaler = AutoScaler(liquid_config=liquid_config)
self.request_output_queue: Queue[RequestOutput] = Queue()

if not self.model_config.skip_tokenizer_init:
@@ -832,8 +833,9 @@ def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
"""
# self.model_executor.delete_kv_cache()
cache_usage = self.get_latest_metrics().gpu_cache_usage
# liquid_request = None
liquid_request = self.auto_scaler.step(cache_usage)
liquid_request = None
if self.liquid_config is not None:
liquid_request = self.auto_scaler.step(cache_usage)
if liquid_request is not None:
self.liquid_request_queue.put(liquid_request)

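With this change the engine only builds and consults the AutoScaler when a liquid_config is supplied; otherwise liquid_request stays None and step() behaves like stock vLLM. The AutoScaler itself is not shown in this commit, but the call pattern — feed gpu_cache_usage in, maybe get a LiquidRequest back to enqueue — suggests a policy along the lines of the hypothetical sketch below (thresholds and state tracking are invented, not the repository's implementation).

# Hypothetical policy, for illustration only: scale out when the KV cache is
# nearly full, scale back in when it is mostly idle.
from typing import Optional
from vllm.liquid.request import LiquidRequest, LiquidType

class ThresholdScaler:
    def __init__(self, high: float = 0.9, low: float = 0.2):
        self.high, self.low = high, low
        self.num_shards = 1  # assumed starting shard count

    def step(self, cache_usage: float) -> Optional[LiquidRequest]:
        if cache_usage > self.high and self.num_shards == 1:
            self.num_shards = 2
            return LiquidRequest(LiquidType.LIQUID_1_2)  # assumed: 1 -> 2 shards
        if cache_usage < self.low and self.num_shards == 2:
            self.num_shards = 1
            return LiquidRequest(LiquidType.LIQUID_2_1)  # assumed: 2 -> 1 shard
        return None
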
211 changes: 201 additions & 10 deletions vllm/liquid/model_executor/layers/linear.py
@@ -4,7 +4,7 @@
import torch
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from vllm.liquid.sharded_parameter import ShardedParameter, QKVShardedParameter
from vllm.liquid.sharded_parameter import ShardedParameter, QKVShardedParameter,GateUpShardedParameter

from vllm.distributed import (divide, get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
@@ -94,15 +94,28 @@ def create_weights(self, layer: torch.nn.Module,
shard_dim: int = -1,
param_class = ShardedParameter,
**extra_weight_attrs):
weight = param_class(
data=torch.empty(sum(output_partition_sizes),
input_size_per_partition,
dtype=params_dtype),
num_shards=len(shard_ids),
shard_dim=shard_dim,
shard_ids=shard_ids,
requires_grad=False,
)
if param_class == QKVShardedParameter:
weight = QKVShardedParameter(
data=torch.empty(sum(output_partition_sizes),
input_size_per_partition,
dtype=params_dtype),
num_shards=len(shard_ids),
shard_dim=shard_dim,
shard_ids=shard_ids,
requires_grad=False,
num_heads_ratio=extra_weight_attrs['num_heads_ratio'],
num_kv_heads_ratio=extra_weight_attrs['num_kv_heads_ratio'],
)
else:
weight = param_class(
data=torch.empty(sum(output_partition_sizes),
input_size_per_partition,
dtype=params_dtype),
num_shards=len(shard_ids),
shard_dim=shard_dim,
shard_ids=shard_ids,
requires_grad=False,
)
set_weight_attrs(weight, {"input_dim": 1, "output_dim": 0})
layer.register_parameter("weight", weight)
set_weight_attrs(weight, extra_weight_attrs)
@@ -276,6 +289,8 @@ def __init__(self,
shard_ids: List[int] = [0],
total_num_shards: int = 1,
param_class = ShardedParameter,
num_heads_ratio: int=1,
num_kv_heads_ratio: int=1,
):
super().__init__(input_size, output_size, skip_bias_add, params_dtype,
quant_config)
@@ -310,6 +325,8 @@ def __init__(self,
shard_ids=shard_ids,
shard_dim=shard_dim,
param_class=param_class,
num_heads_ratio=num_heads_ratio,
num_kv_heads_ratio=num_kv_heads_ratio,
)
if bias:
self.bias = param_class(
@@ -446,6 +463,8 @@ def __init__(self,
shard_ids=shard_ids,
total_num_shards=total_num_shards,
param_class=QKVShardedParameter,
num_heads_ratio=self.num_heads,
num_kv_heads_ratio=self.num_kv_heads,
)

def weight_loader(self,
@@ -737,3 +756,175 @@ def extra_repr(self) -> str:
s += f", tp_size={self.tp_size}"
s += f", reduce_results={self.reduce_results}"
return s


class MergedColumnParallelLinear(ColumnParallelLinear):
"""Packed linear layers with column parallelism.
Similar to ColumnParallelLinear, but the weight matrix is concatenated
along the output dimension. When the weight matrix is loaded, the
different partitions are sharded separately.
Args:
input_size: input dimension of the linear layer.
output_sizes: list of output dimensions of the linear layer.
bias: If true, add bias.
gather_output: If true, call all-gather on output and make the output
available to all GPUs, otherwise, every GPU will have
its own output.
skip_bias_add: This was added to enable performance optimizations where
bias can be fused with other element-wise operations. we
skip adding bias but instead return it.
params_dtype: Data type for the parameters.
quant_config: Quantization configure.
"""

def __init__(self,
input_size: int,
output_sizes: List[int],
bias: bool = True,
gather_output: bool = False,
skip_bias_add: bool = False,
params_dtype: Optional[torch.dtype] = None,
quant_config: Optional[QuantizationConfig] = None,
shard_ids: List[int] = [0],
total_num_shards: int = 1,):
self.output_sizes = output_sizes
# tp_size = get_tensor_model_parallel_world_size()
# assert all(output_size % tp_size == 0 for output_size in output_sizes)
super().__init__(input_size=input_size,
output_size=sum(output_sizes),
bias=bias,
gather_output=gather_output,
skip_bias_add=skip_bias_add,
params_dtype=params_dtype,
quant_config=quant_config,
shard_ids=shard_ids,
total_num_shards=total_num_shards,
param_class=GateUpShardedParameter,
)

def weight_loader(self,
param: Parameter,
loaded_weight: torch.Tensor,
loaded_shard_id: Optional[int] = None):

param_data = param.data
output_dim = getattr(param, "output_dim", None)
# Special case for AQLM codebooks.
is_metadata = getattr(param, "is_metadata", False)

param_shard_splitter = getattr(param, "shard_splitter", None)

if output_dim is not None and param_shard_splitter is not None:
raise NotImplementedError(
"We do not currently support output_dim != None and "
"shard_splitter != None for a parameter. Please open an issue."
)
# If a parameter has defined a shard_splitter to be used for
# the weight, it should be applied before the weight is
# loaded/copied to the parameter. The shard_splitter applies
# logic by using the loaded_shard_id to ensure that the loaded
# param is loaded to the correct location
# within the parameter defined by the linear method.
if loaded_shard_id is None and param_shard_splitter is not None:
raise NotImplementedError(
"We do not currently support loaded_shard_id == None and "
"shard_splitter != None for a parameter. Please open an issue."
)

# Special case for Fp8 scales.
fp8_scales_shard_indexer = getattr(param, "fp8_scales_shard_indexer",
None)

if loaded_shard_id is None:
# Loaded weight is already packed.
if output_dim is None:
assert param_data.shape == loaded_weight.shape
param_data.copy_(loaded_weight)
return
current_shard_offset = 0
shard_offsets = []
for i, output_size in enumerate(self.output_sizes):
shard_offsets.append((i, current_shard_offset, output_size))
current_shard_offset += output_size
packed_dim = getattr(param, "packed_dim", None)
for shard_id, shard_offset, shard_size in shard_offsets:
# Special case for Quantization.
# If quantized, we need to adjust the offset and size to account
# for the packing.
if packed_dim == output_dim:
shard_size = shard_size // param.pack_factor
shard_offset = shard_offset // param.pack_factor
# Special case for Marlin.
shard_size, shard_offset = adjust_marlin_shard(
param, shard_size, shard_offset)

loaded_weight_shard = loaded_weight.narrow(
output_dim, shard_offset, shard_size)
self.weight_loader(param, loaded_weight_shard, shard_id)
return

assert loaded_shard_id < len(self.output_sizes)
tp_rank = get_tensor_model_parallel_rank()
tp_size = get_tensor_model_parallel_world_size()
if output_dim is not None:
shard_offset = sum(self.output_sizes[:loaded_shard_id]) // tp_size
shard_size = self.output_sizes[loaded_shard_id] // tp_size
# Special case for quantization.
# If quantized, we need to adjust the offset and size to account
# for the packing.
packed_dim = getattr(param, "packed_dim", None)
if packed_dim == output_dim:
shard_size = shard_size // param.pack_factor
shard_offset = shard_offset // param.pack_factor
# Special case for Marlin.
shard_size, shard_offset = adjust_marlin_shard(
param, shard_size, shard_offset)

use_bitsandbytes = getattr(param, "use_bitsandbytes", False)
if use_bitsandbytes:
shard_size = loaded_weight.shape[output_dim]
shard_offset = loaded_weight.shape[output_dim] * \
loaded_shard_id

param_data = param_data.narrow(output_dim, shard_offset,
shard_size)
start_idx = tp_rank * shard_size
loaded_weight = loaded_weight.narrow(output_dim, start_idx,
shard_size)
# Special case for AQLM codebooks.
elif is_metadata:
# metadata indicates fixed size concatenated along dim 0
shard_size = loaded_weight.shape[0]
shard_offset = loaded_shard_id * shard_size
param_data = param_data.narrow(0, shard_offset, shard_size)

# If a param_shard_splitter is defined by the LinearMethod, use it.
elif param_shard_splitter is not None:
logical_widths = getattr(param, "logical_widths", None)
param_data, loaded_weight = param_shard_splitter(
param_data, loaded_weight, loaded_shard_id, logical_widths)

# Special case for Fp8 scales.
elif fp8_scales_shard_indexer is not None:
param_data, loaded_weight = fp8_scales_shard_indexer(
param_data, loaded_weight, loaded_shard_id)

else:
ignore_warning = getattr(param, "ignore_warning", False)
if not ignore_warning:
logger.warning(
"Loading a weight without `output_dim` attribute in "
"MergedColumnParallelLinear, assume the weight is "
"the same for all partitions.")

if fp8_scales_shard_indexer is None:
if len(param_data.shape) == 0:
param_data = param_data.reshape(1)

if len(loaded_weight.shape) == 0:
loaded_weight = loaded_weight.reshape(1)

assert param_data.shape == loaded_weight.shape
param_data.copy_(loaded_weight)
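
Two things happen in linear.py: create_weights now builds a QKVShardedParameter with num_heads_ratio / num_kv_heads_ratio (forwarded from QKVParallelLinear), and a MergedColumnParallelLinear backed by GateUpShardedParameter is added for Llama's fused gate/up MLP projection. The ratio arguments presumably let the fused QKV weight be sliced per shard in proportion to query versus KV heads; QKVShardedParameter itself is not in this diff, so the sketch below is an assumption about how such a split could work, using Llama-3-8B-like shapes.

# Editor's sketch (assumed semantics, not the repository's QKVShardedParameter):
# slice a fused [Q; K; V] weight for one shard using the Q/KV head ratio.
import torch

def split_qkv_for_shard(qkv: torch.Tensor, shard_idx: int, num_shards: int,
                        num_heads_ratio: int, num_kv_heads_ratio: int,
                        head_dim: int) -> torch.Tensor:
    q_rows = num_heads_ratio * head_dim      # rows belonging to Q
    kv_rows = num_kv_heads_ratio * head_dim  # rows belonging to K (and to V)
    q, k, v = torch.split(qkv, [q_rows, kv_rows, kv_rows], dim=0)
    take = lambda t: torch.chunk(t, num_shards, dim=0)[shard_idx]
    return torch.cat([take(q), take(k), take(v)], dim=0)

# Assumed Llama-3-8B-like shapes: 32 Q heads, 8 KV heads, head_dim 128, hidden 4096.
qkv = torch.empty((32 + 8 + 8) * 128, 4096)
shard0 = split_qkv_for_shard(qkv, 0, num_shards=4,
                             num_heads_ratio=32, num_kv_heads_ratio=8, head_dim=128)
print(shard0.shape)  # torch.Size([1536, 4096]) -> 8 Q heads + 2 K heads + 2 V heads
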
6 changes: 4 additions & 2 deletions vllm/liquid/model_executor/layers/vocab_parallel_embedding.py
@@ -394,9 +394,11 @@ def __init__(self,
bias: bool = False,
params_dtype: Optional[torch.dtype] = None,
org_num_embeddings: Optional[int] = None,
padding_size: int = DEFAULT_VOCAB_PADDING_SIZE):
padding_size: int = DEFAULT_VOCAB_PADDING_SIZE,
shard_ids: List[int] = [0],
total_num_shards: int = 1,):
super().__init__(num_embeddings, embedding_dim, params_dtype,
org_num_embeddings, padding_size)
org_num_embeddings, padding_size, shard_ids, total_num_shards)
if bias:
self.bias = Parameter(
torch.empty(self.num_embeddings_per_partition,