Pregel

Last Updated: 3/7/2026



LangGraph runtime

Pregel implements LangGraph’s runtime, managing the execution of LangGraph applications. Compiling a StateGraph or creating an @entrypoint produces a Pregel instance that can be invoked with input. This guide explains the runtime at a high level and provides instructions for directly implementing applications with Pregel.

Note: The Pregel runtime is named after Google’s Pregel algorithm, which describes an efficient method for large-scale parallel computation using graphs.

Overview

In LangGraph, Pregel combines actors and channels into a single application. Actors read data from channels and write data to channels. Pregel organizes the execution of the application into multiple steps, following the Pregel Algorithm/Bulk Synchronous Parallel model. Each step consists of three phases:

  • Plan: Determine which actors to execute in this step. For example, in the first step, select the actors that subscribe to the special input channels; in subsequent steps, select the actors that subscribe to channels updated in the previous step.
  • Execution: Execute all selected actors in parallel, until all complete, or one fails, or a timeout is reached. During this phase, channel updates are invisible to actors until the next step.
  • Update: Update the channels with the values written by the actors in this step.

Repeat until no actors are selected for execution, or a maximum number of steps is reached.
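
The plan, execution, and update phases above can be sketched in plain Python. This is a toy model under stated assumptions, not LangGraph's implementation: the `run_supersteps` function and the `(source, function, target)` node tuples are hypothetical names invented for illustration, nodes run sequentially rather than in parallel, and each channel simply keeps the last value written to it.

```python
from typing import Callable

# Hypothetical toy model: a node is (subscribed channel, function, target channel).
Node = tuple[str, Callable[[str], str], str]


def run_supersteps(
    nodes: dict[str, Node],
    inputs: dict[str, str],
    max_steps: int = 25,
) -> dict[str, str]:
    """Run plan / execute / update rounds until no node is selected."""
    channels: dict[str, str] = dict(inputs)
    updated: set[str] = set(inputs)  # channels written in the previous step

    for _ in range(max_steps):
        # Plan: select nodes subscribed to a channel updated in the last step.
        selected = [node for node in nodes.values() if node[0] in updated]
        if not selected:
            return channels

        # Execute: every node reads the same snapshot and its write is buffered,
        # so writes stay invisible to other nodes until the next step.
        snapshot = dict(channels)
        writes = [(target, fn(snapshot[source])) for source, fn, target in selected]

        # Update: apply the buffered writes (last write wins in this toy model).
        updated = set()
        for target, value in writes:
            channels[target] = value
            updated.add(target)

    raise RuntimeError("maximum number of steps reached")


nodes = {
    "node1": ("a", lambda x: x + x, "b"),
    "node2": ("b", lambda x: x + x, "c"),
}
result = run_supersteps(nodes, {"a": "foo"})
print(result)
```

In this sketch, node2 only runs in the second step because channel "b" is not updated until the end of the first step, which is the isolation property the execution phase describes.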

Actors

An actor is a PregelNode. It subscribes to channels, reads data from them, and writes data to them. It can be thought of as an actor in the Pregel algorithm. PregelNodes implement LangChain’s Runnable interface.

Channels

Channels are used to communicate between actors (PregelNodes). Each channel has a value type, an update type, and an update function, which takes a sequence of updates and modifies the stored value. Channels can be used to send data from one actor to another, or to send data from an actor to itself in a future step. LangGraph provides a number of built-in channels:

  • LastValue: The default channel, stores the last value sent to the channel, useful for input and output values, or for sending data from one step to the next.
  • Topic: A configurable PubSub Topic, useful for sending multiple values between actors, or for accumulating output. Can be configured to deduplicate values or to accumulate values over the course of multiple steps.
  • BinaryOperatorAggregate: Stores a persistent value, updated by applying a binary operator to the current value and each update sent to the channel. Useful for computing aggregates over multiple steps, e.g., total = BinaryOperatorAggregate(int, operator.add)
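
To make the "update function" idea concrete, here is a minimal sketch of the three update behaviors described above. The `Toy*` classes are hypothetical stand-ins written for this illustration, not the LangGraph classes; the real channels have additional behavior (validation, checkpointing, and so on) that is not modeled here. The sketch also assumes the aggregate starts from the default value of its type, such as `int() == 0`.

```python
import operator
from typing import Any, Callable, Sequence


class ToyLastValue:
    """Stores the last value sent to the channel."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, updates: Sequence[Any]) -> None:
        if updates:
            self.value = updates[-1]


class ToyTopic:
    """Collects values; with accumulate=True it keeps them across steps."""

    def __init__(self, accumulate: bool = False) -> None:
        self.accumulate = accumulate
        self.value: list[Any] = []

    def update(self, updates: Sequence[Any]) -> None:
        if not self.accumulate:
            self.value = []  # start fresh on every step
        self.value.extend(updates)


class ToyBinaryOperatorAggregate:
    """Folds each update into a persistent value with a binary operator."""

    def __init__(self, typ: type, op: Callable[[Any, Any], Any]) -> None:
        self.value = typ()  # assumption: start from the type's default value
        self.op = op

    def update(self, updates: Sequence[Any]) -> None:
        for item in updates:
            self.value = self.op(self.value, item)


# Each call to update() stands for the update phase of one step.
last = ToyLastValue()
last.update([1])
last.update([2])

topic = ToyTopic(accumulate=True)
topic.update(["x"])
topic.update(["y", "z"])

total = ToyBinaryOperatorAggregate(int, operator.add)
total.update([1, 2])
total.update([3])

print(last.value, topic.value, total.value)
```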

Examples

While most users will interact with Pregel through the StateGraph API or the @entrypoint decorator, it is possible to interact with Pregel directly. Below are a few examples to give you a sense of the Pregel API.

  • Single node
  • Multiple nodes
  • Topic
  • BinaryOperatorAggregate
  • Cycle

Single node:

    from langgraph.channels import EphemeralValue
    from langgraph.pregel import Pregel, NodeBuilder

    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b")
    )

    app = Pregel(
        nodes={"node1": node1},
        channels={
            "a": EphemeralValue(str),
            "b": EphemeralValue(str),
        },
        input_channels=["a"],
        output_channels=["b"],
    )

    app.invoke({"a": "foo"})

Output:

    {'b': 'foofoo'}

Multiple nodes:

    from langgraph.channels import LastValue, EphemeralValue
    from langgraph.pregel import Pregel, NodeBuilder

    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b")
    )

    node2 = (
        NodeBuilder().subscribe_only("b")
        .do(lambda x: x + x)
        .write_to("c")
    )

    app = Pregel(
        nodes={"node1": node1, "node2": node2},
        channels={
            "a": EphemeralValue(str),
            "b": LastValue(str),
            "c": EphemeralValue(str),
        },
        input_channels=["a"],
        output_channels=["b", "c"],
    )

    app.invoke({"a": "foo"})

Output:

    {'b': 'foofoo', 'c': 'foofoofoofoo'}

Topic:

    from langgraph.channels import EphemeralValue, Topic
    from langgraph.pregel import Pregel, NodeBuilder

    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b", "c")
    )

    node2 = (
        NodeBuilder().subscribe_to("b")
        .do(lambda x: x["b"] + x["b"])
        .write_to("c")
    )

    app = Pregel(
        nodes={"node1": node1, "node2": node2},
        channels={
            "a": EphemeralValue(str),
            "b": EphemeralValue(str),
            "c": Topic(str, accumulate=True),
        },
        input_channels=["a"],
        output_channels=["c"],
    )

    app.invoke({"a": "foo"})

Output:

    {'c': ['foofoo', 'foofoofoofoo']}

This example demonstrates how to use the BinaryOperatorAggregate channel to implement a reducer.

    from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
    from langgraph.pregel import Pregel, NodeBuilder

    node1 = (
        NodeBuilder().subscribe_only("a")
        .do(lambda x: x + x)
        .write_to("b", "c")
    )

    node2 = (
        NodeBuilder().subscribe_only("b")
        .do(lambda x: x + x)
        .write_to("c")
    )

    def reducer(current, update):
        if current:
            return current + " | " + update
        else:
            return update

    app = Pregel(
        nodes={"node1": node1, "node2": node2},
        channels={
            "a": EphemeralValue(str),
            "b": EphemeralValue(str),
            "c": BinaryOperatorAggregate(str, operator=reducer),
        },
        input_channels=["a"],
        output_channels=["c"],
    )

    app.invoke({"a": "foo"})
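
To see what the reducer in the example above does with the two writes to channel "c", it can be applied by hand with `functools.reduce`. This is a sketch of the fold only, not output captured from the runtime, and it assumes the channel starts from the empty string `str()`.

```python
from functools import reduce


def reducer(current: str, update: str) -> str:
    # Same reducer as in the example above.
    if current:
        return current + " | " + update
    return update


# Values the two nodes would write to "c" for the input "foo".
step1_write = "foo" + "foo"              # node1, first step
step2_write = step1_write + step1_write  # node2, second step

# Assumption: the aggregate starts from the empty string, str().
folded = reduce(reducer, [step1_write, step2_write], str())
print(folded)
```

Because the starting value is falsy, the first update is stored as-is and the separator only appears from the second update onward.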

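This example demonstrates how to introduce a cycle in the graph by having a node write to a channel it subscribes to. Execution continues until the node returns None: because the write uses skip_none=True, that value is not written, no channel is updated, and no node is selected for the next step.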

    from langgraph.channels import EphemeralValue
    from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

    example_node = (
        NodeBuilder().subscribe_only("value")
        .do(lambda x: x + x if len(x) < 10 else None)
        .write_to(ChannelWriteEntry("value", skip_none=True))
    )

    app = Pregel(
        nodes={"example_node": example_node},
        channels={
            "value": EphemeralValue(str),
        },
        input_channels=["value"],
        output_channels=["value"],
    )

    app.invoke({"value": "a"})

Output:

    {'value': 'aaaaaaaaaaaaaaaa'}
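
The cycle above can be traced step by step in plain Python. This sketch mirrors only the node's function and the effect of skipping None writes; the `step` helper and `trace` list are hypothetical names for illustration, and no LangGraph code is involved.

```python
from typing import Optional


def step(value: str) -> Optional[str]:
    # Same function as the node in the example above.
    return value + value if len(value) < 10 else None


value = "a"
trace = [value]
while True:
    nxt = step(value)
    if nxt is None:
        # Mirrors skip_none=True: nothing is written, so the loop ends.
        break
    value = nxt
    trace.append(value)

print([len(v) for v in trace])
```

The string doubles on every step (lengths 1, 2, 4, 8, 16), and the first value with ten or more characters stops the cycle, which matches the 16-character result shown above.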

High-level API

LangGraph provides two high-level APIs for creating a Pregel application: the StateGraph (Graph API) and the Functional API.

  • StateGraph (Graph API)
  • Functional API

The StateGraph (Graph API) is a higher-level abstraction that simplifies the creation of Pregel applications. It allows you to define a graph of nodes and edges. When you compile the graph, the StateGraph API automatically creates the Pregel application for you.

    from typing import TypedDict

    from langgraph.constants import START
    from langgraph.graph import StateGraph

    class Essay(TypedDict):
        topic: str
        content: str | None
        score: float | None

    def write_essay(essay: Essay):
        return {
            "content": f"Essay about {essay['topic']}",
        }

    def score_essay(essay: Essay):
        return {
            "score": 10
        }

    builder = StateGraph(Essay)
    builder.add_node(write_essay)
    builder.add_node(score_essay)
    builder.add_edge(START, "write_essay")
    builder.add_edge("write_essay", "score_essay")

    # Compile the graph.
    # This will return a Pregel instance.
    graph = builder.compile()

The compiled Pregel instance will be associated with a list of nodes and channels. You can inspect the nodes and channels by printing them.

    print(graph.nodes)

You will see something like this (the values are PregelNode objects; their reprs are elided here):

    {'__start__': <...>,
     'write_essay': <...>,
     'score_essay': <...>}

    print(graph.channels)

You should see something like this (the values are channel objects; their reprs are elided here):

    {'topic': <...>,
     'content': <...>,
     'score': <...>,
     '__start__': <...>,
     'write_essay': <...>,
     'score_essay': <...>,
     'branch:__start__:__self__:write_essay': <...>,
     'branch:__start__:__self__:score_essay': <...>,
     'branch:write_essay:__self__:write_essay': <...>,
     'branch:write_essay:__self__:score_essay': <...>,
     'branch:score_essay:__self__:write_essay': <...>,
     'branch:score_essay:__self__:score_essay': <...>,
     'start:write_essay': <...>}

In the Functional API, you can use an @entrypoint to create a Pregel application. The entrypoint decorator allows you to define a function that takes input and returns output.

    from typing import TypedDict

    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.func import entrypoint

    class Essay(TypedDict):
        topic: str
        content: str | None
        score: float | None

    checkpointer = InMemorySaver()

    @entrypoint(checkpointer=checkpointer)
    def write_essay(essay: Essay):
        return {
            "content": f"Essay about {essay['topic']}",
        }

    print("Nodes: ")
    print(write_essay.nodes)
    print("Channels: ")
    print(write_essay.channels)

Output (object reprs elided):

    Nodes:
    {'write_essay': <...>}
    Channels:
    {'__start__': <...>, '__end__': <...>, '__previous__': <...>}
