Skip to content

Commit

Permalink
sdk: Add support for resuming from checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
nfcampos committed Sep 21, 2024
1 parent d42bdd0 commit 41537c5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
20 changes: 12 additions & 8 deletions libs/sdk-py/langgraph_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ def stream(
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
Expand Down Expand Up @@ -1052,6 +1053,7 @@ def stream(
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
Expand All @@ -1074,11 +1076,9 @@ def stream(
stream_subgraphs: Whether to stream output from subgraphs.
metadata: Metadata to assign to the run.
config: The configuration for the assistant.
checkpoint_id: The checkpoint to start streaming from.
checkpoint: The checkpoint to resume from.
interrupt_before: Nodes to interrupt immediately before they get executed.
            interrupt_after: Nodes to interrupt immediately after they get executed.
feedback_keys: Feedback keys to assign to run.
webhook: Webhook to call after LangGraph API call is done.
multitask_strategy: Multitask strategy to use.
Expand All @@ -1102,7 +1102,6 @@ def stream(
stream_mode=["values","debug"],
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
checkpoint_id="my_checkpoint",
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
Expand Down Expand Up @@ -1130,6 +1129,7 @@ def stream(
"interrupt_after": interrupt_after,
"feedback_keys": feedback_keys,
"webhook": webhook,
"checkpoint": checkpoint,
"checkpoint_id": checkpoint_id,
"multitask_strategy": multitask_strategy,
"on_disconnect": on_disconnect,
Expand Down Expand Up @@ -1174,6 +1174,7 @@ async def create(
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
Expand All @@ -1192,6 +1193,7 @@ async def create(
stream_subgraphs: bool = False,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
Expand All @@ -1212,7 +1214,7 @@ async def create(
stream_subgraphs: Whether to stream output from subgraphs.
metadata: Metadata to assign to the run.
config: The configuration for the assistant.
checkpoint_id: The checkpoint to start streaming from.
checkpoint: The checkpoint to resume from.
interrupt_before: Nodes to interrupt immediately before they get executed.
            interrupt_after: Nodes to interrupt immediately after they get executed.
webhook: Webhook to call after LangGraph API call is done.
Expand All @@ -1234,7 +1236,6 @@ async def create(
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "openai"}},
checkpoint_id="my_checkpoint",
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
Expand Down Expand Up @@ -1301,6 +1302,7 @@ async def create(
"interrupt_before": interrupt_before,
"interrupt_after": interrupt_after,
"webhook": webhook,
"checkpoint": checkpoint,
"checkpoint_id": checkpoint_id,
"multitask_strategy": multitask_strategy,
"on_completion": on_completion,
Expand Down Expand Up @@ -1330,6 +1332,7 @@ async def wait(
input: Optional[dict] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
Expand Down Expand Up @@ -1364,6 +1367,7 @@ async def wait(
input: Optional[dict] = None,
metadata: Optional[dict] = None,
config: Optional[Config] = None,
checkpoint: Optional[Checkpoint] = None,
checkpoint_id: Optional[str] = None,
interrupt_before: Optional[list[str]] = None,
interrupt_after: Optional[list[str]] = None,
Expand All @@ -1383,7 +1387,7 @@ async def wait(
input: The input to the graph.
metadata: Metadata to assign to the run.
config: The configuration for the assistant.
checkpoint_id: The checkpoint to start streaming from.
checkpoint: The checkpoint to resume from.
interrupt_before: Nodes to interrupt immediately before they get executed.
            interrupt_after: Nodes to interrupt immediately after they get executed.
webhook: Webhook to call after LangGraph API call is done.
Expand All @@ -1407,7 +1411,6 @@ async def wait(
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
config={"configurable": {"model_name": "anthropic"}},
checkpoint_id="my_checkpoint",
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
Expand Down Expand Up @@ -1452,6 +1455,7 @@ async def wait(
"interrupt_before": interrupt_before,
"interrupt_after": interrupt_after,
"webhook": webhook,
"checkpoint": checkpoint,
"checkpoint_id": checkpoint_id,
"multitask_strategy": multitask_strategy,
"on_disconnect": on_disconnect,
Expand Down
16 changes: 14 additions & 2 deletions libs/sdk-py/langgraph_sdk/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ class Config(TypedDict, total=False):
class Checkpoint(TypedDict):
    """Checkpoint model.

    Identifies a checkpoint that a run can be resumed from.

    NOTE: this TypedDict is total, so all four keys are required when
    constructing a Checkpoint; ``Optional`` only means the *value* may be
    ``None``, not that the key may be omitted.
    """

    # ID of the thread the checkpoint belongs to.
    thread_id: str
    # Checkpoint namespace — presumably "" for the root graph and a nested
    # path for subgraph checkpoints; TODO confirm against server semantics.
    checkpoint_ns: str
    # Specific checkpoint ID to resume from; None appears to mean "latest"
    # — NOTE(review): confirm against the API's handling of a null ID.
    checkpoint_id: Optional[str]
    # Additional identifying metadata for the checkpoint, if any.
    checkpoint_map: Optional[dict[str, Any]]


class GraphSchema(TypedDict):
Expand Down Expand Up @@ -112,6 +113,15 @@ class Thread(TypedDict):
"""The current state of the thread."""


# A single task within one step of a thread's execution.  Declared with the
# functional TypedDict form; "ThreadState" stays a quoted forward reference
# because ThreadState is declared further down in this module.
ThreadTask = TypedDict(
    "ThreadTask",
    {
        # Unique identifier of the task.
        "id": str,
        # Task name — presumably the graph node being executed; confirm.
        "name": str,
        # Error message when the task failed, otherwise None.
        "error": Optional[str],
        # Interrupts recorded for this task.
        "interrupts": list[dict],
        # Checkpoint associated with the task, if any.
        "checkpoint": Optional[Checkpoint],
        # Thread state for this task, if available.
        "state": Optional["ThreadState"],
    },
)


class ThreadState(TypedDict):
values: Union[list[dict], dict[str, Any]]
"""The state values."""
Expand All @@ -126,6 +136,8 @@ class ThreadState(TypedDict):
"""Timestamp of state creation"""
parent_checkpoint: Optional[Checkpoint]
"""The ID of the parent checkpoint. If missing, this is the root checkpoint."""
tasks: Sequence[ThreadTask]
"""Tasks to execute in this step. If already attempted, may contain an error."""


class Run(TypedDict):
Expand Down

0 comments on commit 41537c5

Please sign in to comment.