Skip to content

Commit

Permalink
support jobQueueSize and job ticket per model in model config YAML (#…
Browse files Browse the repository at this point in the history
…2350)

* add jobQsize

* add jobQueueSize in model_config.yaml for mnist_scripted.mar

* feature job tickets

* rename hasJobTickets to getJobTickets

* fmt doc

* update doc

* fmt doc

* fmt doc

* update doc
  • Loading branch information
lxning authored Jun 13, 2023
1 parent 7f9967e commit 89fbd88
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 0 deletions.
11 changes: 11 additions & 0 deletions docs/large_model_inference.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,14 @@ torchrun:
# gpus you wish to split your model
OMP_NUMBER_THREADS: 2
```
#### Job ticket feature — recommended for latency-sensitive inference use cases
When the job ticket feature is enabled, TorchServe verifies the availability of an active worker for the model before accepting a client's request. If an active worker is available, the request is accepted and processed immediately, without incurring waiting time from the job queue or dynamic batching; otherwise, a 503 response is sent back to the client.

This feature helps with use cases where inference latency can be high, such as generative models or autoregressive decoder models like ChatGPT. It enables such applications to take effective actions, for example routing the rejected request to a different server or scaling up model server capacity, based on the business requirements. Here is an example of enabling the job ticket feature.
```yaml
minWorkers: 2
maxWorkers: 2
jobQueueSize: 2
useJobTicket: true
```
In this example, the model has 2 workers and a job queue of size 2. An inference request will either be processed by TorchServe immediately or rejected with response code 503.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public class ModelConfig {
* timeout. default: 0 which means no timeout (ie. clientExpireTS default value Long.MAX_VALUE.
*/
private long clientTimeoutInMills;
/**
* the job queue size of a model. By default, job_queue_size is set as 100 in config.property
* for all models. Here, jobQueueSize: -1 means no customized setting for the model.
*/
private int jobQueueSize;
/**
* the useJobTicket is a flag which allows an inference request to be accepted only if there are
* available workers.
*/
private boolean useJobTicket;

public static ModelConfig build(Map<String, Object> yamlMap) {
ModelConfig modelConfig = new ModelConfig();
Expand Down Expand Up @@ -134,6 +144,20 @@ public static ModelConfig build(Map<String, Object> yamlMap) {
v);
}
break;
case "jobQueueSize":
if (v instanceof Integer) {
modelConfig.setJobQueueSize((int) v);
} else {
logger.warn("Invalid jobQueueSize: {}, should be positive int", v);
}
break;
case "useJobTicket":
if (v instanceof Boolean) {
modelConfig.setUseJobTicket((boolean) v);
} else {
logger.warn("Invalid useJobTicket: {}, should be true or false", v);
}
break;
default:
break;
}
Expand Down Expand Up @@ -271,6 +295,24 @@ public void setClientTimeoutInMills(long clientTimeoutInMills) {
}
}

/**
 * Returns the per-model job queue size configured in the model's YAML.
 *
 * @return the configured queue size, or the default (0/unset) when the model does not
 *     override the global setting
 */
public int getJobQueueSize() {
    return this.jobQueueSize;
}

/**
 * Sets the per-model job queue size. Non-positive values are ignored so that the
 * global default from the server configuration remains in effect.
 *
 * @param jobQueueSize desired queue capacity; must be positive to take effect
 */
public void setJobQueueSize(int jobQueueSize) {
    if (jobQueueSize <= 0) {
        // Ignore invalid/unset values; keep the existing (default) queue size.
        return;
    }
    this.jobQueueSize = jobQueueSize;
}

/**
 * Indicates whether the job ticket feature is enabled for this model.
 *
 * @return true if an inference request is accepted only when a worker is available
 */
public boolean isUseJobTicket() {
    return this.useJobTicket;
}

/**
 * Enables or disables the job ticket feature for this model.
 *
 * @param useJobTicket true to accept inference requests only when an active worker is
 *     available
 */
public void setUseJobTicket(boolean useJobTicket) {
this.useJobTicket = useJobTicket;
}

public enum ParallelType {
NONE(""),
PP("pp"),
Expand Down
Binary file modified frontend/archive/src/test/resources/models/mnist_scripted.mar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public BaseModelRequest getRequest(String threadName, WorkerState state)
model.pollBatch(
threadName, (state == WorkerState.WORKER_MODEL_LOADED) ? 0 : Long.MAX_VALUE, jobs);

if (model.isUseJobTicket() && jobs.isEmpty()) {
model.decNumJobTickets();
return req;
}

for (Job j : jobs.values()) {
if (j.isControlCmd()) {
if (jobs.size() > 1) {
Expand Down
37 changes: 37 additions & 0 deletions frontend/server/src/main/java/org/pytorch/serve/wlm/Model.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class Model {
// Per worker thread job queue. This separates out the control queue from data queue
private ConcurrentMap<String, LinkedBlockingDeque<Job>> jobsDb;

private boolean useJobTicket;
private AtomicInteger numJobTickets;

public Model(ModelArchive modelArchive, int queueSize) {
this.modelArchive = modelArchive;
if (modelArchive != null && modelArchive.getModelConfig() != null) {
Expand Down Expand Up @@ -94,6 +97,11 @@ public Model(ModelArchive modelArchive, int queueSize) {
}
maxRetryTimeoutInMill = modelArchive.getModelConfig().getMaxRetryTimeoutInSec() * 1000;
clientTimeoutInMills = modelArchive.getModelConfig().getClientTimeoutInMills();
if (modelArchive.getModelConfig().getJobQueueSize() > 0) {
// overwrite the queueSize defined on config.property
queueSize = modelArchive.getModelConfig().getJobQueueSize();
}
useJobTicket = modelArchive.getModelConfig().isUseJobTicket();
} else {
batchSize = 1;
maxBatchDelay = 100;
Expand All @@ -111,6 +119,7 @@ public Model(ModelArchive modelArchive, int queueSize) {
// Always have a queue for data
jobsDb.putIfAbsent(DEFAULT_DATA_QUEUE, new LinkedBlockingDeque<>(queueSize));
failedInfReqs = new AtomicInteger(0);
numJobTickets = new AtomicInteger(0);
lock = new ReentrantLock();
modelVersionName =
new ModelVersionName(
Expand Down Expand Up @@ -225,6 +234,10 @@ public void removeJobQueue(String threadId) {
}

/**
 * Adds an inference job to the model's data queue.
 *
 * <p>When the job ticket feature is enabled, a ticket must be acquired first; if no
 * ticket is available the job is rejected so the caller can respond immediately (503)
 * instead of letting the request wait in the queue.
 *
 * @param job the inference job to enqueue
 * @return true if the job was accepted; false if no ticket was available or the queue
 *     is full
 */
public boolean addJob(Job job) {
    if (isUseJobTicket() && !getJobTickets()) {
        logger.info("There are no job tickets");
        return false;
    }
    boolean accepted = jobsDb.get(DEFAULT_DATA_QUEUE).offer(job);
    if (!accepted && isUseJobTicket()) {
        // A ticket was consumed above but the job was not queued (queue full).
        // Return the ticket so a subsequent request can use it instead of leaking it.
        incNumJobTickets();
    }
    return accepted;
}

Expand Down Expand Up @@ -253,6 +266,9 @@ public void pollBatch(String threadId, long waitTime, Map<String, Job> jobsRepo)
}

try {
if (isUseJobTicket()) {
incNumJobTickets();
}
lock.lockInterruptibly();
long maxDelay = maxBatchDelay;
jobsQueue = jobsDb.get(DEFAULT_DATA_QUEUE);
Expand Down Expand Up @@ -365,4 +381,25 @@ public long getClientTimeoutInMills() {
/**
 * Sets the client-side timeout for inference requests on this model.
 *
 * @param clientTimeoutInMills timeout in milliseconds; 0 means no timeout
 */
public void setClientTimeoutInMills(long clientTimeoutInMills) {
this.clientTimeoutInMills = clientTimeoutInMills;
}

/**
 * Indicates whether the job ticket feature is enabled for this model.
 *
 * @return true if requests are only accepted while a worker ticket is available
 */
public boolean isUseJobTicket() {
    return this.useJobTicket;
}

/**
 * Makes one job ticket available (called when a worker becomes free to poll).
 *
 * @return the updated number of available job tickets
 */
public int incNumJobTickets() {
return this.numJobTickets.incrementAndGet();
}

/**
 * Takes back one job ticket (called when a polling worker found no job to run).
 *
 * <p>NOTE(review): this decrements without the synchronization used by
 * {@code getJobTickets}, so the counter can transiently go below zero — callers that
 * test the count should treat any non-positive value as "no ticket".
 *
 * @return the updated number of available job tickets
 */
public int decNumJobTickets() {
return this.numJobTickets.decrementAndGet();
}

/**
 * Attempts to acquire one job ticket (i.e. one available-worker slot).
 *
 * <p>The original check-then-act under {@code synchronized} was not sufficient:
 * {@code decNumJobTickets()} decrements the counter without holding the same lock, so
 * the count can race below zero and an {@code == 0} check would no longer block
 * acquisition. A CAS loop on the atomic counter is lock-free and treats any
 * non-positive count as "no ticket available".
 *
 * @return true if a ticket was acquired, false otherwise
 */
public boolean getJobTickets() {
    for (; ; ) {
        int available = this.numJobTickets.get();
        if (available <= 0) {
            return false;
        }
        if (this.numJobTickets.compareAndSet(available, available - 1)) {
            return true;
        }
    }
}
}

0 comments on commit 89fbd88

Please sign in to comment.