From 8fc0eecac118c2ae2b082cfb543114baa9c0e9b2 Mon Sep 17 00:00:00 2001 From: Isaac Francisco <78627776+isahers1@users.noreply.github.com> Date: Thu, 5 Sep 2024 20:37:47 -0700 Subject: [PATCH] [docs]: add subgraph streaming how-to (and some small cleanups) (#1606) * wip * spelling * edit format_namespace --- docs/_scripts/copy_notebooks.py | 1 + docs/docs/cloud/faq/studio.md | 6 +- docs/docs/how-tos/index.md | 1 + docs/mkdocs.yml | 1 + examples/streaming-from-final-node.ipynb | 8 +- examples/streaming-subgraphs.ipynb | 364 +++++++++++++++++++++++ 6 files changed, 376 insertions(+), 5 deletions(-) create mode 100644 examples/streaming-subgraphs.ipynb diff --git a/docs/_scripts/copy_notebooks.py b/docs/_scripts/copy_notebooks.py index 382edbb96..28bd89111 100644 --- a/docs/_scripts/copy_notebooks.py +++ b/docs/_scripts/copy_notebooks.py @@ -26,6 +26,7 @@ "streaming-events-from-within-tools.ipynb", "streaming-events-from-within-tools-without-langchain.ipynb", "streaming-from-final-node.ipynb", + "streaming-subgraphs.ipynb", "persistence.ipynb", "input_output_schema.ipynb", "pass_private_state.ipynb", diff --git a/docs/docs/cloud/faq/studio.md b/docs/docs/cloud/faq/studio.md index 070014ce7..232699315 100644 --- a/docs/docs/cloud/faq/studio.md +++ b/docs/docs/cloud/faq/studio.md @@ -54,7 +54,7 @@ The first way to solve this is to add path maps to your conditional edges. A pat === "Javascript" ```ts - graph.addConditionalEdges("node_a", routingFunction, ["node_b", "node_c"]); + graph.addConditionalEdges("node_a", routingFunction, { true: "node_b", false: "node_c" }); ``` In this case, the routing function returns either True or False, which map to `node_b` and `node_c` respectively. @@ -66,8 +66,8 @@ Instead of passing a path map, you can also be explicit about the typing of your ```python def routing_function(state: GraphState) -> Literal["node_b","node_c"]: if state['some_condition'] == True: - return "node_a" - else: return "node_b" + else: + return "node_c" ``` diff --git a/docs/docs/how-tos/index.md b/docs/docs/how-tos/index.md index 171294b3d..966b77221 100644 --- a/docs/docs/how-tos/index.md +++ b/docs/docs/how-tos/index.md @@ -55,6 +55,7 @@ These guides show how to use different streaming modes. - [How to stream events from within a tool](streaming-events-from-within-tools.ipynb) - [How to stream events from within a tool without LangChain models](streaming-events-from-within-tools-without-langchain.ipynb) - [How to stream events from the final node](streaming-from-final-node.ipynb) +- [How to stream from subgraphs](streaming-subgraphs.ipynb) ## Tool calling diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 9291a309f..2c02a66cb 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -155,6 +155,7 @@ nav: - Stream events from within tools: how-tos/streaming-events-from-within-tools.ipynb - Stream events from within tools without LangChain models: how-tos/streaming-events-from-within-tools-without-langchain.ipynb - Stream events from the final node: how-tos/streaming-from-final-node.ipynb + - Stream from subgraphs: how-tos/streaming-subgraphs.ipynb - Tool calling: - Call tools using ToolNode: how-tos/tool-calling.ipynb - Handle tool calling errors: how-tos/tool-calling-errors.ipynb diff --git a/examples/streaming-from-final-node.ipynb b/examples/streaming-from-final-node.ipynb index 30496a3a1..f7a8d8c66 100644 --- a/examples/streaming-from-final-node.ipynb +++ b/examples/streaming-from-final-node.ipynb @@ -13,7 +13,11 @@ "id": "964686a6-8fed-4360-84d2-958c48186008", "metadata": {}, "source": [ - "A common use case is streaming from an agent is to stream LLM tokens from inside the final node. This guide demonstrates how you can do this." + "A common use case is streaming from an agent is to stream LLM tokens from inside the final node. This guide demonstrates how you can do this.\n", + "\n", + "## Setup\n", + "\n", + "First let's install our required packages and set our environment variables." ] }, { @@ -34,7 +38,7 @@ "metadata": {}, "outputs": [ { - "name": "stdin", + "name": "stdout", "output_type": "stream", "text": [ "OPENAI_API_KEY: ········\n" diff --git a/examples/streaming-subgraphs.ipynb b/examples/streaming-subgraphs.ipynb new file mode 100644 index 000000000..ca7f0808c --- /dev/null +++ b/examples/streaming-subgraphs.ipynb @@ -0,0 +1,364 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# How to stream from subgraphs\n", + "\n", + "If you have created a graph with subgraphs you may wish to stream things occurring inside those subgraphs (or you may not!). This guide will walk through how you can control the information that is streamed back from subgraphs.\n", + "\n", + "## Setup\n", + "\n", + "First let's download the required packages and set our OpenAI API key since we will need that to run the models " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%capture --no-stderr\n", + "%pip install -U langgraph langchain-openai" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import getpass\n", + "import os\n", + "\n", + "\n", + "def _set_env(var: str):\n", + " if not os.environ.get(var):\n", + " os.environ[var] = getpass.getpass(f\"{var}: \")\n", + "\n", + "\n", + "_set_env(\"OPENAI_API_KEY\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define subgraphs\n", + "\n", + "We are going to use the same subgraph from [this how-to](https://langchain-ai.github.io/langgraph/how-tos/subgraph/)." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import TypedDict, Optional, Annotated\n", + "from langgraph.checkpoint.memory import MemorySaver\n", + "from langgraph.graph import StateGraph, START, END\n", + "\n", + "\n", + "# The structure of the logs\n", + "class Logs(TypedDict):\n", + " id: str\n", + " question: str\n", + " answer: str\n", + " grade: Optional[int]\n", + " feedback: Optional[str]\n", + "\n", + "\n", + "# Define custom reducer (see more on this in the \"Custom reducer\" section below)\n", + "def add_logs(left: list[Logs], right: list[Logs]) -> list[Logs]:\n", + " if not left:\n", + " left = []\n", + " \n", + " if not right:\n", + " right = []\n", + "\n", + " logs = left.copy()\n", + " left_id_to_idx = {log[\"id\"]: idx for idx, log in enumerate(logs)}\n", + " # update if the new logs are already in the state, otherwise append\n", + " for log in right:\n", + " idx = left_id_to_idx.get(log[\"id\"])\n", + " if idx is not None:\n", + " logs[idx] = log\n", + " else:\n", + " logs.append(log)\n", + " return logs\n", + "\n", + "\n", + "# Failure Analysis Subgraph\n", + "class FailureAnalysisState(TypedDict):\n", + " # keys shared with the parent graph (EntryGraphState)\n", + " logs: Annotated[list[Logs], add_logs]\n", + " failure_report: str\n", + " # subgraph key\n", + " failures: list[Logs]\n", + "\n", + "\n", + "def get_failures(state: FailureAnalysisState):\n", + " failures = [log for log in state[\"logs\"] if log[\"grade\"] == 0]\n", + " return {\"failures\": failures}\n", + "\n", + "\n", + "def generate_summary(state: FailureAnalysisState):\n", + " failures = state[\"failures\"]\n", + " # NOTE: you can implement custom summarization logic here\n", + " failure_ids = [log[\"id\"] for log in failures]\n", + " fa_summary = f\"Poor quality of retrieval for document IDs: {', '.join(failure_ids)}\"\n", + " return {\"failure_report\": fa_summary}\n", + "\n", + "\n", + "fa_builder = StateGraph(FailureAnalysisState)\n", + "fa_builder.add_node(\"get_failures\", get_failures)\n", + "fa_builder.add_node(\"generate_summary\", generate_summary)\n", + "fa_builder.add_edge(START, \"get_failures\")\n", + "fa_builder.add_edge(\"get_failures\", \"generate_summary\")\n", + "fa_builder.add_edge(\"generate_summary\", END)\n", + "\n", + "\n", + "# Summarization subgraph\n", + "class QuestionSummarizationState(TypedDict):\n", + " # keys that are shared with the parent graph (EntryGraphState)\n", + " summary_report: str\n", + " logs: Annotated[list[Logs], add_logs]\n", + " # subgraph keys\n", + " summary: str\n", + "\n", + "def generate_summary(state: QuestionSummarizationState):\n", + " docs = state[\"logs\"]\n", + " # NOTE: you can implement custom summarization logic here\n", + " summary = \"Questions focused on usage of ChatOllama and Chroma vector store.\"\n", + " return {\"summary\": summary}\n", + "\n", + "\n", + "def send_to_slack(state: QuestionSummarizationState):\n", + " summary = state[\"summary\"]\n", + " # NOTE: you can implement custom logic here, for example sending the summary generated in the previous step to Slack\n", + " return {\"summary_report\": summary}\n", + "\n", + "\n", + "qs_builder = StateGraph(QuestionSummarizationState)\n", + "qs_builder.add_node(\"generate_summary\", generate_summary)\n", + "qs_builder.add_node(\"send_to_slack\", send_to_slack)\n", + "qs_builder.add_edge(START, \"generate_summary\")\n", + "qs_builder.add_edge(\"generate_summary\", \"send_to_slack\")\n", + "qs_builder.add_edge(\"send_to_slack\", END)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define parent graph" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "image/jpeg": "", + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Entry Graph\n", + "class EntryGraphState(TypedDict):\n", + " raw_logs: Annotated[list[Logs], add_logs]\n", + " logs: Annotated[list[Logs], add_logs] # This will be used in subgraphs\n", + " failure_report: str # This will be generated in the FA subgraph\n", + " summary_report: str # This will be generated in the QS subgraph\n", + "\n", + "\n", + "def select_logs(state):\n", + " return {\"logs\": [log for log in state[\"raw_logs\"] if \"grade\" in log]}\n", + "\n", + "\n", + "entry_builder = StateGraph(EntryGraphState)\n", + "entry_builder.add_node(\"select_logs\", select_logs)\n", + "entry_builder.add_node(\"question_summarization\", qs_builder.compile())\n", + "entry_builder.add_node(\"failure_analysis\", fa_builder.compile())\n", + "\n", + "entry_builder.add_edge(START, \"select_logs\")\n", + "entry_builder.add_edge(\"select_logs\", \"failure_analysis\")\n", + "entry_builder.add_edge(\"select_logs\", \"question_summarization\")\n", + "entry_builder.add_edge(\"failure_analysis\", END)\n", + "entry_builder.add_edge(\"question_summarization\", END)\n", + "\n", + "graph = entry_builder.compile()\n", + "\n", + "from IPython.display import Image, display\n", + "\n", + "# Setting xray to 1 will show the internal structure of the nested graph\n", + "display(Image(graph.get_graph(xray=1).draw_mermaid_png()))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Stream\n", + "\n", + "Now let's see how we can stream from our graph!\n", + "\n", + "### Define input\n", + "\n", + "First, let's define the input we will use for the rest of the notebook:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# Dummy logs\n", + "dummy_logs = [\n", + " Logs(\n", + " id=\"1\",\n", + " question=\"How can I import ChatOllama?\",\n", + " grade=1,\n", + " answer=\"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\",\n", + " ),\n", + " Logs(\n", + " id=\"2\",\n", + " question=\"How can I use Chroma vector store?\",\n", + " answer=\"To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).\",\n", + " grade=0,\n", + " feedback=\"The retrieved documents discuss vector stores in general, but not Chroma specifically\",\n", + " ),\n", + " Logs(\n", + " id=\"3\",\n", + " question=\"How do I create react agent in langgraph?\",\n", + " answer=\"from langgraph.prebuilt import create_react_agent\",\n", + " )\n", + "]\n", + "\n", + "input = {\"raw_logs\": dummy_logs}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Stream normally\n", + "\n", + "First let us examine the output of streaming normally:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "---------- Update from node select_logs ---------\n", + "{'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': \"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}]}\n", + "---------- Update from node failure_analysis ---------\n", + "{'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': \"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'failure_report': 'Poor quality of retrieval for document IDs: 2'}\n", + "---------- Update from node question_summarization ---------\n", + "{'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': \"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'}\n" + ] + } + ], + "source": [ + "for chunk in graph.stream(input, stream_mode=\"updates\"):\n", + " node_name = list(chunk.keys())[0]\n", + " print(f\"---------- Update from node {node_name} ---------\")\n", + " print(chunk[node_name])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As you can see there are only 3 updates made to our overall graph state. The first one is by the `select_logs` node, and then we receive one update from each subgraph (note if you don't want to see the `log` update from each subgraph that you can set the [output schema](https://langchain-ai.github.io/langgraph/how-tos/input_output_schema/) to exclude it). What we do not see however, is the updates occurring *inside* each subgraph. The next section will explain how to do that.\n", + "\n", + "### Stream subgraph \n", + "\n", + "To show the updates occurring inside of each subgraph, we can simply set `subgraphs=True` to the streaming call:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "---------- Update from node select_logs in parent graph ---------\n", + "{'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': \"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}]}\n", + "---------- Update from node get_failures in failure_analysis subgraph ---------\n", + "{'failures': [{'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}]}\n", + "---------- Update from node generate_summary in failure_analysis subgraph ---------\n", + "{'failure_report': 'Poor quality of retrieval for document IDs: 2'}\n", + "---------- Update from node failure_analysis in parent graph ---------\n", + "{'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': \"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'failure_report': 'Poor quality of retrieval for document IDs: 2'}\n", + "---------- Update from node generate_summary in question_summarization subgraph ---------\n", + "{'summary': 'Questions focused on usage of ChatOllama and Chroma vector store.'}\n", + "---------- Update from node send_to_slack in question_summarization subgraph ---------\n", + "{'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'}\n", + "---------- Update from node question_summarization in parent graph ---------\n", + "{'logs': [{'id': '1', 'question': 'How can I import ChatOllama?', 'grade': 1, 'answer': \"To import ChatOllama, use: 'from langchain_community.chat_models import ChatOllama.'\"}, {'id': '2', 'question': 'How can I use Chroma vector store?', 'answer': 'To use Chroma, define: rag_chain = create_retrieval_chain(retriever, question_answer_chain).', 'grade': 0, 'feedback': 'The retrieved documents discuss vector stores in general, but not Chroma specifically'}], 'summary_report': 'Questions focused on usage of ChatOllama and Chroma vector store.'}\n" + ] + } + ], + "source": [ + "# Format the namespace slightly nicer\n", + "def format_namespace(namespace):\n", + " return namespace[-1].split(':')[0]+' subgraph' if len(namespace) > 0 else 'parent graph'\n", + "\n", + "for namespace, chunk in graph.stream(input, stream_mode=\"updates\", subgraphs=True):\n", + " node_name = list(chunk.keys())[0]\n", + " print(f\"---------- Update from node {node_name} in {format_namespace(namespace)} ---------\")\n", + " print(chunk[node_name])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The first thing you will notice as different is that we are no longer just receiving chunks, but we also receive namespaces which tell us what subgraph we are currently inside of.\n", + "\n", + "If you look carefully at the logs you can see we are now receiving the updates made by nodes inside of each subgraph, for instance we now see updates to the `summary_report` state channel from the `get_failure` node which lives in the `failure_analysis` subgraph. When we didn't set `subgraphs=True` all we saw was the overall update made by the subgraph `failure_analysis`." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}