Last Updated: 3/7/2026
Interrupts
Interrupts allow you to pause graph execution at specific points and wait for external input before continuing. This enables human-in-the-loop patterns where you need external input to proceed. When an interrupt is triggered, LangGraph saves the graph state using its persistence layer and waits indefinitely until you resume execution.

Interrupts work by calling the `interrupt()` function at any point in your graph nodes. The function accepts any JSON-serializable value, which is surfaced to the caller. When you're ready to continue, you resume execution by re-invoking the graph with a `Command`; the resume value you provide then becomes the return value of the `interrupt()` call inside the node.

Unlike static breakpoints (which pause before or after specific nodes), interrupts are dynamic: they can be placed anywhere in your code and can be conditional on your application logic.
- Checkpointing keeps your place: the checkpointer writes the exact graph state so you can resume later, even from an error state.
- `thread_id` is your pointer: set `config={"configurable": {"thread_id": ...}}` to tell the checkpointer which state to load.
- Interrupt payloads surface as `__interrupt__`: the values you pass to `interrupt()` return to the caller in the `__interrupt__` field so you know what the graph is waiting on.
The thread_id you choose is effectively your persistent cursor. Reusing it resumes the same checkpoint; using a new value starts a brand-new thread with an empty state.
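To make the cursor analogy concrete, here is a toy sketch in plain Python (a model of the idea, not LangGraph's actual checkpointer API): think of the checkpointer as a mapping keyed by `thread_id`.

```python
# Toy model of how a checkpointer keys state by thread_id (plain Python,
# not LangGraph's actual API): reusing an id loads its saved checkpoint,
# while a new id starts from empty state.
checkpoints: dict[str, dict] = {}

def invoke(thread_id: str, update: dict) -> dict:
    # Load the checkpoint for this thread (empty for a new thread_id),
    # apply the update, and write the result back.
    state = {**checkpoints.get(thread_id, {}), **update}
    checkpoints[thread_id] = state
    return state

invoke("thread-1", {"step": 1})
resumed = invoke("thread-1", {"approved": True})  # same id: prior state is loaded
fresh = invoke("thread-2", {"approved": True})    # new id: starts from scratch
print(resumed)  # {'step': 1, 'approved': True}
print(fresh)    # {'approved': True}
```

The real checkpointer stores far more (channel values, pending writes, task metadata), but the lookup contract is the same: same `thread_id`, same checkpoint.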
Pause using interrupt
The interrupt function pauses graph execution and returns a value to the caller. When you call interrupt within a node, LangGraph saves the current graph state and waits for you to resume execution with input. To use interrupt, you need:
- A checkpointer to persist the graph state (use a durable checkpointer in production)
- A thread ID in your config so the runtime knows which state to resume from
- A call to `interrupt()` where you want to pause (the payload must be JSON-serializable)
```python
from langgraph.types import interrupt

def approval_node(state: State):
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")
    # When you resume, Command(resume=...) returns that value here
    return {"approved": approved}
```

When you call interrupt, here's what happens:
- Graph execution is suspended at the exact point where `interrupt` is called
- State is saved using the checkpointer so execution can be resumed later; in production, this should be a persistent checkpointer (e.g. backed by a database)
- The value is returned to the caller under `__interrupt__`; it can be any JSON-serializable value (string, object, array, etc.)
- The graph waits indefinitely until you resume execution with a response
- The response is passed back into the node when you resume, becoming the return value of the `interrupt()` call
Resuming interrupts
After an interrupt pauses execution, you resume the graph by invoking it again with a Command that contains the resume value. The resume value is passed back to the interrupt call, allowing the node to continue execution with the external input.
```python
from langgraph.types import Command

# Initial run - hits the interrupt and pauses
# thread_id is the persistent pointer (store a stable ID in production)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config)

# Check what was interrupted
# __interrupt__ contains the payload that was passed to interrupt()
print(result["__interrupt__"])
# > [Interrupt(value='Do you approve this action?')]

# Resume with the human's response
# The resume payload becomes the return value of interrupt() inside the node
graph.invoke(Command(resume=True), config=config)
```

Key points about resuming:
- You must use the same thread ID when resuming that was used when the interrupt occurred
- The value passed to `Command(resume=...)` becomes the return value of the `interrupt` call
- When resumed, the node restarts from the beginning, so any code before the `interrupt` runs again
- You can pass any JSON-serializable value as the resume value
Command(resume=...) is the only Command pattern intended as input to invoke()/stream(). The other Command parameters (update, goto, graph) are designed for returning from node functions. Do not pass Command(update=...) as input to continue multi-turn conversations — pass a plain input dict instead.
Common patterns
The key thing that interrupts unlock is the ability to pause execution and wait for external input. This is useful for a variety of use cases, including:
- Approval workflows: Pause before executing critical actions (API calls, database changes, financial transactions)
- Handling multiple interrupts: Pair interrupt IDs with resume values when resuming multiple interrupts in a single invocation
- Review and edit: Let humans review and modify LLM outputs or tool calls before continuing
- Interrupting tool calls: Pause before executing tool calls to review and edit the tool call before execution
- Validating human input: Pause before proceeding to the next step to validate human input
Stream with human-in-the-loop (HITL) interrupts
When building interactive agents with human-in-the-loop workflows, you can stream both message chunks and node updates simultaneously to provide real-time feedback while handling interrupts. Use multiple stream modes ("messages" and "updates") with subgraphs=True (if subgraphs are present) to:
- Stream AI responses in real-time as they’re generated
- Detect when the graph encounters an interrupt
- Handle user input and resume execution seamlessly
```python
async for metadata, mode, chunk in graph.astream(
    initial_input,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config=config
):
    if mode == "messages":
        # Handle streaming message content
        msg, _ = chunk
        if isinstance(msg, AIMessageChunk) and msg.content:
            # Display content in real-time
            display_streaming_content(msg.content)
    elif mode == "updates":
        # Check for interrupts
        if "__interrupt__" in chunk:
            # Stop streaming display
            interrupt_info = chunk["__interrupt__"][0].value
            # Handle user input
            user_response = get_user_input(interrupt_info)
            # Resume graph with updated input
            initial_input = Command(resume=user_response)
            break
        else:
            # Track node transitions
            current_node = list(chunk.keys())[0]
```

- `stream_mode=["messages", "updates"]`: enables dual streaming of both message chunks and graph state updates
- `subgraphs=True`: required for interrupt detection in nested graphs
- `"__interrupt__"` detection: signals when human input is needed
- `Command(resume=...)`: resumes graph execution with user-provided data
Handling multiple interrupts
When parallel branches interrupt simultaneously (for example, fan-out to multiple nodes that each call interrupt()), you may need to resume multiple interrupts in a single invocation. When resuming multiple interrupts with a single invocation, map each interrupt ID to its resume value. This ensures each response is paired with the correct interrupt at runtime.
```python
import operator
from typing import Annotated, TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt

class State(TypedDict):
    vals: Annotated[list[str], operator.add]

def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}

def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}

graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Step 1: invoke — both parallel nodes hit interrupt() and pause
interrupted_result = graph.invoke({"vals": []}, config)
print(interrupted_result)
"""
{
    'vals': [],
    '__interrupt__': [
        Interrupt(value='question_a', id='bd4f3183600f2c41dddafbf8f0f7be7b'),
        Interrupt(value='question_b', id='29963e3d3585f0cef025dd0f14323f55')
    ]
}
"""

# Step 2: resume all pending interrupts at once
resume_map = {
    i.id: f"answer for {i.value}"
    for i in interrupted_result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)
print("Final state:", result)
# > Final state: {'vals': ['a:answer for question_a', 'b:answer for question_b']}
```

Approve or reject
One of the most common uses of interrupts is to pause before a critical action and ask for approval. For example, you might want to ask a human to approve an API call, a database change, or any other important decision.
```python
from typing import Literal
from langgraph.types import interrupt, Command

def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
    # Pause execution; payload shows up under result["__interrupt__"]
    is_approved = interrupt({
        "question": "Do you want to proceed with this action?",
        "details": state["action_details"]
    })
    # Route based on the response
    if is_approved:
        return Command(goto="proceed")  # Runs after the resume payload is provided
    else:
        return Command(goto="cancel")
```

When you resume the graph, pass `True` to approve or `False` to reject:
```python
# To approve
graph.invoke(Command(resume=True), config=config)

# To reject
graph.invoke(Command(resume=False), config=config)
```

Full example
```python
from typing import Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt

class ApprovalState(TypedDict):
    action_details: str
    status: Optional[Literal["pending", "approved", "rejected"]]

def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
    # Expose details so the caller can render them in a UI
    decision = interrupt({
        "question": "Approve this action?",
        "details": state["action_details"],
    })
    # Route to the appropriate node after resume
    return Command(goto="proceed" if decision else "cancel")

def proceed_node(state: ApprovalState):
    return {"status": "approved"}

def cancel_node(state: ApprovalState):
    return {"status": "rejected"}

builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)

# Use a more durable checkpointer in production
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
    {"action_details": "Transfer $500", "status": "pending"},
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'question': ..., 'details': ...})]

# Resume with the decision; True routes to proceed, False to cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"])  # -> "approved"
```

Review and edit state
Sometimes you want to let a human review and edit part of the graph state before continuing. This is useful for correcting LLMs, adding missing information, or making adjustments.
```python
from langgraph.types import interrupt

def review_node(state: State):
    # Pause and show the current content for review (surfaces in result["__interrupt__"])
    edited_content = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"]
    })
    # Update the state with the edited version
    return {"generated_text": edited_content}
```

When resuming, provide the edited content:
```python
graph.invoke(
    Command(resume="The edited and improved text"),  # Value becomes the return from interrupt()
    config=config
)
```

Full example
```python
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt

class ReviewState(TypedDict):
    generated_text: str

def review_node(state: ReviewState):
    # Ask a reviewer to edit the generated content
    updated = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })
    return {"generated_text": updated}

builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "Initial draft"}, config=config)
print(initial["__interrupt__"])  # -> [Interrupt(value={'instruction': ..., 'content': ...})]

# Resume with the edited text from the reviewer
final_state = graph.invoke(
    Command(resume="Improved draft after review"),
    config=config,
)
print(final_state["generated_text"])  # -> "Improved draft after review"
```

Interrupts in tools
You can also place interrupts directly inside tool functions. This makes the tool itself pause for approval whenever it’s called, and allows for human review and editing of the tool call before it is executed. First, define a tool that uses interrupt:
```python
from langchain.tools import tool
from langgraph.types import interrupt

@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""
    # Pause before sending; payload surfaces in result["__interrupt__"]
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?"
    })
    if response.get("action") == "approve":
        # Resume value can override inputs before executing
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)
        return f"Email sent to {final_to} with subject '{final_subject}'"
    return "Email cancelled by user"
```

This approach is useful when you want the approval logic to live with the tool itself, making it reusable across different parts of your graph. The LLM can call the tool naturally, and the interrupt will pause execution whenever the tool is invoked, allowing you to approve, edit, or cancel the action.
Full example
```python
import sqlite3
from typing import TypedDict

from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt

class AgentState(TypedDict):
    messages: list[dict]

@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""
    # Pause before sending; payload surfaces in result["__interrupt__"]
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?",
    })
    if response.get("action") == "approve":
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)
        # Actually send the email (your implementation here)
        print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")
        return f"Email sent to {final_to}"
    return "Email cancelled by user"

model = ChatAnthropic(model="claude-sonnet-4-6").bind_tools([send_email])

def agent_node(state: AgentState):
    # LLM may decide to call the tool; interrupt pauses before sending
    result = model.invoke(state["messages"])
    return {"messages": state["messages"] + [result]}

builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)

checkpointer = SqliteSaver(sqlite3.connect("tool-approval.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "email-workflow"}}
initial = graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "Send an email to alice@example.com about the meeting"}
        ]
    },
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'action': 'send_email', ...})]

# Resume with approval and optionally edited arguments
resumed = graph.invoke(
    Command(resume={"action": "approve", "subject": "Updated subject"}),
    config=config,
)
print(resumed["messages"][-1])  # -> Tool result returned by send_email
```

Validating human input
Sometimes you need to validate input from humans and ask again if it’s invalid. You can do this using multiple interrupt calls in a loop.
```python
from langgraph.types import interrupt

def get_age_node(state: State):
    prompt = "What is your age?"
    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]
        # Validate the input
        if isinstance(answer, int) and answer > 0:
            # Valid input - continue
            break
        else:
            # Invalid input - ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."
    return {"age": answer}
```

Each time you resume the graph with invalid input, it will ask again with a clearer message. Once valid input is provided, the node completes and the graph continues.
Full example
```python
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt

class FormState(TypedDict):
    age: int | None

def get_age_node(state: FormState):
    prompt = "What is your age?"
    while True:
        answer = interrupt(prompt)  # payload surfaces in result["__interrupt__"]
        if isinstance(answer, int) and answer > 0:
            return {"age": answer}
        prompt = f"'{answer}' is not a valid age. Please enter a positive number."

builder = StateGraph(FormState)
builder.add_node("collect_age", get_age_node)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", END)

checkpointer = SqliteSaver(sqlite3.connect("forms.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config=config)
print(first["__interrupt__"])  # -> [Interrupt(value='What is your age?', ...)]

# Provide invalid data; the node re-prompts
retry = graph.invoke(Command(resume="thirty"), config=config)
print(retry["__interrupt__"])  # -> [Interrupt(value="'thirty' is not a valid age...", ...)]

# Provide valid data; loop exits and state updates
final = graph.invoke(Command(resume=30), config=config)
print(final["age"])  # -> 30
```

Rules of interrupts
When you call interrupt within a node, LangGraph suspends execution by raising an exception that signals the runtime to pause. This exception propagates up through the call stack and is caught by the runtime, which notifies the graph to save the current state and wait for external input. When execution resumes (after you provide the requested input), the runtime restarts the entire node from the beginning—it does not resume from the exact line where interrupt was called. This means any code that ran before the interrupt will execute again. Because of this, there are a few important rules to follow when working with interrupts to ensure they behave as expected.
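The replay behavior can be sketched in plain Python (a toy model of the mechanism, not LangGraph internals): the node reruns from the top on each resume, and each `interrupt()` call consumes the next resume value by position.

```python
class Pause(Exception):
    """Stands in for the special exception interrupt() raises to pause."""

log = []

def node(interrupt):
    log.append("before interrupt")       # re-runs on every resume
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    return {"name": name, "age": age}

def run(node, resume_values):
    # Replay the node from the top; interrupt() returns resume values by
    # index and pauses when no value exists for the current call.
    counter = {"i": 0}

    def interrupt(payload):
        i = counter["i"]
        counter["i"] += 1
        if i < len(resume_values):
            return resume_values[i]      # matched strictly by call order
        raise Pause(payload)

    try:
        return node(interrupt)
    except Pause as p:
        return {"__interrupt__": p.args[0]}

print(run(node, []))           # {'__interrupt__': "What's your name?"}
print(run(node, ["Ada"]))      # {'__interrupt__': "What's your age?"}
print(run(node, ["Ada", 36]))  # {'name': 'Ada', 'age': 36}
print(log)                     # the code before the first interrupt ran three times
```

Note how `log` accumulates one entry per run: the line before the first interrupt executed every time, which is exactly why the rules below warn against side effects and non-deterministic control flow around `interrupt` calls.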
Do not wrap interrupt calls in try/except
The way that interrupt pauses execution at the point of the call is by throwing a special exception. If you wrap the interrupt call in a try/except block, you will catch this exception and the interrupt will not be passed back to the graph.
- ✅ Separate `interrupt` calls from error-prone code
- ✅ Use specific exception types in try/except blocks
```python
def node_a(state: State):
    # ✅ Good: interrupting first, then handling
    # error conditions separately
    interrupt("What's your name?")
    try:
        fetch_data()  # This can fail
    except Exception as e:
        print(e)
    return state
```

- 🔴 Do not wrap `interrupt` calls in bare try/except blocks
```python
def node_a(state: State):
    # ❌ Bad: wrapping interrupt in bare try/except
    # will catch the interrupt exception
    try:
        interrupt("What's your name?")
    except Exception as e:
        print(e)
    return state
```

Do not reorder interrupt calls within a node
It’s common to use multiple interrupts in a single node; however, this can lead to unexpected behavior if not handled carefully. When a node contains multiple interrupt calls, LangGraph keeps a list of resume values specific to the task executing the node. Whenever execution resumes, it starts at the beginning of the node. For each interrupt encountered, LangGraph checks whether a matching value exists in the task’s resume list. Matching is strictly index-based, so the order of interrupt calls within the node matters.
- ✅ Keep `interrupt` calls consistent across node executions

```python
def node_a(state: State):
    # ✅ Good: interrupt calls happen in the same order every time
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    city = interrupt("What's your city?")
    return {
        "name": name,
        "age": age,
        "city": city
    }
```

- 🔴 Do not conditionally skip `interrupt` calls within a node
- 🔴 Do not loop `interrupt` calls using logic that isn’t deterministic across executions

```python
def node_a(state: State):
    # ❌ Bad: conditionally skipping interrupts changes the order
    name = interrupt("What's your name?")
    # On first run, this might skip the interrupt
    # On resume, it might not skip it - causing index mismatch
    if state.get("needs_age"):
        age = interrupt("What's your age?")
    city = interrupt("What's your city?")
    return {"name": name, "city": city}
```

Do not return complex values in interrupt calls
Depending on which checkpointer is used, complex values may not be serializable (e.g. you can’t serialize a function). To make your graphs adaptable to any deployment, it’s best practice to only use values that can be reasonably serialized.
- ✅ Pass simple, JSON-serializable types to `interrupt`
- ✅ Pass dictionaries/objects with simple values

```python
def node_a(state: State):
    # ✅ Good: passing simple types that are serializable
    name = interrupt("What's your name?")
    count = interrupt(42)
    approved = interrupt(True)
    return {"name": name, "count": count, "approved": approved}
```

- 🔴 Do not pass functions, class instances, or other complex objects to `interrupt`

```python
def validate_input(value):
    return len(value) > 0

def node_a(state: State):
    # ❌ Bad: passing a function to interrupt
    # The function cannot be serialized
    response = interrupt({
        "question": "What's your name?",
        "validator": validate_input  # This will fail
    })
    return {"name": response}
```

Side effects called before interrupt must be idempotent
Because interrupts work by re-running the node they were called from, side effects that run before interrupt should ideally be idempotent. Idempotency means that applying the same operation multiple times does not change the result beyond the initial execution. For example, a node might make an API call to update a record. If interrupt is called after that API call, the call runs again every time the node resumes, potentially overwriting the initial update or creating duplicate records.
- ✅ Use idempotent operations before `interrupt`
- ✅ Place side effects after `interrupt` calls
- ✅ Separate side effects into separate nodes when possible

```python
def node_a(state: State):
    # ✅ Good: using upsert operation which is idempotent
    # Running this multiple times will have the same result
    db.upsert_user(
        user_id=state["user_id"],
        status="pending_approval"
    )
    approved = interrupt("Approve this change?")
    return {"approved": approved}
```

- 🔴 Do not perform non-idempotent operations before `interrupt`
- 🔴 Do not create new records without checking if they exist

```python
def node_a(state: State):
    # ❌ Bad: creating a new record before interrupt
    # This will create duplicate records on each resume
    audit_id = db.create_audit_log({
        "user_id": state["user_id"],
        "action": "pending_approval",
        "timestamp": datetime.now()
    })
    approved = interrupt("Approve this change?")
    return {"approved": approved, "audit_id": audit_id}
```

Using with subgraphs called as functions
When invoking a subgraph within a node, the parent graph will resume execution from the beginning of the node where the subgraph was invoked and the interrupt was triggered. Similarly, the subgraph will also resume from the beginning of the node where interrupt was called.
```python
def node_in_parent_graph(state: State):
    some_code()  # <-- This will re-execute when resumed
    # Invoke a subgraph as a function.
    # The subgraph contains an `interrupt` call.
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- This will also re-execute when resumed
    result = interrupt("What's your name?")
    # ...
```

Debugging with interrupts
To debug and test a graph, you can use static interrupts as breakpoints to step through the graph execution one node at a time. Static interrupts are triggered at defined points either before or after a node executes. You can set these by specifying interrupt_before and interrupt_after when compiling the graph.
Static interrupts are not recommended for human-in-the-loop workflows. Use the interrupt function instead.
At compile time

```python
graph = builder.compile(
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    checkpointer=checkpointer,
)

# Pass a thread ID to the graph
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(inputs, config=config)

# Resume the graph
graph.invoke(None, config=config)
```

- The breakpoints are set at compile time. `interrupt_before` specifies the nodes where execution should pause before the node is executed. `interrupt_after` specifies the nodes where execution should pause after the node is executed.
- A checkpointer is required to enable breakpoints.
- The graph is run until the first breakpoint is hit.
- The graph is resumed by passing in `None` for the input. This will run the graph until the next breakpoint is hit.

At run time

```python
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Run the graph until the breakpoint
graph.invoke(
    inputs,
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    config=config,
)

# Resume the graph
graph.invoke(None, config=config)
```

- `graph.invoke` is called with the `interrupt_before` and `interrupt_after` parameters. This is a run-time configuration and can be changed for every invocation.
- `interrupt_before` specifies the nodes where execution should pause before the node is executed.
- `interrupt_after` specifies the nodes where execution should pause after the node is executed.
- The graph is run until the first breakpoint is hit.
- The graph is resumed by passing in `None` for the input. This will run the graph until the next breakpoint is hit.
To debug your interrupts, use LangSmith.
Using LangSmith Studio
You can use LangSmith Studio to set static interrupts in your graph in the UI before running the graph. You can also use the UI to inspect the graph state at any point in the execution.