Aegra uses a worker-based execution model for production deployments. Runs are dispatched through a Redis job queue and executed by concurrent asyncio workers across any number of instances. Lease-based crash recovery ensures no run is lost, even if an instance dies mid-execution.
In dev mode (aegra dev), none of this is needed — runs execute as simple in-process asyncio tasks.
Run lifecycle overview
How a single run flows through the system — from client request to completion:
The happy path of a single run, step by step:
- Client sends a run request to any instance via the load balancer.
- FastAPI validates, persists the run to PostgreSQL (with serialized execution params), and pushes the run ID onto the Redis job queue.
- Worker picks up the job via BLPOP (blocking pop: instant delivery, no polling).
- Worker acquires an exclusive lease in PostgreSQL, executes the graph, heartbeats every 10s, and finalizes the run (status, output, release lease).
- Worker publishes each event to Redis Pub/Sub for real-time streaming.
- SSE Broker subscribes to Redis Pub/Sub and relays events.
- Client receives events over an SSE connection.
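The handoff in the steps above can be sketched without any external services. This is a minimal illustration, not Aegra's code: a dict stands in for PostgreSQL, an asyncio.Queue stands in for the Redis list (put for RPUSH, get for BLPOP), and a plain list stands in for Pub/Sub. The function names, the "success" status value, and the None shutdown sentinel are assumptions made for the example, and the lease step is left out.

```python
import asyncio


async def submit_run(db: dict, queue: asyncio.Queue, run_id: str, params: dict) -> None:
    """API side: persist the run first, then enqueue only its ID."""
    db[run_id] = {"status": "pending", "params": params, "output": None}
    await queue.put(run_id)  # stand-in for RPUSH aegra:jobs <run_id>


async def worker(db: dict, queue: asyncio.Queue, published: list) -> None:
    """Worker side: block until a job arrives, then run it to completion."""
    while True:
        run_id = await queue.get()  # stand-in for BLPOP: wakes as soon as a job is pushed
        if run_id is None:  # hypothetical shutdown sentinel, not part of Aegra
            return
        run = db[run_id]  # rebuild the job context from the stored params
        run["status"] = "running"
        published.append((run_id, "start"))  # stand-in for PUBLISH aegra:run:{run_id}
        run["output"] = run["params"]["input"].upper()  # placeholder for graph execution
        run["status"] = "success"  # assumed status name
        published.append((run_id, "end"))


async def demo() -> tuple:
    db, queue, published = {}, asyncio.Queue(), []
    task = asyncio.create_task(worker(db, queue, published))
    await submit_run(db, queue, "run-1", {"input": "hello"})
    await queue.put(None)  # stop the worker once the job is done
    await task
    return db, published
```

Because only the run ID travels through the queue, the worker that pops it needs nothing from the instance that accepted the request.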
Behind the scenes, the Lease Reaper scans PostgreSQL every 15 seconds for runs with expired leases (crashed workers) and re-enqueues them. Workers also send OpenTelemetry traces to Langfuse, Phoenix, or any OTLP collector.
Horizontal scaling
Every instance is stateless — add more instances behind a load balancer to increase capacity. All instances share the same Redis and PostgreSQL backends.
Each instance runs multiple worker loops (default 3), and each worker loop handles up to N_JOBS_PER_WORKER concurrent runs (default 10) via an asyncio semaphore. Total capacity per instance is WORKER_COUNT x N_JOBS_PER_WORKER — 30 concurrent runs by default. A job enqueued by Instance A can be picked up by Instance B — any worker can execute any run.
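To make the capacity arithmetic concrete, here is a small sketch of the semaphore pattern described above. It assumes each worker loop owns its own asyncio.Semaphore and takes a job only after acquiring a slot; the helper names and the in-memory queue are illustrative stand-ins rather than Aegra's implementation.

```python
import asyncio

WORKER_COUNT = 3        # default from the configuration table
N_JOBS_PER_WORKER = 10  # default from the configuration table


async def execute(seconds: float, sem: asyncio.Semaphore, stats: dict) -> None:
    """Placeholder for one run; frees its slot when it finishes."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    try:
        await asyncio.sleep(seconds)
    finally:
        stats["active"] -= 1
        sem.release()


async def worker_loop(queue: asyncio.Queue, stats: dict) -> None:
    """One worker loop: take a job only when a semaphore slot is free."""
    sem = asyncio.Semaphore(N_JOBS_PER_WORKER)
    running = set()
    while True:
        await sem.acquire()  # blocks once N_JOBS_PER_WORKER runs are in flight
        try:
            seconds = queue.get_nowait()
        except asyncio.QueueEmpty:
            sem.release()
            break
        task = asyncio.create_task(execute(seconds, sem, stats))
        running.add(task)
        task.add_done_callback(running.discard)
    await asyncio.gather(*running)


async def measure_peak(total_jobs: int) -> int:
    """Run all jobs and return the highest number that were active at once."""
    queue = asyncio.Queue()
    for _ in range(total_jobs):
        queue.put_nowait(0.01)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker_loop(queue, stats) for _ in range(WORKER_COUNT)))
    return stats["peak"]
```

Calling asyncio.run(measure_peak(100)) reports a peak of 30, matching WORKER_COUNT x N_JOBS_PER_WORKER with the defaults.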
Job lifecycle
The detailed sequence — every Redis command, database query, and semaphore operation involved in executing a single run:
Key details:
- Execution params are stored in PostgreSQL alongside the run, so any worker can reconstruct the full job context (user identity, config, trace metadata, interrupt settings, etc.).
- Lease acquisition is atomic: UPDATE ... WHERE claimed_by IS NULL ensures only one worker wins the race.
- Heartbeats extend the lease every 10 seconds. If a worker crashes, the lease expires and the run becomes eligible for recovery.
- OpenTelemetry trace context propagates across the Redis queue boundary, so traces span the full request lifecycle.
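The lease and heartbeat details above can be illustrated with a conditional UPDATE. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs anywhere. The claimed_by and lease_expires_at columns come from the file map; the table name runs, the status values, and the function names are assumptions.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a minimal runs table (schema is an assumption for this sketch)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE runs (run_id TEXT PRIMARY KEY, status TEXT, "
        "claimed_by TEXT, lease_expires_at REAL)"
    )
    return conn


def try_acquire_lease(conn, run_id: str, worker_id: str, now: float,
                      lease_seconds: float = 30.0) -> bool:
    """Claim the run only if nobody holds it; exactly one caller sees rowcount 1."""
    cur = conn.execute(
        "UPDATE runs SET claimed_by = ?, lease_expires_at = ?, status = 'running' "
        "WHERE run_id = ? AND claimed_by IS NULL",
        (worker_id, now + lease_seconds, run_id),
    )
    conn.commit()
    return cur.rowcount == 1


def heartbeat(conn, run_id: str, worker_id: str, now: float,
              lease_seconds: float = 30.0) -> bool:
    """Extend the lease, but only for the worker that currently owns it."""
    cur = conn.execute(
        "UPDATE runs SET lease_expires_at = ? WHERE run_id = ? AND claimed_by = ?",
        (now + lease_seconds, run_id, worker_id),
    )
    conn.commit()
    return cur.rowcount == 1
```

The check and the write happen in a single statement, so a second worker racing for the same run updates zero rows and can simply move on to the next job.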
Streaming run with crash recovery
The full picture — a streaming run shows how the client SSE connection, worker execution, and reaper recovery all interact:
Crash recovery
Every instance runs a LeaseReaper background task that scans for runs with expired leases. When a worker crashes (OOM, kill signal, network partition), its heartbeat stops and the lease expires. The reaper resets the run to pending and re-enqueues it. The new worker resumes from the last checkpoint.
The reaper runs on every instance (default every 15 seconds). This is safe — the atomic lease acquisition prevents duplicate execution.
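A single reaper pass might look like the following sketch. It again uses in-memory SQLite in place of PostgreSQL and a callback in place of the Redis push; the table name, status values, and function names are assumptions, and retry accounting (BG_JOB_MAX_RETRIES) is not modeled.

```python
import sqlite3
from typing import Callable, List


def make_db() -> sqlite3.Connection:
    """Create a minimal runs table (schema is an assumption for this sketch)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE runs (run_id TEXT PRIMARY KEY, status TEXT, "
        "claimed_by TEXT, lease_expires_at REAL)"
    )
    return conn


def reap_expired_leases(conn, now: float, enqueue: Callable[[str], None]) -> List[str]:
    """Reset runs whose lease has expired to pending, then re-enqueue them."""
    rows = conn.execute(
        "SELECT run_id FROM runs WHERE status = 'running' AND lease_expires_at < ?",
        (now,),
    ).fetchall()
    recovered = []
    for (run_id,) in rows:
        # Guarded update: if another reaper already reset this run, rowcount is 0.
        cur = conn.execute(
            "UPDATE runs SET status = 'pending', claimed_by = NULL, "
            "lease_expires_at = NULL "
            "WHERE run_id = ? AND status = 'running' AND lease_expires_at < ?",
            (run_id, now),
        )
        if cur.rowcount == 1:
            recovered.append(run_id)
    conn.commit()
    for run_id in recovered:
        enqueue(run_id)  # stand-in for RPUSH aegra:jobs <run_id>
    return recovered
```

Clearing claimed_by before re-enqueueing lets the next worker win the usual atomic lease acquisition, which is what makes it safe for several reapers to run at once.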
Cross-instance cancellation
Cancel requests can arrive at any instance, but the run may be executing on a different one. Aegra uses Redis pub/sub to propagate cancellation across instances.
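The idea can be shown with an in-memory stand-in: every instance receives the cancel message, and only the one that owns the run acts on it. The Instance class, its methods, and the message payload (a bare run ID) are hypothetical; in production the broadcast would travel over the aegra:run:cancel channel rather than a Python loop.

```python
import asyncio
from typing import Dict, List


class Instance:
    """One server instance with a registry of the runs it is executing."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: Dict[str, asyncio.Task] = {}

    def start_run(self, run_id: str, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self.tasks[run_id] = task
        # Forget the run once it finishes or is cancelled.
        task.add_done_callback(lambda _t: self.tasks.pop(run_id, None))
        return task

    def on_cancel_message(self, run_id: str) -> bool:
        """Subscriber callback: cancel the run if it lives here, else ignore."""
        task = self.tasks.get(run_id)
        if task is None:
            return False
        task.cancel()
        return True


def publish_cancel(instances: List[Instance], run_id: str) -> int:
    """Stand-in for PUBLISH aegra:run:cancel; returns how many instances acted."""
    return sum(inst.on_cancel_message(run_id) for inst in instances)


async def demo() -> tuple:
    a, b = Instance("a"), Instance("b")
    task = b.start_run("run-1", asyncio.sleep(60))  # the run executes on instance b
    await asyncio.sleep(0)  # let the run start
    handled = publish_cancel([a, b], "run-1")  # the request arrives via instance a
    try:
        await task
    except asyncio.CancelledError:
        pass
    return handled, task.cancelled()
```

The instance that receives the HTTP request does not need to know where the run lives; it only publishes, and ownership is resolved by whichever subscriber finds the run in its local registry.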
Dev vs production mode
| | Dev (aegra dev) | Production (aegra up / aegra serve) |
|---|---|---|
| Executor | LocalExecutor — asyncio.create_task() | WorkerExecutor — BLPOP + semaphore + lease |
| Redis required | No | Yes |
| Crash recovery | No (single process) | Yes (lease reaper) |
| Cross-instance cancel | N/A | Yes (Redis pub/sub) |
| SSE broker | In-memory queue + list | Redis pub/sub + replay buffer |
| Scaling | Single instance | Horizontal (shared Redis + PostgreSQL) |
aegra dev is designed for fast iteration — no Redis, no workers, no leases. Just start coding and your runs execute immediately in-process.
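As a rough illustration of how one codebase can serve both modes, the sketch below selects an executor behind a shared interface. The class names and the submit/wait method names mirror the file map, but the signatures, the method bodies, and the use of REDIS_BROKER_ENABLED as the switch are assumptions, not Aegra's actual factory logic.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List, Mapping


class BaseExecutor(ABC):
    @abstractmethod
    async def submit(self, run_id: str) -> None:
        """Hand a run over for execution."""


class LocalExecutor(BaseExecutor):
    """Dev mode: run immediately as an in-process asyncio task."""

    def __init__(self) -> None:
        self.tasks: Dict[str, asyncio.Task] = {}

    async def submit(self, run_id: str) -> None:
        self.tasks[run_id] = asyncio.create_task(self._execute(run_id))

    async def wait(self, run_id: str) -> str:
        return await self.tasks[run_id]

    async def _execute(self, run_id: str) -> str:
        return f"{run_id}: done"  # placeholder for graph execution


class WorkerExecutor(BaseExecutor):
    """Production mode: enqueue only; a worker loop executes the run later."""

    def __init__(self) -> None:
        self.queue: List[str] = []  # stand-in for the Redis job queue

    async def submit(self, run_id: str) -> None:
        self.queue.append(run_id)


def create_executor(env: Mapping[str, str]) -> BaseExecutor:
    """Pick an executor from configuration (selection rule is an assumption)."""
    enabled = env.get("REDIS_BROKER_ENABLED", "false").lower() == "true"
    return WorkerExecutor() if enabled else LocalExecutor()


async def demo_local() -> str:
    executor = create_executor({})
    await executor.submit("run-1")
    return await executor.wait("run-1")
```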
Redis data layout
| Key | Purpose | Written by | Read by |
|---|---|---|---|
| aegra:jobs | Job queue (BLPOP) | API (RPUSH) | Worker (BLPOP) |
| aegra:run:{run_id} | SSE live events | Worker (PUBLISH) | API (SUBSCRIBE) |
| aegra:run:cache:{run_id} | SSE replay buffer | Worker (RPUSH) | API (LRANGE) |
| aegra:run:counter:{run_id} | Event sequence counter | Worker (INCR) | API (GET) |
| aegra:run:cancel | Cancel commands | API (PUBLISH) | Worker (SUBSCRIBE) |
| aegra:run:done:{run_id} | Completion signal (TTL 1h) | Worker (SET) | API (EXISTS) |
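The key patterns in the table can be captured as small helpers, and the interplay between the sequence counter and the replay buffer can be modeled in memory. The key strings below come from the table; the helper names, the RunEventLog class, and the "replay everything after the last seen sequence number" rule are assumptions about how a reconnecting client might be served.

```python
from typing import Any, List, Tuple

JOBS_KEY = "aegra:jobs"
CANCEL_CHANNEL = "aegra:run:cancel"


def live_channel(run_id: str) -> str:
    return f"aegra:run:{run_id}"


def cache_key(run_id: str) -> str:
    return f"aegra:run:cache:{run_id}"


def counter_key(run_id: str) -> str:
    return f"aegra:run:counter:{run_id}"


def done_key(run_id: str) -> str:
    return f"aegra:run:done:{run_id}"


class RunEventLog:
    """In-memory model of one run's sequence counter and replay buffer."""

    def __init__(self) -> None:
        self.counter = 0  # stand-in for INCR on counter_key(run_id)
        self.cache: List[Tuple[int, Any]] = []  # stand-in for RPUSH on cache_key(run_id)

    def publish(self, payload: Any) -> Tuple[int, Any]:
        """Number the event and store it so late subscribers can catch up."""
        self.counter += 1
        event = (self.counter, payload)
        self.cache.append(event)
        return event

    def replay_after(self, last_seen: int) -> List[Tuple[int, Any]]:
        """Events a reconnecting client missed (stand-in for LRANGE plus filtering)."""
        return [event for event in self.cache if event[0] > last_seen]
```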
Configuration
All worker settings are configured via environment variables. See the environment variables reference for the full list.
| Variable | Default | Description |
|---|---|---|
| REDIS_BROKER_ENABLED | false | Enable Redis workers and broker |
| REDIS_URL | redis://localhost:6379/0 | Redis connection URL |
| WORKER_COUNT | 3 | Worker loops per instance |
| N_JOBS_PER_WORKER | 10 | Concurrent runs per worker loop |
| BG_JOB_TIMEOUT_SECS | 3600 | Max execution time per run (1 hour) |
| BG_JOB_MAX_RETRIES | 3 | Max retry attempts before a crashed run is permanently failed |
| STUCK_PENDING_THRESHOLD_SECONDS | 120 | How long a pending run can sit before the reaper re-enqueues it |
| LEASE_DURATION_SECONDS | 30 | Lease TTL before reaper reclaims |
| HEARTBEAT_INTERVAL_SECONDS | 10 | Lease extension frequency |
| REAPER_INTERVAL_SECONDS | 15 | Expired lease scan frequency |
| POSTGRES_POLL_INTERVAL_SECONDS | 5 | Fallback poll interval when Redis is unavailable |
| WORKER_DRAIN_TIMEOUT | 30.0 | Graceful shutdown wait time (seconds) |
When Redis is unavailable, workers fall back to polling PostgreSQL for pending runs at POSTGRES_POLL_INTERVAL_SECONDS intervals. This keeps the system functional during Redis outages, though with higher latency.
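The timing settings interact, so it can help to derive a few numbers from them. The sketch below reads a subset of the variables from a mapping (os.environ by default) and computes per-instance capacity, how many heartbeats fit in one lease, and an approximate worst-case delay before a crashed run is re-enqueued. The WorkerSettings class and its property names are hypothetical, and the recovery estimate assumes each heartbeat resets the lease to its full duration and ignores scan and queue latency.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerSettings:
    worker_count: int
    n_jobs_per_worker: int
    lease_duration_seconds: int
    heartbeat_interval_seconds: int
    reaper_interval_seconds: int

    @property
    def capacity_per_instance(self) -> int:
        return self.worker_count * self.n_jobs_per_worker

    @property
    def heartbeats_per_lease(self) -> int:
        # How many heartbeats can be missed before the lease lapses.
        return self.lease_duration_seconds // self.heartbeat_interval_seconds

    @property
    def worst_case_recovery_seconds(self) -> int:
        # Crash right after a heartbeat: a full lease must expire, then the
        # reaper may need up to one more scan interval to notice.
        return self.lease_duration_seconds + self.reaper_interval_seconds


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettings:
    """Read settings from the environment, falling back to documented defaults."""

    def get_int(name: str, default: int) -> int:
        return int(env.get(name, default))

    return WorkerSettings(
        worker_count=get_int("WORKER_COUNT", 3),
        n_jobs_per_worker=get_int("N_JOBS_PER_WORKER", 10),
        lease_duration_seconds=get_int("LEASE_DURATION_SECONDS", 30),
        heartbeat_interval_seconds=get_int("HEARTBEAT_INTERVAL_SECONDS", 10),
        reaper_interval_seconds=get_int("REAPER_INTERVAL_SECONDS", 15),
    )
```

With the defaults this gives 30 concurrent runs per instance, three heartbeats per lease, and roughly 45 seconds in the worst case before a crashed run is back on the queue.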
File map
| File | Purpose |
|---|---|
| services/executor.py | Factory: selects LocalExecutor or WorkerExecutor |
| services/base_executor.py | Abstract interface: submit, wait, start, stop |
| services/local_executor.py | Dev mode: asyncio.create_task |
| services/worker_executor.py | Production mode: BLPOP + semaphore + lease + heartbeat |
| services/run_executor.py | Shared execution logic (used by both executors) |
| services/run_status.py | Database status updates |
| services/run_preparation.py | Run creation (validate, persist, submit) |
| services/lease_reaper.py | Background task recovering crashed worker runs |
| services/broker.py | In-memory broker + factory |
| services/redis_broker.py | Redis pub/sub broker for SSE |
| services/streaming_service.py | SSE orchestration (replay + live) |
| models/run_job.py | RunJob Pydantic model (serialized execution params) |
| core/orm.py | Run table with execution_params, claimed_by, lease_expires_at |
| core/active_runs.py | In-memory task registry (dev mode only) |
| core/redis_manager.py | Redis connection pool singleton |