Aegra streams agent execution to clients using Server-Sent Events (SSE). This gives you real-time token-by-token output, tool call updates, and state changes as they happen.
## Quick example

```python
import asyncio

from langgraph_sdk import get_client


async def main():
    client = get_client(url="http://localhost:8000")
    thread = await client.threads.create()

    async for chunk in client.runs.stream(
        thread_id=thread["thread_id"],
        assistant_id="agent",
        input={"messages": [{"type": "human", "content": "Hello!"}]},
        stream_mode=["messages-tuple"],
    ):
        if hasattr(chunk, "data") and chunk.data:
            print(chunk.data)


asyncio.run(main())
```
## Stream modes

Control what data you receive by setting `stream_mode` on the run. You can pass a single mode as a string or multiple modes as a list.
| Mode | What it streams |
|---|---|
| `values` | Full state snapshot after each node execution |
| `updates` | Only the state changes (delta) produced by each node |
| `messages` | LLM tokens and tool calls as `(message, metadata)` tuples, accumulated into `messages/partial` and `messages/complete` events |
| `messages-tuple` | Raw message tuples without accumulation (JavaScript graph compatibility) |
| `custom` | User-defined data emitted from inside nodes via `get_stream_writer()` |
| `events` | Low-level LangGraph `astream_events` for fine-grained tracing |
| `debug` | Checkpoint and task result events (auto-enabled on all streams) |
`debug` and `updates` are used internally on every stream for checkpoint tracking and interrupt detection, but their events are only sent to the client if you explicitly request them in `stream_mode`. When `updates` is not requested, only interrupt events (human-in-the-loop) are forwarded, remapped as `values` events for compatibility.
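As a client-side sketch, you can detect these forwarded interrupts while iterating the stream. This assumes, following LangGraph server conventions, that an interrupt surfaces as a `values` event whose payload carries an `__interrupt__` key — verify the payload shape against your Aegra version:

```python
def is_forwarded_interrupt(event: str, data) -> bool:
    """Return True if a chunk looks like an interrupt remapped as a values event.

    Assumes the interrupt payload surfaces under an "__interrupt__" key
    (LangGraph server convention); check this against your Aegra version.
    """
    return event == "values" and isinstance(data, dict) and "__interrupt__" in data
```

Inside your streaming loop, call it as `is_forwarded_interrupt(chunk.event, chunk.data)` to decide whether to prompt the user for input.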
## Custom streaming

Send custom data from inside graph nodes using `get_stream_writer()`:

```python
from langgraph.config import get_stream_writer


def my_node(state):
    writer = get_stream_writer()
    writer({"progress": "Fetching data..."})
    # ... do work ...
    writer({"progress": "Processing results..."})
    return {"result": "done"}
```
Receive custom events by including `custom` in `stream_mode`:
```python
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Analyze this"}]},
    stream_mode=["custom", "values"],
):
    print(f"Event: {chunk.event}, Data: {chunk.data}")
```
## Using multiple modes

```python
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Search for AI news"}]},
    stream_mode=["values", "messages-tuple"],
):
    print(f"Event: {chunk.event}, Data: {chunk.data}")
```
When multiple modes are active, the `event` field tells you which mode each chunk comes from.
## Event types
During streaming, you’ll receive these event types:
| Event | Description |
|---|---|
| `metadata` | Run metadata (`run_id`, attempt number), sent first |
| `values` | Full state snapshot (when using `values` mode) |
| `updates` | State delta from a single node (when using `updates` mode) |
| `messages/partial` | Partial message chunk (streaming token) |
| `messages/complete` | Complete message after all tokens are received |
| `messages/metadata` | Message metadata (`run_id`) |
| `custom` | User-defined data from `get_stream_writer()` |
| `events` | LangGraph internal events (when using `events` mode) |
| `debug` | Debug checkpoint and task result events |
| `error` | Error during execution |
| `end` | Stream complete |
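A minimal dispatcher over these event types might look like the following sketch. The handler bodies are placeholders, and the assumed shapes of `data` (a dict for `metadata`, a list of messages for `messages/*` events) follow the table above — adapt them to what your graph actually emits:

```python
def describe_chunk(event: str, data) -> str:
    """Map a stream chunk to a short human-readable description.

    A sketch: real handlers would update UI state, append tokens, etc.
    """
    if event == "metadata":
        # data is assumed to be a dict carrying the run_id
        return f"run started: {data.get('run_id')}"
    if event in ("messages/partial", "messages/complete"):
        # data is assumed to be a list of (partially) accumulated messages
        return f"{event}: {len(data)} message(s)" if isinstance(data, list) else event
    if event == "values":
        return "state snapshot"
    if event == "updates":
        return "state delta"
    if event == "custom":
        return f"custom: {data}"
    if event == "error":
        return f"error: {data}"
    if event == "end":
        return "stream complete"
    return f"other: {event}"
```

In a streaming loop you would call `describe_chunk(chunk.event, chunk.data)` on each chunk.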
## Streaming endpoints

### Create and stream

The most common pattern: create a run and stream its output in one call.

```python
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Hello"}]},
):
    print(chunk)
```

This calls `POST /threads/{thread_id}/runs/stream` under the hood.
### Stream an existing run

If you created a background run, you can stream it later:

```
GET /threads/{thread_id}/runs/{run_id}/stream
```

This supports reconnection via the `Last-Event-ID` header. If the connection drops, the client can reconnect and receive events from where it left off.
### Wait for completion

If you don't need streaming but want to wait for the result:

```python
result = await client.runs.wait(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Hello"}]},
)
print(result)
```

This calls `POST /threads/{thread_id}/runs/wait` and returns the final output.
## Subgraph streaming

If your graph uses subgraphs, you can stream events from them too:

```python
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Hello"}]},
    stream_subgraphs=True,
):
    print(chunk)
```
## Disconnection behavior

By default, when a client disconnects during streaming, the run is cancelled. You can change this:

```python
async for chunk in client.runs.stream(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Hello"}]},
    on_disconnect="continue",  # keep running even if the client disconnects
):
    print(chunk)
```
| Value | Behavior |
|---|---|
| `"cancel"` (default) | Cancel the run when the client disconnects |
| `"continue"` | Run continues in the background; reconnect later to get results |
## Background runs

For long-running tasks, you can create a run in the background and check on it later:

```python
# Create run (returns immediately)
run = await client.runs.create(
    thread_id=thread_id,
    assistant_id="agent",
    input={"messages": [{"type": "human", "content": "Analyze this dataset"}]},
)

# Check status
run = await client.runs.get(thread_id=thread_id, run_id=run["run_id"])
print(run["status"])  # "pending", "running", "success", "error"

# Wait for completion and get output
output = await client.runs.join(thread_id=thread_id, run_id=run["run_id"])
print(output)
```
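The status check above can be wrapped in a simple polling loop. A sketch, assuming `"success"` and `"error"` are the only terminal statuses (your Aegra version may add others, e.g. for interrupts):

```python
import asyncio

# Assumed terminal statuses; extend if your server reports others
TERMINAL_STATUSES = {"success", "error"}


async def poll_until_done(client, thread_id: str, run_id: str, interval: float = 2.0):
    """Poll a background run until it leaves "pending"/"running"."""
    while True:
        run = await client.runs.get(thread_id=thread_id, run_id=run_id)
        if run["status"] in TERMINAL_STATUSES:
            return run
        await asyncio.sleep(interval)
```

Once the run is terminal, fetch its output with `client.runs.join(...)` as above. For a single run, `join` alone is usually simpler; polling is useful when tracking several runs at once.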
## Cancelling runs

Cancel or interrupt a running execution:

```python
# Hard cancel
await client.runs.cancel(thread_id=thread_id, run_id=run["run_id"])

# Cooperative interrupt (if the graph supports it)
await client.runs.cancel(
    thread_id=thread_id,
    run_id=run["run_id"],
    action="interrupt",
)
```
## SSE reconnection

Aegra stores streaming events in the database for replay. If your connection drops:

- Track the last event ID you received
- Reconnect to `GET /threads/{thread_id}/runs/{run_id}/stream` with the `Last-Event-ID` header
- You'll receive all events from where you left off
Events are retained for 1 hour after the run completes.