
Add the ability to run the map_reduce chains process results step as async #6181

Merged

Conversation

MalhotraVijay (Contributor)

This adds the ability to pass an AsyncCallbackManager (handler) to the reducer chain, so that it can stream tokens via the async def on_llm_new_token callback method.

Fixes #5532

@hwchase17 @agola11
The following code snippet shows how this change can be used to enable a reduce_llm with streaming support in a map_reduce chain.

I have tested this change and it works for the streaming use case of reducer responses. I am happy to share more information if needed.


AsyncHandler
..........................
from typing import Any

from langchain.callbacks.base import AsyncCallbackHandler


class StreamingLLMCallbackHandler(AsyncCallbackHandler):
    """Callback handler for streaming LLM responses over a websocket."""

    def __init__(self, websocket):
        self.websocket = websocket

    # This callback method is executed asynchronously for each new token.
    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # ChatResponse is an app-defined pydantic model (sketched below).
        resp = ChatResponse(sender="bot", message=token, type="stream")
        await self.websocket.send_json(resp.dict())
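
For completeness, here is a minimal sketch of the ChatResponse model assumed by the handler above (hypothetical; modeled on the schema commonly used in chat-style LangChain apps):

ChatResponse (assumed)
......................
from pydantic import BaseModel


class ChatResponse(BaseModel):
    """Chat response schema assumed by the streaming handler."""

    sender: str   # e.g. "bot" or "you"
    message: str  # a streamed token or a full message
    type: str     # e.g. "stream", "start", "end", "error"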


Chain
..........
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.chains import ConversationalRetrievalChain
from langchain.chains.question_answering import load_qa_chain
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI

# `websocket`, `vectorstore`, and `question_generator` are assumed to be
# defined elsewhere in the application.
stream_handler = StreamingLLMCallbackHandler(websocket)
stream_manager = AsyncCallbackManager([stream_handler])
# Callback manager for the chains themselves (an empty async manager here;
# swap in tracing or logging handlers as needed).
manager = AsyncCallbackManager([])

streaming_llm = ChatOpenAI(
    streaming=True,
    callback_manager=stream_manager,
    verbose=False,
    temperature=0,
)
main_llm = OpenAI(
    temperature=0,
    verbose=False,
)

doc_chain = load_qa_chain(
    llm=main_llm,
    reduce_llm=streaming_llm,
    chain_type="map_reduce",
    callback_manager=manager,
)
qa_chain = ConversationalRetrievalChain(
    retriever=vectorstore.as_retriever(),
    combine_docs_chain=doc_chain,
    question_generator=question_generator,
    callback_manager=manager,
)

# Here `acall` triggers `acombine_docs` on the map_reduce chain, which then
# calls `_aprocess_result`, which in turn awaits
# `self.combine_document_chain.arun`, so the async callback is awaited.
result = await qa_chain.acall(
    {"question": question, "chat_history": chat_history}
)
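
For context, here is a minimal sketch of how this could be wired into a FastAPI websocket endpoint (hypothetical; the endpoint path and per-connection setup are assumptions, not part of this PR):

Usage (sketch)
..............
from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/chat")
async def chat_endpoint(websocket: WebSocket):
    await websocket.accept()
    chat_history = []
    # In a real app, qa_chain would be constructed per connection as shown
    # above, so that the handler streams to this websocket.
    while True:
        question = await websocket.receive_text()
        # While acall is awaited, StreamingLLMCallbackHandler streams the
        # reducer tokens to the client via on_llm_new_token.
        result = await qa_chain.acall(
            {"question": question, "chat_history": chat_history}
        )
        chat_history.append((question, result["answer"]))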

@agola11 (Collaborator) left a comment:

Thanks!

@MalhotraVijay (Contributor, Author):

Hi @agola11 - Thank you for reviewing earlier. I have added a few lint fixes; kindly review them when you get a chance. Thanks.

@MalhotraVijay MalhotraVijay requested a review from agola11 June 16, 2023 09:35
@MalhotraVijay (Contributor, Author):

Hi @agola11 - Friendly reminder to check and approve the PR again, as I've added some fixes for linting issues that were causing the automation to fail. Thanks.

@agola11 agola11 merged commit 2b3b4e0 into langchain-ai:master Jun 18, 2023