From 41537c583a3d24d586cb174b44f400c5ca73fac5 Mon Sep 17 00:00:00 2001
From: Nuno Campos
Date: Fri, 20 Sep 2024 17:19:59 -0700
Subject: [PATCH] sdk: Add support for resuming from checkpoint

---
 libs/sdk-py/langgraph_sdk/client.py | 20 ++++++++++++--------
 libs/sdk-py/langgraph_sdk/schema.py | 16 ++++++++++++++--
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/libs/sdk-py/langgraph_sdk/client.py b/libs/sdk-py/langgraph_sdk/client.py
index d5daf3b57..80c01e520 100644
--- a/libs/sdk-py/langgraph_sdk/client.py
+++ b/libs/sdk-py/langgraph_sdk/client.py
@@ -1012,6 +1012,7 @@ def stream(
         stream_subgraphs: bool = False,
         metadata: Optional[dict] = None,
         config: Optional[Config] = None,
+        checkpoint: Optional[Checkpoint] = None,
         checkpoint_id: Optional[str] = None,
         interrupt_before: Optional[list[str]] = None,
         interrupt_after: Optional[list[str]] = None,
@@ -1052,6 +1053,7 @@ def stream(
         stream_subgraphs: bool = False,
         metadata: Optional[dict] = None,
         config: Optional[Config] = None,
+        checkpoint: Optional[Checkpoint] = None,
         checkpoint_id: Optional[str] = None,
         interrupt_before: Optional[list[str]] = None,
         interrupt_after: Optional[list[str]] = None,
@@ -1074,11 +1076,9 @@ def stream(
             stream_subgraphs: Whether to stream output from subgraphs.
             metadata: Metadata to assign to the run.
             config: The configuration for the assistant.
-            checkpoint_id: The checkpoint to start streaming from.
+            checkpoint: The checkpoint to resume from.
             interrupt_before: Nodes to interrupt immediately before they get executed.
-            interrupt_after: Nodes to Nodes to interrupt immediately after they get executed.
-
             feedback_keys: Feedback keys to assign to run.
             webhook: Webhook to call after LangGraph API call is done.
             multitask_strategy: Multitask strategy to use.

@@ -1102,7 +1102,6 @@ def stream(
             stream_mode=["values","debug"],
             metadata={"name":"my_run"},
             config={"configurable": {"model_name": "anthropic"}},
-            checkpoint_id="my_checkpoint",
             interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
             interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
             feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
@@ -1130,6 +1129,7 @@ def stream(
             "interrupt_after": interrupt_after,
             "feedback_keys": feedback_keys,
             "webhook": webhook,
+            "checkpoint": checkpoint,
             "checkpoint_id": checkpoint_id,
             "multitask_strategy": multitask_strategy,
             "on_disconnect": on_disconnect,
@@ -1174,6 +1174,7 @@ async def create(
         stream_subgraphs: bool = False,
         metadata: Optional[dict] = None,
         config: Optional[Config] = None,
+        checkpoint: Optional[Checkpoint] = None,
         checkpoint_id: Optional[str] = None,
         interrupt_before: Optional[list[str]] = None,
         interrupt_after: Optional[list[str]] = None,
@@ -1192,6 +1193,7 @@ async def create(
         stream_subgraphs: bool = False,
         metadata: Optional[dict] = None,
         config: Optional[Config] = None,
+        checkpoint: Optional[Checkpoint] = None,
         checkpoint_id: Optional[str] = None,
         interrupt_before: Optional[list[str]] = None,
         interrupt_after: Optional[list[str]] = None,
@@ -1212,7 +1214,7 @@ async def create(
             stream_subgraphs: Whether to stream output from subgraphs.
             metadata: Metadata to assign to the run.
             config: The configuration for the assistant.
-            checkpoint_id: The checkpoint to start streaming from.
+            checkpoint: The checkpoint to resume from.
             interrupt_before: Nodes to interrupt immediately before they get executed.
             interrupt_after: Nodes to Nodes to interrupt immediately after they get executed.
             webhook: Webhook to call after LangGraph API call is done.
@@ -1234,7 +1236,6 @@ async def create(
             input={"messages": [{"role": "user", "content": "hello!"}]},
             metadata={"name":"my_run"},
             config={"configurable": {"model_name": "openai"}},
-            checkpoint_id="my_checkpoint",
             interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
             interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
             webhook="https://my.fake.webhook.com",
@@ -1301,6 +1302,7 @@ async def create(
             "interrupt_before": interrupt_before,
             "interrupt_after": interrupt_after,
             "webhook": webhook,
+            "checkpoint": checkpoint,
             "checkpoint_id": checkpoint_id,
             "multitask_strategy": multitask_strategy,
             "on_completion": on_completion,
@@ -1330,6 +1332,7 @@ async def wait(
         input: Optional[dict] = None,
         metadata: Optional[dict] = None,
         config: Optional[Config] = None,
+        checkpoint: Optional[Checkpoint] = None,
         checkpoint_id: Optional[str] = None,
         interrupt_before: Optional[list[str]] = None,
         interrupt_after: Optional[list[str]] = None,
@@ -1364,6 +1367,7 @@ async def wait(
         input: Optional[dict] = None,
         metadata: Optional[dict] = None,
         config: Optional[Config] = None,
+        checkpoint: Optional[Checkpoint] = None,
         checkpoint_id: Optional[str] = None,
         interrupt_before: Optional[list[str]] = None,
         interrupt_after: Optional[list[str]] = None,
@@ -1383,7 +1387,7 @@ async def wait(
             input: The input to the graph.
             metadata: Metadata to assign to the run.
             config: The configuration for the assistant.
-            checkpoint_id: The checkpoint to start streaming from.
+            checkpoint: The checkpoint to resume from.
             interrupt_before: Nodes to interrupt immediately before they get executed.
             interrupt_after: Nodes to Nodes to interrupt immediately after they get executed.
             webhook: Webhook to call after LangGraph API call is done.
@@ -1407,7 +1411,6 @@ async def wait(
             input={"messages": [{"role": "user", "content": "how are you?"}]},
             metadata={"name":"my_run"},
             config={"configurable": {"model_name": "anthropic"}},
-            checkpoint_id="my_checkpoint",
             interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
             interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
             webhook="https://my.fake.webhook.com",
@@ -1452,6 +1455,7 @@ async def wait(
             "interrupt_before": interrupt_before,
             "interrupt_after": interrupt_after,
             "webhook": webhook,
+            "checkpoint": checkpoint,
             "checkpoint_id": checkpoint_id,
             "multitask_strategy": multitask_strategy,
             "on_disconnect": on_disconnect,
diff --git a/libs/sdk-py/langgraph_sdk/schema.py b/libs/sdk-py/langgraph_sdk/schema.py
index 042670d5b..69f219c6e 100644
--- a/libs/sdk-py/langgraph_sdk/schema.py
+++ b/libs/sdk-py/langgraph_sdk/schema.py
@@ -44,9 +44,10 @@ class Config(TypedDict, total=False):
 class Checkpoint(TypedDict):
     """Checkpoint model."""

-    checkpoint_id: str
+    thread_id: str
     checkpoint_ns: str
-    checkpoint_map: dict[str, Any]
+    checkpoint_id: Optional[str]
+    checkpoint_map: Optional[dict[str, Any]]


 class GraphSchema(TypedDict):
@@ -112,6 +113,15 @@ class Thread(TypedDict):
     """The current state of the thread."""


+class ThreadTask(TypedDict):
+    id: str
+    name: str
+    error: Optional[str]
+    interrupts: list[dict]
+    checkpoint: Optional[Checkpoint]
+    state: Optional["ThreadState"]
+
+
 class ThreadState(TypedDict):
     values: Union[list[dict], dict[str, Any]]
     """The state values."""
@@ -126,6 +136,8 @@ class ThreadState(TypedDict):
     """Timestamp of state creation"""
     parent_checkpoint: Optional[Checkpoint]
     """The ID of the parent checkpoint. If missing, this is the root checkpoint."""
+    tasks: Sequence[ThreadTask]
+    """Tasks to execute in this step. If already attempted, may contain an error."""


 class Run(TypedDict):
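
Usage sketch (illustrative, not part of the patch): a minimal example of how a
client could resume a run from a checkpoint once this change ships. The server
URL, the "agent" assistant name, and the reading of a None checkpoint_id as
"latest checkpoint" are assumptions made for illustration; the Checkpoint
fields and the runs.stream() signature come from the diff above.

    import asyncio

    from langgraph_sdk import get_client
    from langgraph_sdk.schema import Checkpoint


    async def main() -> None:
        # URL and assistant name are assumptions for illustration.
        client = get_client(url="http://localhost:8123")
        thread = await client.threads.create()

        # The new Checkpoint TypedDict: thread_id and checkpoint_ns identify
        # the thread and namespace; checkpoint_id and checkpoint_map are
        # optional. A None checkpoint_id presumably selects the latest
        # checkpoint (an assumption the patch does not spell out).
        checkpoint: Checkpoint = {
            "thread_id": thread["thread_id"],
            "checkpoint_ns": "",
            "checkpoint_id": None,
            "checkpoint_map": None,
        }

        # Pass the new `checkpoint` argument rather than the older
        # `checkpoint_id` string, which the patch keeps for compatibility.
        async for part in client.runs.stream(
            thread["thread_id"],
            "agent",
            input={"messages": [{"role": "user", "content": "hello!"}]},
            checkpoint=checkpoint,
        ):
            print(part.event, part.data)


    asyncio.run(main())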