Threads are the core persistence unit in Aegra. Each thread represents a conversation with its own state, message history, and checkpoints. Runs execute within threads, and state is automatically persisted after each node execution.
Creating threads
import asyncio
from langgraph_sdk import get_client

async def main():
    client = get_client(url="http://localhost:2026")

    # Create a thread
    thread = await client.threads.create()

    # Create with metadata
    thread = await client.threads.create(
        metadata={"user_name": "Alice", "session": "onboarding"},
    )

    # Create with a specific ID (idempotent)
    thread = await client.threads.create(
        thread_id="my-thread-123",
        if_exists="do_nothing",  # Don't error if it already exists
    )

asyncio.run(main())
Thread status
Threads have a status that reflects their current state:
| Status | Meaning |
|---|---|
| idle | No active run, ready for new input |
| busy | A run is currently executing |
| interrupted | A run paused for human input |
| error | The last run ended with an error |
thread = await client.threads.get(thread_id)
print(thread["status"]) # "idle", "busy", "interrupted", "error"
The snippets below assume you are inside an async def function with an initialized client and a thread_id that refers to an existing thread, as in the example under Creating threads.
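Because a thread stays busy while a run executes, callers sometimes want to wait until it settles before reading state. The following is a minimal sketch of that idea, not part of the SDK: `wait_until_settled`, `poll_interval`, and `timeout` are hypothetical names, and the only client call it relies on is `client.threads.get` as shown above.

```python
import asyncio
import time


async def wait_until_settled(client, thread_id, poll_interval=0.5, timeout=30.0):
    """Poll a thread until its status is no longer "busy", then return the status.

    Hypothetical helper: only client.threads.get() comes from the text above.
    """
    deadline = time.monotonic() + timeout
    while True:
        thread = await client.threads.get(thread_id)
        if thread["status"] != "busy":
            # "idle", "interrupted", or "error"
            return thread["status"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"thread {thread_id} still busy after {timeout}s")
        await asyncio.sleep(poll_interval)
```

Inside an async function this could be called as `status = await wait_until_settled(client, thread_id)`. Polling is the simplest approach; it trades some latency and extra requests for not having to hold a streaming connection open.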
Getting thread state
The state contains the current values of all state fields, information about what nodes will execute next, and any pending interrupts.
state = await client.threads.get_state(thread_id)
# Current state values (your State schema fields)
print(state["values"])
# Next nodes to execute (empty if completed)
print(state["next"])
# Pending tasks
print(state["tasks"])
# Active interrupts (if thread is interrupted)
print(state["interrupts"])
# Checkpoint info
print(state["checkpoint"]["checkpoint_id"])
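To make the relationship between these fields concrete, here is a small sketch that turns a state dict into a one-line summary. `summarize_state` and its labels are hypothetical; the logic only uses the `interrupts` and `next` fields described above, and treats an empty `next` as a completed run.

```python
def summarize_state(state):
    """Return a short label for a thread state dict (hypothetical helper)."""
    # Pending interrupts take priority: the run is paused for input.
    if state.get("interrupts"):
        return "interrupted"
    # A non-empty "next" means there are still nodes scheduled to execute.
    if state.get("next"):
        return "pending: " + ", ".join(state["next"])
    # No interrupts and nothing left to run.
    return "completed"
```

For example, `summarize_state(await client.threads.get_state(thread_id))` could be logged after each run.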
Updating thread state
You can modify thread state directly, for example to inject data or correct values:
await client.threads.update_state(
    thread_id,
    values={"messages": [{"type": "human", "content": "Injected message"}]},
    as_node="agent",  # Apply update as if it came from this node
)
Checkpoint history
Every state change creates a checkpoint. You can browse the full history:
# Get recent checkpoints
history = await client.threads.get_history(thread_id)
for entry in history:
    print(f"Checkpoint: {entry['checkpoint']['checkpoint_id']}")
    print(f"  Values: {entry['values']}")
    print(f"  Next: {entry['next']}")
Get state at a specific checkpoint
# Go back in time to a specific checkpoint
state = await client.threads.get_state(
    thread_id,
    checkpoint={"checkpoint_id": "your-checkpoint-id"},
)
This is useful for debugging, replaying, or branching conversations from a previous point.
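When branching, the usual first step is locating the checkpoint to return to. The sketch below scans a history list for the first entry whose `next` includes a given node. `find_checkpoint_before` is a hypothetical helper, and it assumes the history is ordered newest first, so the first match is the most recent one; verify the ordering against your server before relying on it.

```python
def find_checkpoint_before(history, node):
    """Return the checkpoint_id of the first history entry about to run `node`.

    Hypothetical helper. Assumes `history` is ordered newest first, so the
    first match is the most recent checkpoint taken just before `node` ran.
    Returns None when no entry matches.
    """
    for entry in history:
        if node in entry["next"]:
            return entry["checkpoint"]["checkpoint_id"]
    return None
```

The returned ID can then be passed as `checkpoint={"checkpoint_id": ...}` to `client.threads.get_state`, as in the snippet above.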
Searching threads
Find threads by status or metadata:
# Search by metadata
threads = await client.threads.search(
    metadata={"user_name": "Alice"},
)

# Search by status
threads = await client.threads.search(
    status="interrupted",
)

# With pagination
threads = await client.threads.search(
    limit=20,
    offset=0,
)
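To walk through every matching thread rather than a single page, the `limit` and `offset` parameters can be combined in a loop. The following is a sketch under that assumption: `iter_threads` and `page_size` are hypothetical names, and the loop stops when a page comes back shorter than requested.

```python
async def iter_threads(client, page_size=20, **filters):
    """Yield every thread matching `filters`, one page at a time.

    Hypothetical helper built on client.threads.search(limit=..., offset=...).
    Extra keyword arguments (e.g. status= or metadata=) are passed through.
    """
    offset = 0
    while True:
        page = await client.threads.search(limit=page_size, offset=offset, **filters)
        for thread in page:
            yield thread
        # A short (or empty) page means there is nothing left to fetch.
        if len(page) < page_size:
            break
        offset += page_size
```

Usage inside an async function: `async for thread in iter_threads(client, status="interrupted"): ...`. Offset pagination can skip or repeat items if threads are created or deleted while iterating, so treat the result as a best-effort snapshot.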
Listing threads
threads = await client.threads.search()
for thread in threads:
    print(f"{thread['thread_id']}: {thread['status']}")
Deleting threads
Deleting a thread cancels any active runs and removes all state:
await client.threads.delete(thread_id)
Thread metadata
Threads automatically track metadata about their usage:
- owner: User identity at creation time (mirrors user_id on the Thread object, which is used for access control)
- assistant_id: Last assistant used
- graph_id: Last graph executed
You can add your own metadata at creation or update time:
# Update metadata
await client.threads.update(
    thread_id,
    metadata={"priority": "high"},
)
User isolation
When authentication is enabled, threads are automatically scoped to the authenticated user. Users can only see and interact with their own threads.