Aegra uses a worker-based execution model for production deployments. Runs are dispatched through a Redis job queue and executed by concurrent asyncio workers across any number of instances. Lease-based crash recovery ensures no run is lost, even if an instance dies mid-execution. In dev mode (aegra dev), none of this is needed — runs execute as simple in-process asyncio tasks.

Run lifecycle overview

How a single run flows through the system, from client request to completion. The happy path of a single run, step by step:
  1. Client sends a run request to any instance via the load balancer.
  2. FastAPI validates, persists the run to PostgreSQL (with serialized execution params), and pushes the run ID onto the Redis job queue.
  3. Worker picks up the job via BLPOP (blocking pop — instant delivery, no polling).
  4. Worker acquires an exclusive lease in PostgreSQL, executes the graph, heartbeats every 10s, and finalizes the run (status, output, release lease).
  5. Worker publishes each event to Redis Pub/Sub for real-time streaming.
  6. SSE Broker subscribes to Redis Pub/Sub and relays events.
  7. Client receives events over an SSE connection.
Behind the scenes, the Lease Reaper scans PostgreSQL every 15 seconds for runs with expired leases (crashed workers) and re-enqueues them. Workers also send OpenTelemetry traces to Langfuse, Phoenix, or any OTLP collector.
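The dispatch half of this flow (steps 2-4) can be sketched in-process, with an asyncio.Queue standing in for the Redis list. This is a minimal illustration of the push-then-blocking-pop hand-off, not Aegra's actual API; worker_loop and execute_graph are hypothetical names.

```python
import asyncio

async def execute_graph(run_id: str, events: list[str]) -> None:
    # Placeholder for real graph execution; records a fake event stream.
    events.append(f"{run_id}:started")
    await asyncio.sleep(0)  # yield control, as a real graph step would
    events.append(f"{run_id}:done")

async def worker_loop(queue: asyncio.Queue, events: list[str]) -> None:
    while True:
        run_id = await queue.get()  # stand-in for BLPOP: blocks, no polling
        if run_id is None:          # shutdown sentinel
            return
        await execute_graph(run_id, events)

async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    events: list[str] = []
    worker = asyncio.create_task(worker_loop(queue, events))
    for run_id in ("run-1", "run-2"):  # API side: persist the run, then push
        await queue.put(run_id)
    await queue.put(None)
    await worker
    return events

events = asyncio.run(main())
print(events)
```

The key property mirrored here is instant delivery: the worker is parked on the pop and wakes the moment a job arrives, with no poll interval in the happy path.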

Horizontal scaling

Every instance is stateless: add more instances behind a load balancer to increase capacity. All instances share the same Redis and PostgreSQL backends. Each instance runs multiple worker loops (default 3), and each worker loop handles up to N_JOBS_PER_WORKER concurrent runs (default 10) via an asyncio semaphore. Total capacity per instance is WORKER_COUNT × N_JOBS_PER_WORKER, or 30 concurrent runs by default. A job enqueued by Instance A can be picked up by Instance B; any worker can execute any run.
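The semaphore cap can be demonstrated in isolation. This is a sketch of the per-loop concurrency limit under the defaults above; the run_job helper and gauge bookkeeping are illustrative, not Aegra's code.

```python
import asyncio

WORKER_COUNT = 3        # worker loops per instance (default)
N_JOBS_PER_WORKER = 10  # concurrent runs per loop (default)

async def run_job(sem: asyncio.Semaphore, gauge: dict) -> None:
    async with sem:  # at most N_JOBS_PER_WORKER runs in flight per loop
        gauge["current"] += 1
        gauge["peak"] = max(gauge["peak"], gauge["current"])
        await asyncio.sleep(0.01)  # simulated graph execution
        gauge["current"] -= 1

async def main() -> int:
    gauge = {"current": 0, "peak": 0}
    sem = asyncio.Semaphore(N_JOBS_PER_WORKER)
    # One worker loop draining 25 jobs: concurrency never exceeds the cap.
    await asyncio.gather(*(run_job(sem, gauge) for _ in range(25)))
    return gauge["peak"]

peak = asyncio.run(main())
print(f"peak concurrency: {peak}, instance capacity: {WORKER_COUNT * N_JOBS_PER_WORKER}")
```

Even with 25 jobs submitted at once, observed concurrency plateaus at the semaphore size; multiplying by the loop count gives the 30-run instance default.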

Job lifecycle

The detailed sequence: every Redis command, database query, and semaphore operation involved in executing a single run. Key details:
  1. Execution params are stored in PostgreSQL alongside the run, so any worker can reconstruct the full job context (user identity, config, trace metadata, interrupt settings, etc.).
  2. Lease acquisition is atomic — UPDATE ... WHERE claimed_by IS NULL ensures only one worker wins the race.
  3. Heartbeats extend the lease every 10 seconds. If a worker crashes, the lease expires and the run becomes eligible for recovery.
  4. OpenTelemetry trace context propagates across the Redis queue boundary, so traces span the full request lifecycle.
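The atomic claim in point 2 is a compare-and-set. Below is a sketch of that pattern, with an in-memory dict simulating the row so the race is visible without a database; the exact SQL text and column defaults are assumptions based on the description above.

```python
import time

# Only one UPDATE can match the row while claimed_by IS NULL; the loser
# sees 0 rows affected. (SQL shown for shape only; see the docs above.)
CLAIM_SQL = """
UPDATE run
SET claimed_by = %(worker_id)s,
    lease_expires_at = now() + interval '30 seconds'
WHERE run_id = %(run_id)s AND claimed_by IS NULL
"""

def try_claim(runs: dict, run_id: str, worker_id: str) -> bool:
    row = runs[run_id]
    if row["claimed_by"] is None:      # the WHERE clause
        row["claimed_by"] = worker_id  # the SET clause (atomic in SQL)
        row["lease_expires_at"] = time.time() + 30
        return True
    return False                       # UPDATE matched 0 rows: lost the race

runs = {"run-42": {"claimed_by": None, "lease_expires_at": None}}
won_a = try_claim(runs, "run-42", "worker-a")
won_b = try_claim(runs, "run-42", "worker-b")
print(won_a, won_b)  # exactly one worker wins
```

This is why the reaper re-running on every instance is safe: re-enqueueing a run many times still produces exactly one successful claim.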

Streaming run with crash recovery

The full picture: a streaming run shows how the client SSE connection, worker execution, and reaper recovery all interact.

Crash recovery

Every instance runs a LeaseReaper background task that scans for runs with expired leases. When a worker crashes (OOM, kill signal, network partition), its heartbeat stops and the lease expires. The reaper resets the run to pending and re-enqueues it, and the new worker resumes from the last checkpoint. The reaper runs on every instance (default every 15 seconds). This is safe: atomic lease acquisition prevents duplicate execution.
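The reaper's decision reduces to a simple eligibility check over lease timestamps. A sketch, assuming illustrative field names and a reap_expired helper (the real logic lives in services/lease_reaper.py):

```python
import time

def reap_expired(runs: list[dict], now: float, queue: list[str]) -> None:
    for run in runs:
        if run["status"] == "running" and run["lease_expires_at"] < now:
            run["status"] = "pending"    # reset so any worker may reclaim it
            run["claimed_by"] = None
            queue.append(run["run_id"])  # stand-in for re-enqueueing the job

now = time.time()
runs = [
    {"run_id": "healthy", "status": "running", "lease_expires_at": now + 25, "claimed_by": "w1"},
    {"run_id": "crashed", "status": "running", "lease_expires_at": now - 5,  "claimed_by": "w2"},
]
queue: list[str] = []
reap_expired(runs, now, queue)
print(queue)  # only the run with the expired lease is re-enqueued
```

A healthy worker heartbeats every 10 seconds against a 30-second lease, so its lease_expires_at always stays ahead of the scan and the run is never touched.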

Cross-instance cancellation

Cancel requests can arrive at any instance, but the run may be executing on a different one. Aegra uses Redis pub/sub to propagate cancellation across instances.
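The broadcast-and-filter pattern can be sketched without Redis: every instance receives the cancel message, and only the one actually executing the run acts on it. The Instance class and handler names are hypothetical; the channel name matches the Redis data layout section.

```python
CANCEL_CHANNEL = "aegra:run:cancel"

class Instance:
    def __init__(self, name: str, local_runs: set[str]):
        self.name = name
        self.local_runs = local_runs      # runs executing on this instance
        self.cancelled: list[str] = []

    def on_cancel_message(self, run_id: str) -> None:
        if run_id in self.local_runs:     # only the executing instance acts
            self.local_runs.discard(run_id)
            self.cancelled.append(run_id)

def publish_cancel(instances: list["Instance"], run_id: str) -> None:
    # Redis pub/sub delivers the message to every subscriber.
    for inst in instances:
        inst.on_cancel_message(run_id)

a = Instance("A", set())          # received the HTTP cancel request
b = Instance("B", {"run-7"})      # actually executing the run
publish_cancel([a, b], "run-7")
print(b.cancelled)
```

No instance needs to know where a run lives; the filter on local state makes the broadcast idempotent and routing-free.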

Dev vs production mode

| | Dev (aegra dev) | Production (aegra up / aegra serve) |
|---|---|---|
| Executor | LocalExecutor (asyncio.create_task()) | WorkerExecutor (BLPOP + semaphore + lease) |
| Redis required | No | Yes |
| Crash recovery | No (single process) | Yes (lease reaper) |
| Cross-instance cancel | N/A | Yes (Redis pub/sub) |
| SSE broker | In-memory queue + list | Redis pub/sub + replay buffer |
| Scaling | Single instance | Horizontal (shared Redis + PostgreSQL) |
aegra dev is designed for fast iteration — no Redis, no workers, no leases. Just start coding and your runs execute immediately in-process.
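The switch between the two modes can be sketched as a small factory keyed on REDIS_BROKER_ENABLED. The class stubs and selection logic here are illustrative; the real factory lives in services/executor.py.

```python
import os

class LocalExecutor:   # dev: runs execute via asyncio.create_task
    mode = "dev"

class WorkerExecutor:  # production: BLPOP + semaphore + lease
    mode = "production"

def create_executor() -> object:
    # Assumed convention: the env var gates the Redis-backed path.
    if os.environ.get("REDIS_BROKER_ENABLED", "false").lower() == "true":
        return WorkerExecutor()
    return LocalExecutor()

os.environ["REDIS_BROKER_ENABLED"] = "false"
dev_mode = create_executor().mode
os.environ["REDIS_BROKER_ENABLED"] = "true"
prod_mode = create_executor().mode
print(dev_mode, prod_mode)
```

Because both executors sit behind the same abstract interface (submit, wait, start, stop), the rest of the server is unaware of which mode is active.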

Redis data layout

| Key | Purpose | Written by | Read by |
|---|---|---|---|
| aegra:jobs | Job queue (BLPOP) | API (RPUSH) | Worker (BLPOP) |
| aegra:run:{run_id} | SSE live events | Worker (PUBLISH) | API (SUBSCRIBE) |
| aegra:run:cache:{run_id} | SSE replay buffer | Worker (RPUSH) | API (LRANGE) |
| aegra:run:counter:{run_id} | Event sequence counter | Worker (INCR) | API (GET) |
| aegra:run:cancel | Cancel commands | API (PUBLISH) | Worker (SUBSCRIBE) |
| aegra:run:done:{run_id} | Completion signal (TTL 1h) | Worker (SET) | API (EXISTS) |
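The cache, counter, and live channel cooperate so a client that connects (or reconnects) mid-run still sees every event: replay the buffer first, then follow live publishes. A sketch of that replay-then-live pattern, with plain lists standing in for the Redis structures (function names are illustrative):

```python
cache: list[str] = []                 # stands in for aegra:run:cache:{run_id}
live_subscribers: list[list[str]] = []
counter = 0                           # stands in for aegra:run:counter:{run_id}

def publish_event(event: str) -> None:
    global counter
    counter += 1                       # INCR: monotonic sequence number
    line = f"{counter}:{event}"
    cache.append(line)                 # RPUSH into the replay buffer
    for sub in live_subscribers:       # PUBLISH to live listeners
        sub.append(line)

publish_event("started")
publish_event("chunk-1")

# A client connecting now replays the buffer, then goes live.
inbox: list[str] = list(cache)         # LRANGE 0 -1
live_subscribers.append(inbox)         # SUBSCRIBE
publish_event("chunk-2")
print(inbox)
```

The sequence numbers let the SSE layer deduplicate if an event arrives via both the replay read and the live subscription during the hand-off.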

Configuration

All worker settings are configured via environment variables. See the environment variables reference for the full list.
| Variable | Default | Description |
|---|---|---|
| REDIS_BROKER_ENABLED | false | Enable Redis workers and broker |
| REDIS_URL | redis://localhost:6379/0 | Redis connection URL |
| WORKER_COUNT | 3 | Worker loops per instance |
| N_JOBS_PER_WORKER | 10 | Concurrent runs per worker loop |
| BG_JOB_TIMEOUT_SECS | 3600 | Max execution time per run (1 hour) |
| BG_JOB_MAX_RETRIES | 3 | Max retry attempts before a crashed run is permanently failed |
| STUCK_PENDING_THRESHOLD_SECONDS | 120 | How long a pending run can sit before the reaper re-enqueues it |
| LEASE_DURATION_SECONDS | 30 | Lease TTL before the reaper reclaims the run |
| HEARTBEAT_INTERVAL_SECONDS | 10 | Lease extension frequency |
| REAPER_INTERVAL_SECONDS | 15 | Expired-lease scan frequency |
| POSTGRES_POLL_INTERVAL_SECONDS | 5 | Fallback poll interval when Redis is unavailable |
| WORKER_DRAIN_TIMEOUT | 30.0 | Graceful shutdown wait time (seconds) |
When Redis is unavailable, workers fall back to polling PostgreSQL for pending runs at POSTGRES_POLL_INTERVAL_SECONDS intervals. This keeps the system functional during Redis outages, though with higher latency.
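The fallback path trades BLPOP's instant wake-up for a fixed-interval poll. A sketch of that loop, with fetch_pending standing in for the real pending-runs query and the interval shortened so the demo finishes quickly:

```python
import asyncio

POSTGRES_POLL_INTERVAL_SECONDS = 0.01  # shortened from the 5s default for the demo

async def poll_for_runs(fetch_pending, max_polls: int) -> list[str]:
    claimed: list[str] = []
    for _ in range(max_polls):
        # Stand-in for: SELECT run_id FROM run WHERE status = 'pending'
        for run_id in fetch_pending():
            claimed.append(run_id)
        await asyncio.sleep(POSTGRES_POLL_INTERVAL_SECONDS)
    return claimed

pending = [["run-1"], [], ["run-2"]]   # what each successive poll finds
results = asyncio.run(
    poll_for_runs(lambda: pending.pop(0) if pending else [], 3)
)
print(results)
```

Worst-case pickup latency in this mode is one poll interval (5 seconds by default) instead of effectively zero, which is the "higher latency" trade-off noted above.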

File map

| File | Purpose |
|---|---|
| services/executor.py | Factory: selects LocalExecutor or WorkerExecutor |
| services/base_executor.py | Abstract interface: submit, wait, start, stop |
| services/local_executor.py | Dev mode: asyncio.create_task |
| services/worker_executor.py | Production mode: BLPOP + semaphore + lease + heartbeat |
| services/run_executor.py | Shared execution logic (used by both executors) |
| services/run_status.py | Database status updates |
| services/run_preparation.py | Run creation (validate, persist, submit) |
| services/lease_reaper.py | Background task recovering crashed worker runs |
| services/broker.py | In-memory broker + factory |
| services/redis_broker.py | Redis pub/sub broker for SSE |
| services/streaming_service.py | SSE orchestration (replay + live) |
| models/run_job.py | RunJob Pydantic model (serialized execution params) |
| core/orm.py | Run table with execution_params, claimed_by, lease_expires_at |
| core/active_runs.py | In-memory task registry (dev mode only) |
| core/redis_manager.py | Redis connection pool singleton |