Introduce pipeline parallel degree config #2171

Merged · 6 commits · Jul 18, 2024

Changes from all commits
5 changes: 5 additions & 0 deletions engines/python/setup/djl_python/arg_parser.py
@@ -71,6 +71,11 @@ def python_engine_args():
dest="tensor_parallel_degree",
type=int,
help='The tensor parallel degree')
parser.add_argument('--pipeline-parallel-degree',
required=False,
dest="pipeline_parallel_degree",
type=int,
help='The pipeline parallel degree')
parser.add_argument('--cluster-size',
required=False,
dest="cluster_size",
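For reference, a minimal, self-contained sketch of how the new flag parses alongside the existing TP flag; the parser setup mirrors this hunk, and the sample argv values are hypothetical:

```python
# Sketch of the new flag next to the existing TP flag; mirrors the
# arg_parser.py additions above. The sample argv is hypothetical.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--tensor-parallel-degree',
                    required=False,
                    dest="tensor_parallel_degree",
                    type=int,
                    help='The tensor parallel degree')
parser.add_argument('--pipeline-parallel-degree',
                    required=False,
                    dest="pipeline_parallel_degree",
                    type=int,
                    help='The pipeline parallel degree')

args = parser.parse_args(
    ['--tensor-parallel-degree', '4', '--pipeline-parallel-degree', '2'])
assert args.pipeline_parallel_degree == 2  # optional flag; None when omitted
```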
@@ -50,6 +50,7 @@ class HuggingFaceProperties(Properties):
device_id: int = -1
task: str = None
tensor_parallel_degree: int = -1
pipeline_parallel_degree: int = -1
cluster_size: int = 1
device_map: str = None
load_in_4bit: Optional[bool] = None
@@ -120,14 +121,19 @@ def construct_kwargs_device_map(self):
self.device = None
logging.info(f"Using device map {self.device_map}")
elif self.tensor_parallel_degree > 0 \
and self.pipeline_parallel_degree > 0 \
and self.cluster_size > 0 \
and torch.cuda.device_count() > 0:
self.kwargs["device_map"] = "auto"
self.device = None
world_size = torch.cuda.device_count() * self.cluster_size
assert world_size == self.tensor_parallel_degree, \
f"TP degree ({self.tensor_parallel_degree}) doesn't match available GPUs ({world_size})"
logging.info(f"Using {world_size} gpus")

if world_size != self.tensor_parallel_degree * self.pipeline_parallel_degree:
raise ValueError(
f"TP*PP degree ({self.tensor_parallel_degree*self.pipeline_parallel_degree}) doesn't match available GPUs ({world_size})"
)

logging.info(f"Using {world_size} gpus collectively.")
return self

@model_validator(mode='after')
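The core of the new validator, restated as a standalone sketch (the function name and parameters are illustrative, not part of the PR): the visible world size must equal TP × PP.

```python
# Standalone sketch of the world-size check in construct_kwargs_device_map;
# the function name and parameters are illustrative.
def validate_world_size(gpus_per_node: int, cluster_size: int,
                        tp_degree: int, pp_degree: int) -> int:
    world_size = gpus_per_node * cluster_size
    if world_size != tp_degree * pp_degree:
        raise ValueError(
            f"TP*PP degree ({tp_degree * pp_degree}) doesn't match "
            f"available GPUs ({world_size})")
    return world_size

# 2 nodes x 4 GPUs = 8 GPUs, which matches TP=4 * PP=2.
validate_world_size(gpus_per_node=4, cluster_size=2, tp_degree=4, pp_degree=2)
```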
@@ -35,6 +35,7 @@ class LmiDistRbProperties(Properties):
load_format: Optional[str] = "auto"
quantize: Optional[LmiDistQuantizeMethods] = None
tensor_parallel_degree: Optional[int] = None
pipeline_parallel_degree: Optional[int] = None
max_rolling_batch_prefill_tokens: Optional[int] = None
# Adjustable prefix model length for certain 32k or longer model
max_model_len: Optional[int] = None
@@ -62,6 +62,8 @@ def __init__(self, model_id_or_path: str, properties: dict, **kwargs):
engine_args = VllmEngineArgs(
model=self.lmi_dist_config.model_id_or_path,
tensor_parallel_size=self.lmi_dist_config.tensor_parallel_degree,
pipeline_parallel_size=self.lmi_dist_config.pipeline_parallel_degree,
dtype=DTYPE_MAPPER[self.lmi_dist_config.dtype],
seed=0,
max_model_len=self.lmi_dist_config.max_model_len,
@@ -81,6 +83,8 @@ def __init__(self, model_id_or_path: str, properties: dict, **kwargs):
**engine_kwargs)

kwargs = {}
logging.info(f"engine_args: {engine_args}, kwargs: {kwargs}")

if self.lmi_dist_config.max_rolling_batch_prefill_tokens is None:
kwargs["warmup_prefill_tokens"] = _WARMUP_PREFILL_TOKENS
self.engine = engine_from_args(engine_args, **kwargs)
4 changes: 4 additions & 0 deletions engines/python/setup/djl_python_engine.py
@@ -52,6 +52,7 @@ def __init__(self, args, service):
self.service = service
self.device_id = args.device_id
self.tensor_parallel_degree = args.tensor_parallel_degree
self.pipeline_parallel_degree = args.pipeline_parallel_degree
self.cluster_size = args.cluster_size
self.entry_point = args.entry_point
self.recommended_entry_point = args.recommended_entry_point
@@ -123,6 +124,9 @@ def run_server(self):
prop = inputs.get_properties()
if self.tensor_parallel_degree:
prop["tensor_parallel_degree"] = self.tensor_parallel_degree
if self.pipeline_parallel_degree:
prop["pipeline_parallel_degree"] = self.pipeline_parallel_degree
if self.cluster_size:
prop["cluster_size"] = self.cluster_size
prop["device_id"] = self.device_id
@@ -126,12 +126,31 @@ static String[] getPythonStartCmd(
int deviceId = device.getDeviceId();
int clusterSize = PyEnv.getClusterSize();
int tensorParallelDegree = pyEnv.getTensorParallelDegree();
int pipelineParallelDegree = pyEnv.getPipelineParallelDegree();
String entryPoint = pyEnv.getEntryPoint();
String recommendedEntryPoint = pyEnv.getRecommendedEntryPoint();

if (PyEnv.isMultiNode()) {
String cudaDevices = getVisibleDevices(workerId, tensorParallelDegree / clusterSize);

int worldSize = tensorParallelDegree * pipelineParallelDegree;

if (tensorParallelDegree * pipelineParallelDegree % clusterSize != 0) {
throw new IllegalArgumentException(
"Error: Cannot use cluster size: "
+ clusterSize
+ "for world size (number of total GPUs): "
+ worldSize);
}

int localSize = (tensorParallelDegree * pipelineParallelDegree) / clusterSize;

String cudaDevices = getVisibleDevices(workerId, localSize);
logger.info("Set before mpirun CUDA_VISIBLE_DEVICES={}", cudaDevices);
logger.info(
"Received: pp degree: {} and tp depgree: {} and cluster size: {}",
pipelineParallelDegree,
tensorParallelDegree,
clusterSize);
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String host : hosts) {
@@ -140,12 +159,12 @@
} else {
sb.append(',');
}
sb.append(host).append(':').append(tensorParallelDegree / clusterSize);
sb.append(host).append(':').append(localSize);
}
String[] args = new String[46];
String[] args = new String[48];
args[0] = "mpirun";
args[1] = "-np";
args[2] = String.valueOf(tensorParallelDegree);
args[2] = String.valueOf(worldSize);
args[3] = "--host";
args[4] = sb.toString();
args[5] = "--allow-run-as-root";
@@ -185,10 +204,12 @@ static String[] getPythonStartCmd(
args[39] = String.valueOf(port);
args[40] = "--tensor-parallel-degree";
args[41] = String.valueOf(tensorParallelDegree);
args[42] = "--cluster-size";
args[43] = String.valueOf(clusterSize);
args[44] = "--recommended-entry-point";
args[45] = recommendedEntryPoint == null ? "" : recommendedEntryPoint;
args[42] = "--pipeline-parallel-degree";
args[43] = String.valueOf(pipelineParallelDegree);
args[44] = "--cluster-size";
args[45] = String.valueOf(clusterSize);
args[46] = "--recommended-entry-point";
args[47] = recommendedEntryPoint == null ? "" : recommendedEntryPoint;
return args;
}

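The multi-node launch math above, condensed into a Python sketch (function and variable names are illustrative): mpirun now launches worldSize = TP × PP ranks in total, spread evenly as localSize ranks per host.

```python
# Python sketch of the mpirun topology computed in getPythonStartCmd;
# function and variable names are illustrative.
def mpirun_topology(hosts, tp_degree, pp_degree):
    cluster_size = len(hosts)
    world_size = tp_degree * pp_degree
    if world_size % cluster_size != 0:
        raise ValueError(f"Cannot use cluster size: {cluster_size}"
                         f" for world size (number of total GPUs): {world_size}")
    local_size = world_size // cluster_size  # ranks (GPUs) per host
    host_arg = ",".join(f"{host}:{local_size}" for host in hosts)
    return ["mpirun", "-np", str(world_size), "--host", host_arg]

print(mpirun_topology(["node0", "node1"], tp_degree=4, pp_degree=2))
# ['mpirun', '-np', '8', '--host', 'node0:4,node1:4']
```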
30 changes: 29 additions & 1 deletion engines/python/src/main/java/ai/djl/python/engine/PyEnv.java
@@ -55,6 +55,7 @@ public class PyEnv {
private int predictTimeout;
private int modelLoadingTimeout;
private int tensorParallelDegree;
private int pipelineParallelDegree;
private Map<String, String> envs;
private Map<String, String> initParameters;
private boolean initialized;
@@ -363,6 +364,33 @@ public void setTensorParallelDegree(int tensorParallelDegree) {
this.tensorParallelDegree = tensorParallelDegree;
}

/**
* Returns the pipeline parallel degree.
*
* @return the pipeline parallel degree
*/
public int getPipelineParallelDegree() {
if (pipelineParallelDegree == 0) {
String value = Utils.getenv("PIPELINE_PARALLEL_DEGREE");
if (value != null) {
pipelineParallelDegree = Integer.parseInt(value);
} else {
pipelineParallelDegree = 1;
}
}

return pipelineParallelDegree;
}

/**
* Sets the pipeline parallel degree.
*
* @param pipelineParallelDegree the pipeline parallel degree
*/
public void setPipelineParallelDegree(int pipelineParallelDegree) {
this.pipelineParallelDegree = pipelineParallelDegree;
}

int getMpiWorkers() {
int gpuCount = CudaUtils.getGpuCount() * clusterSize;
String visibleDevices = Utils.getenv("CUDA_VISIBLE_DEVICES");
@@ -373,7 +401,7 @@ int getMpiWorkers() {
}
gpuCount = visibleCount;
}
return gpuCount / getTensorParallelDegree();
return gpuCount / (getTensorParallelDegree() * getPipelineParallelDegree());
}

/**
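The getMpiWorkers change in arithmetic form (a sketch; names are illustrative): each worker group now consumes TP × PP GPUs rather than TP alone, so the worker count shrinks accordingly.

```python
# Sketch of the updated getMpiWorkers arithmetic; names are illustrative.
def mpi_workers(gpus_per_node: int, cluster_size: int,
                tp_degree: int, pp_degree: int) -> int:
    gpu_count = gpus_per_node * cluster_size
    return gpu_count // (tp_degree * pp_degree)

# 8 GPUs on one node support exactly one TP=4, PP=2 worker group.
assert mpi_workers(gpus_per_node=8, cluster_size=1, tp_degree=4, pp_degree=2) == 1
```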
@@ -130,6 +130,13 @@ public void load(Path modelPath, String prefix, Map<String, ?> options) throws I
pyEnv.setTensorParallelDegree(Integer.parseInt(value));
}
break;
case "pipeline_parallel_degree":
if (value != null) {
pyEnv.setPipelineParallelDegree(Integer.parseInt(value));
} else {
pyEnv.setPipelineParallelDegree(1);
}
break;
case "handler":
pyEnv.setHandler(value);
break;
@@ -63,21 +63,26 @@ class PyProcess {
int port = counter.getAndIncrement();
if (pyEnv.isMpiMode()) {
int tensorParallelDegree = pyEnv.getTensorParallelDegree();
int pipelineParallelDegree = pyEnv.getPipelineParallelDegree();
int clusterSize = PyEnv.getClusterSize();
connections = new ArrayList<>(tensorParallelDegree);
connections = new ArrayList<>(tensorParallelDegree * pipelineParallelDegree);

if (clusterSize > 1) {
hosts = getHosts(clusterSize);
for (int i = 0; i < tensorParallelDegree; ++i) {
for (int i = 0; i < tensorParallelDegree * pipelineParallelDegree; ++i) {
connections.add(
new Connection(
pyEnv,
port,
i,
hosts[i / (tensorParallelDegree / clusterSize)]));
hosts[i / (tensorParallelDegree * pipelineParallelDegree / clusterSize)]));
}
} else {
for (int i = 0; i < tensorParallelDegree; ++i) {
for (int i = 0; i < tensorParallelDegree * pipelineParallelDegree; ++i) {
connections.add(new Connection(pyEnv, port, i, "127.0.0.1"));
}
}
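The rank-to-host assignment used when building connections, as a sketch (names are illustrative): ranks land on hosts in contiguous blocks of local_size = TP × PP / cluster_size.

```python
# Sketch of the rank-to-host mapping in PyProcess; names are illustrative.
def host_for_rank(rank: int, hosts: list, tp_degree: int, pp_degree: int) -> str:
    local_size = tp_degree * pp_degree // len(hosts)
    return hosts[rank // local_size]

# With 2 hosts and TP=4, PP=2, ranks 0-3 land on h0 and ranks 4-7 on h1.
assert [host_for_rank(i, ["h0", "h1"], 4, 2) for i in range(8)] == \
    ["h0"] * 4 + ["h1"] * 4
```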
12 changes: 12 additions & 0 deletions serving/docker/partition/properties_manager.py
@@ -45,6 +45,9 @@ def __init__(self, args, **kwargs):
if args.tensor_parallel_degree:
self.properties[
'option.tensor_parallel_degree'] = args.tensor_parallel_degree
if args.pipeline_parallel_degree:
self.properties[
'option.pipeline_parallel_degree'] = args.pipeline_parallel_degree
if args.quantize:
self.properties['option.quantize'] = args.quantize

@@ -57,6 +60,7 @@ def __init__(self, args, **kwargs):

if self.is_mpi_mode:
self.validate_tp_degree()
self.validate_pp_degree()

self.set_and_validate_entry_point()
self.set_and_validate_save_mp_checkpoint_path()
@@ -144,6 +148,14 @@ def validate_tp_degree(self):
f'GPU devices are not enough to run {tensor_parallel_degree} partitions.'
)

def validate_pp_degree(self):
pipeline_parallel_degree = self.properties.get(
'option.pipeline_parallel_degree')
if not pipeline_parallel_degree:
raise ValueError(
'Expecting pipeline_parallel_degree to be set or to default to 1'
)

def set_and_validate_entry_point(self):
entry_point = self.properties.get('option.entryPoint')
quantize = self.properties.get('option.quantize')
5 changes: 5 additions & 0 deletions serving/docker/partition/trt_llm_partition.py
@@ -24,6 +24,7 @@ def create_trt_llm_repo(properties, args):
kwargs = remove_option_from_properties(properties)
kwargs['trt_llm_model_repo'] = args.trt_llm_model_repo
kwargs["tensor_parallel_degree"] = args.tensor_parallel_degree
kwargs["pipeline_parallel_degree"] = args.pipeline_parallel_degree
model_id_or_path = args.model_path or kwargs['model_id']
create_model_repo(model_id_or_path, **kwargs)

@@ -48,6 +49,10 @@ def main():
type=int,
required=True,
help='Tensor parallel degree')
parser.add_argument('--pipeline_parallel_degree',
type=int,
required=True,
help='Pipeline parallel degree')
parser.add_argument('--model_path',
type=str,
required=False,
@@ -84,6 +84,7 @@ static void configure(
setRollingBatch(lmiProperties, modelConfig, features);
setMpiMode(lmiProperties, modelConfig, features);
setTensorParallelDegree(lmiProperties);
setPipelineParallelDegree(lmiProperties);
setRollingBatchSize(lmiProperties);
}

@@ -149,6 +150,14 @@ private static void setTensorParallelDegree(Properties lmiProperties) {
lmiProperties.setProperty("option.tensor_parallel_degree", tpDegree);
}

private static void setPipelineParallelDegree(Properties lmiProperties) {
if (lmiProperties.containsKey("option.pipeline_parallel_degree")) {
return;
}
String ppDegree = Utils.getenv("PIPELINE_PARALLEL_DEGREE", "1");
lmiProperties.setProperty("option.pipeline_parallel_degree", ppDegree);
}

private static void setDynamicBatch(
Properties lmiProperties,
LmiUtils.HuggingFaceModelConfig modelConfig,
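The default resolution added by setPipelineParallelDegree, sketched in Python (the function name is illustrative): an explicit option wins, then the PIPELINE_PARALLEL_DEGREE environment variable, then "1".

```python
# Sketch of setPipelineParallelDegree's fallback chain; the function name
# is illustrative.
import os

def resolve_pp_degree(lmi_properties: dict) -> str:
    if "option.pipeline_parallel_degree" in lmi_properties:
        return lmi_properties["option.pipeline_parallel_degree"]
    return os.environ.get("PIPELINE_PARALLEL_DEGREE", "1")

print(resolve_pp_degree({"option.pipeline_parallel_degree": "2"}))  # "2"
```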
16 changes: 12 additions & 4 deletions wlm/src/main/java/ai/djl/serving/wlm/LmiUtils.java
@@ -110,6 +110,7 @@ static void convertTrtLLM(ModelInfo<?, ?> info) throws IOException {
if (modelId == null) {
modelId = trtRepo.toString();
}

String tpDegree = info.prop.getProperty("option.tensor_parallel_degree");
if (tpDegree == null) {
tpDegree = Utils.getenv("TENSOR_PARALLEL_DEGREE", "max");
@@ -118,19 +119,24 @@
tpDegree = String.valueOf(CudaUtils.getGpuCount());
}

String ppDegree = info.prop.getProperty("option.pipeline_parallel_degree");
if (ppDegree == null) {
ppDegree = Utils.getenv("PIPELINE_PARALLEL_DEGREE", "1");
}

// TODO TrtLLM python backend: Change it once TrtLLM supports T5 with inflight batching.
if (info.prop.containsKey("trtllm_python_backend")) {
// Inflight batching support is not available for certain models like t5.
// Python backend models have different model repo format compared to C++ backend.
// And whether it is valid or not is checked in tensorrt_llm_toolkit. So it is not
// necessary to check here.
if (!isValidTrtLlmPythonModelRepo(trtRepo)) {
info.downloadDir = buildTrtLlmArtifacts(info.modelDir, modelId, tpDegree);
info.downloadDir = buildTrtLlmArtifacts(info.modelDir, modelId, tpDegree, ppDegree);
}
} else {
info.prop.put("option.rolling_batch", "trtllm");
if (!isValidTrtLlmModelRepo(trtRepo)) {
info.downloadDir = buildTrtLlmArtifacts(info.modelDir, modelId, tpDegree);
info.downloadDir = buildTrtLlmArtifacts(info.modelDir, modelId, tpDegree, ppDegree);
}
}
}
@@ -308,8 +314,8 @@ private static HuggingFaceModelConfig getHuggingFaceModelConfig(ModelInfo<?, ?>
}
}

private static Path buildTrtLlmArtifacts(Path modelDir, String modelId, String tpDegree)
throws IOException {
private static Path buildTrtLlmArtifacts(
Path modelDir, String modelId, String tpDegree, String ppDegree) throws IOException {
logger.info("Converting model to TensorRT-LLM artifacts");
String hash = Utils.hash(modelId + tpDegree);
String download = Utils.getenv("SERVING_DOWNLOAD_DIR", null);
@@ -329,6 +335,8 @@ private static Path buildTrtLlmArtifacts(Path modelDir, String modelId, String t
trtLlmRepoDir.toString(),
"--tensor_parallel_degree",
tpDegree,
"--pipeline_parallel_degree",
ppDegree,
"--model_path",
modelId
};