
Asynchronous worker communication and vllm integration #3146

Merged
40 commits merged from feature/async_worker_comm into master on Jun 22, 2024

Conversation

mreso
Collaborator

@mreso mreso commented May 21, 2024

Description

This PR adds a new asynchronous communication mode to TorchServe in order to accommodate inference engines like vLLM.

Context:
These engines usually include techniques like paged attention, which improve memory utilization by fitting as many requests into the engine as possible and evicting requests if space gets tight. This means the classical synchronized communication model of TorchServe (batch of N requests in -> batch of N responses out) does not work well with these engines. This feature offers a mode that forwards all incoming requests to the backend, where they are fed to the engine, which starts producing responses that are streamed out asynchronously.

Multi-worker note:
While this in theory works with multiple workers, it would distribute the incoming requests in a round-robin fashion, which might lead to non-optimal worker/hardware utilization. It is therefore advised to only use a single worker and to utilize tensor parallelism to distribute the model over multiple GPUs.
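
For illustration only, here is a minimal sketch of the flow described above using plain asyncio; the names (fake_engine_stream, handle_request) are placeholders and this is not the PR's actual handler API:

import asyncio

async def fake_engine_stream(request_id):
    # Stand-in for an engine like vLLM that yields partial results per request.
    for token in ["Hello", " ", "world"]:
        await asyncio.sleep(0.01)
        yield {"request_id": request_id, "text": token}

async def handle_request(request_id, out_queue):
    # Each request is forwarded to the engine immediately (no frontend batching);
    # response chunks are streamed out as soon as they are produced.
    async for chunk in fake_engine_stream(request_id):
        await out_queue.put(chunk)
    await out_queue.put({"request_id": request_id, "done": True})

async def main():
    out_queue = asyncio.Queue()
    # Two requests are processed concurrently; their chunks interleave on the way out.
    tasks = [asyncio.create_task(handle_request(rid, out_queue)) for rid in ("r1", "r2")]
    done = 0
    while done < len(tasks):
        chunk = await out_queue.get()
        done += 1 if chunk.get("done") else 0
        print(chunk)

asyncio.run(main())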

Fixes #(issue)

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

Feature/Issue validation/testing

Please describe the Unit or Integration tests that you ran to verify your changes and relevant result summary. Provide instructions so it can be reproduced.
Please also list any relevant details for your test configuration.

  • pytest test/pytest/test_async_worker_comm.py test/pytest/test_example_vllm.py
====================================================================================================== test session starts ======================================================================================================
platform linux -- Python 3.10.12, pytest-7.3.1, pluggy-1.3.0
rootdir: /home/ubuntu/serve
plugins: mock-3.10.0, anyio-4.3.0, cov-4.1.0
collected 2 items

test/pytest/test_async_worker_comm.py .                                                                                                                                                                                   [ 50%]
test/pytest/test_example_vllm.py .                                                                                                                                                                                        [100%]

====================================================================================================== 2 passed in 31.83s =======================================================================================================

Checklist:

  • Did you have fun?
  • Have you added tests that prove your fix is effective or that this feature works?
  • Has code been commented, particularly in hard-to-understand areas?
  • Have you made corresponding changes to the documentation?

@mreso mreso force-pushed the feature/async_worker_comm branch from 7fb69b9 to 5d3ae53 on June 11, 2024 04:17
@mreso mreso marked this pull request as ready for review June 14, 2024 18:15
@mreso mreso requested a review from lxning June 14, 2024 18:16
@mreso mreso requested a review from namannandan June 14, 2024 18:16
@mreso mreso changed the title from [WIP] Feature/async worker comm to Asynchronous worker communication and vllm integration on Jun 14, 2024

async def preprocess(self, requests):
    input_batch = []
    assert len(requests) == 1, "Expecting batch_size = 1"
Collaborator

Is this to ensure that the async request handling in the frontend immediately forwards the request to the backend without waiting to form a batch?

Collaborator Author

Correct, in engines like vLLM there is no classical batch size setting; the number of concurrent requests is influenced by the available memory as well as by request length. Therefore, sending the request to the engine as soon as possible is the best option in my opinion.

Comment on lines +105 to +109
} else {
    logger.warn(
            "Drop response for inference request {} due to client timeout",
            job.getPayload().getRequestId());
}
Collaborator

For my understanding, when a job expires, the behavior we expect here is that the backend continues to stream responses until the stopping criterion is reached, but the frontend does not send them back to the client? Do we have a way to signal expired jobs to the backend and clean up jobs_in_backend here?

Collaborator Author

Yes, it would be great to have this kind of functionality, but it will require some kind of command signaling into the backend which we can leverage, e.g. allowing for a 'C' (command) message type in the OTF protocol, which we discussed in the past also in the context of up- and downscaling of thread workers. We can add this functionality in a follow-up PR. For now the behavior will be as you described.

mreso and others added 2 commits June 20, 2024 16:46
…Aggregator.java


Remove job from jobs_in_backend on error

Co-authored-by: Naman Nandan <namankt55@gmail.com>
def __init__(self, service):
    self.service = service
    self.service.predict = types.MethodType(predict, self.service)
    self.in_queue = Queue()
Collaborator

Curious why we need a thread-safe queue for in_queue, or could we safely use the non-thread-safe asyncio queue here as well, similar to out_queue?

Collaborator Author

Yes, that's a good point. We need the thread-safe queue here because the asyncio queue does not provide a timeout on get(). The timeout is actually not strictly necessary for the vLLM use case, but I wanted to recreate the same batching functionality as in the frontend for future reference, e.g. if someone wants to run multiple workers with batching in the backend.
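
For context, a short standard-library comparison (not code from this PR) of the two queue types discussed here:

import asyncio
import queue

# queue.Queue supports a timeout directly on get() and raises queue.Empty
# once it expires:
q = queue.Queue()
try:
    item = q.get(timeout=0.1)
except queue.Empty:
    item = None

# asyncio.Queue.get() has no timeout parameter; a timeout has to be added
# externally, e.g. via asyncio.wait_for:
async def get_with_timeout(aq, timeout):
    try:
        return await asyncio.wait_for(aq.get(), timeout)
    except asyncio.TimeoutError:
        return None

asyncio.run(get_with_timeout(asyncio.Queue(), 0.1))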

while len(batch) < BATCH_SIZE and (time.time() - st) < MAX_WAIT:
    timeout = max(0, MAX_WAIT - (time.time() - st))
    request = self.in_queue.get(timeout=timeout)
    batch += request
Collaborator

Nit: We may need to check here whether we actually got a request or timed out before adding it to the batch

Collaborator Author

When .get() times out, which is the only alternative to getting a request, an Empty exception is thrown, which is caught in line 126 and will just restart the batch window.
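
A minimal sketch of that batch-window pattern (placeholder values for BATCH_SIZE and MAX_WAIT; not the exact code in this PR):

import time
from queue import Empty, Queue

BATCH_SIZE = 8
MAX_WAIT = 0.1  # seconds

def fetch_batch(in_queue):
    batch = []
    st = time.time()
    try:
        while len(batch) < BATCH_SIZE and (time.time() - st) < MAX_WAIT:
            timeout = max(0, MAX_WAIT - (time.time() - st))
            # Each queue item is assumed to be a list of requests, so += extends
            # the batch; get() raises Empty once the remaining window elapses.
            batch += in_queue.get(timeout=timeout)
    except Empty:
        # Timing out ends the current batch window; the caller simply starts
        # the next window with whatever was collected so far.
        pass
    return batch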

Comment on lines 144 to 149
fetch = Thread(target=self.fetch_batches)
fetch.start()
receive = Thread(target=self.receive_requests)
receive.start()
send = Thread(target=self.send_responses)
send.start()
Collaborator

@namannandan namannandan Jun 21, 2024

If one or more of these threads exit/terminate due to error and the others continue to run, what would the recovery look like? Would we have to restart the thread or somehow trigger an entire worker restart to ensure simple recovery?

Collaborator Author

Good point, let me see if I can make this more fail-resistant.
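
One possible direction (just a sketch, not necessarily what this PR ended up doing): wrap each thread target so that an uncaught exception tears down the whole worker process, letting the frontend detect the dead worker and restart it cleanly. run_guarded is a hypothetical helper; the methods in the usage comment are the three loops discussed above:

import logging
import os
import threading

def run_guarded(target):
    def wrapper():
        try:
            target()
        except Exception:
            logging.exception("Thread %s died, shutting down worker", target.__name__)
            os._exit(1)  # force a full worker restart instead of a half-alive worker
    t = threading.Thread(target=wrapper, daemon=True)
    t.start()
    return t

# Usage (hypothetical):
# run_guarded(self.fetch_batches)
# run_guarded(self.receive_requests)
# run_guarded(self.send_responses)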

Collaborator

@namannandan namannandan left a comment

A few follow-up questions on the backend async service implementation, otherwise looks good to me!

Collaborator

@agunapal agunapal left a comment

LGTM

@mreso mreso enabled auto-merge June 22, 2024 02:51
@mreso mreso added this pull request to the merge queue Jun 22, 2024
Merged via the queue into master with commit 5f3df71 Jun 22, 2024
12 checks passed