Overview
The SAQ (Simple Async Queue) module provides a robust job queue system for handling asynchronous tasks in the application. It includes a comprehensive observability system that tracks job runs, captures logs, and provides a web-based admin console for monitoring and managing jobs.
SAQ Console
The SAQ Console is a web-based admin interface located at `/admin/saq` that provides real-time monitoring and management of SAQ jobs. It offers:
- Job Metrics Dashboard: Overview of completed, failed, queued, running, and scheduled jobs
- Run History: Visual timeline of job executions grouped by function name
- Log Viewer: Detailed logs for each job run with search and filtering capabilities
- Job Enqueueing: Manual job execution with parameter configuration and scheduling
- Job Cancellation: Ability to cancel queued jobs
Access Control
The SAQ Console is restricted to admin users only. Access is enforced via the `_enforce_admin_access` function in `app/api/admin/routes.py`, which checks whether the authenticated user's phone number is in the allowed admin list:
```python
_ADMIN_PHONES_DIGITS = {
    "41766753653",
    "12028077258",
    "14159608265",
}
```
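For orientation, here is a minimal sketch of the check itself, assuming a FastAPI-style API layer and a `user` object with a `phone` attribute (both assumptions; the actual implementation is in `app/api/admin/routes.py`):

```python
import re

from fastapi import HTTPException  # assumes a FastAPI-style API layer

def _enforce_admin_access(user) -> None:
    # Normalize the phone to digits only, e.g. "+1 (202) 807-7258" -> "12028077258".
    # The `user.phone` attribute is an illustrative assumption.
    digits = re.sub(r"\D", "", user.phone or "")
    if digits not in _ADMIN_PHONES_DIGITS:
        raise HTTPException(status_code=403, detail="Admin access required")
```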
Observability System
Task Run Registration
All SAQ tasks are automatically tracked through the `saq_traced` decorator in `app/saq/observability.py`. This decorator:
- Creates Run Records: When a job starts executing, a new `SaqRun` record is created in the database with status "running"
- Captures Logs: Attaches a custom log handler (`_SaqRunLogHandler`) to capture all logs emitted during execution
- Updates Status: On completion or failure, updates the run record with the final status, duration, and error information
Key Design Decisions
- No Pre-Queued Records: Run records are created when the job actually starts executing, not when enqueued. This avoids race conditions and ensures accurate tracking.
- Context-Based Log Capture: Uses Python's `contextvars` to associate logs with the correct run ID, even in concurrent scenarios (illustrated below)
- Log Buffering: Logs are buffered in memory and flushed to the database after job completion to minimize database writes during execution
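As a standalone illustration of the context-based approach (not the module's actual code), a `ContextVar` keeps each concurrent task's run ID isolated:

```python
import asyncio
from contextvars import ContextVar

# Each asyncio task sees its own value of this variable, so concurrent
# jobs cannot pollute each other's log capture.
_current_run_id: ContextVar[int] = ContextVar("_current_run_id", default=-1)

async def run_job(run_id: int) -> None:
    _current_run_id.set(run_id)
    await asyncio.sleep(0.01)  # simulate work; other tasks interleave here
    assert _current_run_id.get() == run_id  # still our own run ID

async def main() -> None:
    await asyncio.gather(*(run_job(i) for i in range(5)))

asyncio.run(main())
```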
The `saq_traced` Decorator
The decorator is applied to all SAQ functions in `app/saq/saq.py`:
```python
_trace_worker = saq_traced(trigger_type="worker")

# Wrap worker functions
answer_notification_job = _trace_worker(answer_notification_job)
send_campaign_messages_job = _trace_worker(send_campaign_messages_job)
# ... etc
```
Parameters:
- `trigger_type`: Either "worker" (manual/enqueued) or "scheduled" (cron)
- `cron`: Optional cron expression for scheduled jobs
- `queue_name`: Optional queue name
What it does:
- Extracts the SAQ job ID from the context
- Creates a `SaqRun` record with status "running"
- Sets up log capture via `_SaqRunLogHandler`
_SaqRunLogHandler - Executes the wrapped function
- Updates the run record with completion status and duration
- Flushes buffered logs to the database
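Put together, the decorator has roughly the following shape. This is a simplified sketch for orientation only; `create_run_record` and `finish_run_record` are placeholder helpers, and the real implementation lives in `app/saq/observability.py`:

```python
import functools
import time
import traceback

# Placeholder persistence helpers so the sketch is self-contained; the real
# code writes SaqRun rows and attaches/flushes _SaqRunLogHandler.
def create_run_record(**fields) -> int:
    return 1

def finish_run_record(run_id: int, **fields) -> None:
    pass

def saq_traced(trigger_type: str, cron: str | None = None, queue_name: str | None = None):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(ctx, **kwargs):
            job = ctx.get("job")
            run_id = create_run_record(
                function_name=fn.__name__,
                saq_job_id=getattr(job, "id", None),
                trigger_type=trigger_type,
                cron=cron,
                queue_name=queue_name,
                args=kwargs,
                status="running",
            )
            # (log capture via _SaqRunLogHandler would be attached here)
            started = time.monotonic()
            try:
                result = await fn(ctx, **kwargs)
                finish_run_record(run_id, status="complete",
                                  duration_ms=int((time.monotonic() - started) * 1000))
                return result
            except Exception as exc:
                finish_run_record(run_id, status="failed",
                                  error_message=str(exc),
                                  error_traceback=traceback.format_exc(),
                                  duration_ms=int((time.monotonic() - started) * 1000))
                raise
            # (buffered logs would be flushed to the DB in a finally block)
        return wrapper
    return decorator
```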
Log Capture Mechanism
The `_SaqRunLogHandler` class:
- Filters Logs: Only captures logs when the `_current_run_id` context variable matches the handler's run ID
- Buffers in Memory: Stores logs in memory during execution to avoid excessive database writes
- Enforces Limits: Truncates logs if they exceed 10,000 lines or 2 MB
- Chronological Ordering: Ensures logs are stored in chronological order with stable sequencing
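A condensed, illustrative sketch of such a handler (the real `_SaqRunLogHandler` is in `app/saq/observability.py`; the limits mirror the ones described above):

```python
import logging
from contextvars import ContextVar

_current_run_id: ContextVar[int] = ContextVar("_current_run_id", default=-1)

MAX_LINES = 10_000
MAX_BYTES = 2 * 1024 * 1024  # 2 MB

class _SaqRunLogHandler(logging.Handler):
    """Buffers log records for one run; flushed to the DB after the job ends."""

    def __init__(self, run_id: int) -> None:
        super().__init__()
        self.run_id = run_id
        self.buffer: list[dict] = []
        self._bytes = 0
        self._seq = 0
        self._truncated = False

    def emit(self, record: logging.LogRecord) -> None:
        # Only capture logs belonging to this run (context-based filtering).
        if _current_run_id.get() != self.run_id or self._truncated:
            return
        message = self.format(record)
        if len(self.buffer) >= MAX_LINES or self._bytes + len(message) > MAX_BYTES:
            self._truncated = True  # enforce the line/size limits
            return
        self._seq += 1  # stable sequence number for chronological ordering
        self._bytes += len(message)
        self.buffer.append({
            "run_id": self.run_id,
            "ts": record.created,
            "level": record.levelname,
            "message": message,
            "seq": self._seq,
        })
```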
Logs are stored in the saq_run_logs table with:
- `run_id`: Foreign key to `saq_runs`
- `ts`: Timestamp
- `level`: Log level (INFO, ERROR, etc.)
- `message`: Log message
- `extra`: Additional metadata (logger name, file path, line number, function name)
- `seq`: Sequence number for ordering
Job Discovery
The SAQ Console automatically discovers available jobs through the `/api/admin/saq/available-jobs` endpoint, which:
- Imports the SAQ Module: Dynamically imports `app.saq.saq` to access registered functions
- Discovers Worker Jobs:
  - From the `settings["functions"]` list
  - Functions ending with `_job` in the module namespace
- Discovers Scheduled Jobs: From the `settings["cron_jobs"]` list
- Extracts Metadata: Uses Python's `inspect` module to extract:
  - Function signature and parameters
  - Parameter types and default values
  - Function documentation
  - Cron expressions (for scheduled jobs)
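The metadata extraction step can be pictured with plain `inspect` calls; a self-contained example (the endpoint's actual code may differ):

```python
import inspect

async def my_new_job(ctx, param1: str, param2: int = 10):
    """Job documentation here."""

sig = inspect.signature(my_new_job)
parameters = [
    {
        "name": p.name,
        "type": None if p.annotation is inspect.Parameter.empty
                else getattr(p.annotation, "__name__", str(p.annotation)),
        "default": None if p.default is inspect.Parameter.empty else p.default,
    }
    for p in sig.parameters.values()
    if p.name != "ctx"  # the SAQ context argument is not user-supplied
]
doc = inspect.getdoc(my_new_job)

print(parameters)  # [{'name': 'param1', 'type': 'str', 'default': None}, {'name': 'param2', 'type': 'int', 'default': 10}]
print(doc)         # "Job documentation here."
```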
Job Registration
Jobs are registered in `app/saq/saq.py`:
Worker Jobs (in `settings["functions"]`):
```python
settings = {
    "functions": [
        send_campaign_messages_job,
        request_notification_job,
        # ... etc
    ],
}
```
Scheduled Jobs (in `settings["cron_jobs"]`):
```python
cron_jobs = [
    CronJob(update_products_hourly, cron="30 12 * * *", timeout=10800),
    CronJob(calculate_campaign_overages, cron="15 * * * *", timeout=600),
    # ... etc
]
```
Database Models
SaqRun
Tracks individual job executions:
- `id`: Primary key
- `status`: "running", "complete", "failed", "queued", "cancelled"
- `function_name`: Name of the executed function
- `saq_job_id`: SAQ's internal job ID (from Redis)
- `trigger_type`: "worker" or "scheduled"
- `cron`: Cron expression (for scheduled jobs)
- `started_at`, `finished_at`: Execution timestamps
- `duration_ms`: Execution duration in milliseconds
- `args`: JSONB field storing function arguments
- `result`: JSONB field storing the function return value (if stored)
- `error_message`, `error_traceback`: Error information for failed jobs
- `queue_name`: Optional queue name
- `scheduled_for`: For scheduled jobs, when they're scheduled to run
SaqRunLog
Stores log entries for each run:
- `id`: Primary key
- `run_id`: Foreign key to `saq_runs`
- `ts`: Timestamp
- `level`: Log level
- `message`: Log message
- `extra`: Additional metadata (JSONB)
- `seq`: Sequence number for ordering
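For orientation, the two models sketched in SQLAlchemy style (the project's actual base classes, column types, and constraints may differ):

```python
# Illustrative SQLAlchemy declarations; the real models may use a different
# ORM layer, base class, or column definitions.
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class SaqRun(Base):
    __tablename__ = "saq_runs"

    id = Column(Integer, primary_key=True)
    status = Column(String, nullable=False)   # "running", "complete", "failed", ...
    function_name = Column(String, nullable=False)
    saq_job_id = Column(String)               # SAQ's internal job ID (from Redis)
    trigger_type = Column(String)             # "worker" or "scheduled"
    cron = Column(String)
    started_at = Column(DateTime)
    finished_at = Column(DateTime)
    duration_ms = Column(Integer)
    args = Column(JSONB)
    result = Column(JSONB)
    error_message = Column(Text)
    error_traceback = Column(Text)
    queue_name = Column(String)
    scheduled_for = Column(DateTime)

class SaqRunLog(Base):
    __tablename__ = "saq_run_logs"

    id = Column(Integer, primary_key=True)
    run_id = Column(Integer, ForeignKey("saq_runs.id"), nullable=False)
    ts = Column(DateTime, nullable=False)
    level = Column(String)
    message = Column(Text)
    extra = Column(JSONB)
    seq = Column(Integer, nullable=False)
```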
API Endpoints
All SAQ admin endpoints are in `app/api/admin/routes.py` and require admin authentication:
GET /api/admin/saq/overview
Returns metrics for the last 24 hours and current state:
- `last_24h.completed`: Count of completed jobs
- `last_24h.failed`: Count of failed jobs
- `current.queued`: Currently queued jobs
- `current.running`: Currently running jobs
- `current.scheduled`: Future scheduled jobs (from both the DB and Redis)
GET /api/admin/saq/runs
Paginated list of job runs with filtering:
- Query params: `start`, `n`, `status`, `function_name`, `q` (error search)
- Returns: `{total: int, runs: SaqRunRow[]}`
GET /api/admin/saq/runs/{run_id}
Get detailed information about a specific run, including:
- Full run metadata
- `args` and `result` JSONB fields
- `error_traceback` for failed jobs
GET /api/admin/saq/runs/{run_id}/logs
Paginated logs for a specific run:
- Query params: `cursor` (returns rows with `seq > cursor`), `limit`, `q` (message search)
- Returns: `{run_id: int, logs: SaqLogLine[], next_cursor: int}`
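The `cursor`/`next_cursor` pair enables simple keyset pagination. A hypothetical Python client could page through all logs like this (the base URL and authentication are placeholders):

```python
import requests

BASE_URL = "https://example.com"  # placeholder

def fetch_all_logs(run_id: int, limit: int = 500) -> list[dict]:
    logs: list[dict] = []
    cursor = 0
    while True:
        resp = requests.get(
            f"{BASE_URL}/api/admin/saq/runs/{run_id}/logs",
            params={"cursor": cursor, "limit": limit},
        )
        resp.raise_for_status()
        page = resp.json()
        logs.extend(page["logs"])
        if len(page["logs"]) < limit:
            break  # short page means we've reached the end
        cursor = page["next_cursor"]  # resume from the last seq seen
    return logs
```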
POST /api/admin/saq/enqueue
Manually enqueue a job:
- Body: `{job_name: str, job_args?: dict, scheduled_epoch?: int}`
- Returns: `{job_key: str, status: "queued"}`
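For example, enqueueing a job to run ten minutes from now might look like this (a hypothetical client call; the job arguments are illustrative and authentication is omitted):

```python
import time

import requests

resp = requests.post(
    "https://example.com/api/admin/saq/enqueue",  # placeholder base URL
    json={
        "job_name": "send_campaign_messages_job",
        "job_args": {"campaign_id": 42},            # illustrative arguments
        "scheduled_epoch": int(time.time()) + 600,  # run ~10 minutes from now
    },
)
resp.raise_for_status()
print(resp.json())  # {"job_key": "...", "status": "queued"}
```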
POST /api/admin/saq/jobs/{job_key}/cancel
Cancel a queued job:
- Updates the job status to "cancelled" in the database
- Aborts the job in Redis/SAQ
GET /api/admin/saq/available-jobs
List all available jobs that can be enqueued:
- Returns: `{available_jobs: SaqAvailableJob[], total_jobs: int}`
- Each job includes: `name`, `parameters`, `doc`, `category`, `cron`, `timeout`
Frontend Implementation
The SAQ Console frontend is implemented in `frontend/app/admin/saq/page.tsx`:
Key Features
- Overview Dashboard: Displays metrics cards for completed, failed, queued, running, and scheduled jobs
- Run History: Groups runs by function name, showing visual timeline of executions
- Run Detail Sheet: Side panel showing:
- Run metadata (created, started, duration)
- Error messages and tracebacks
- Searchable, filterable logs with wrap/copy functionality
- Job Enqueue Dialog:
- Job picker with search
- Parameter template generation
- JSON args editor
- Immediate or scheduled execution
- Filtering: Filter runs by status, function name, or error message
State Management
The component uses React hooks for state:
- `overview`: Current metrics
- `runs`: List of job runs
- `runDetail`: Selected run details
- `logs`: Log entries for the selected run
- `availableJobs`: List of jobs that can be enqueued
API Integration
The frontend uses Next.js API routes in `frontend/app/api/admin/saq/` that proxy to the backend:
- `/api/admin/saq/overview` → `/api/v1/admin/saq/overview`
- `/api/admin/saq/runs` → `/api/v1/admin/saq/runs`
- `/api/admin/saq/runs/[run_id]` → `/api/v1/admin/saq/runs/[run_id]`
- `/api/admin/saq/runs/[run_id]/logs` → `/api/v1/admin/saq/runs/[run_id]/logs`
- `/api/admin/saq/enqueue` → `/api/v1/admin/saq/enqueue`
- `/api/admin/saq/jobs/[job_key]/cancel` → `/api/v1/admin/saq/jobs/[job_key]/cancel`
- `/api/admin/saq/available-jobs` → `/api/v1/saq/available-jobs`
Adding a New SAQ Job
To add a new SAQ job that appears in the console:
- Create the function in `app/saq/functions.py` or `app/saq/tasks/`:

```python
async def my_new_job(ctx, param1: str, param2: int = 10):
    """Job documentation here."""
    # Job implementation
    pass
```

- Wrap it with `saq_traced` in `app/saq/saq.py`:

```python
my_new_job = _trace_worker(my_new_job)
```

- Register it in settings:

```python
settings = {
    "functions": [
        # ... existing jobs
        my_new_job,
    ],
}
```

- For scheduled jobs, also add it to `cron_jobs`:

```python
cron_jobs = [
    # ... existing cron jobs
    CronJob(my_new_job, cron="0 * * * *", timeout=300),
]
```
The job will automatically appear in the SAQ Console's available jobs list and be tracked when executed.
Maintenance
Log Cleanup
A scheduled job, `purge_saq_observability`, runs daily at 3:00 AM UTC to clean up old observability data:
- Location: `app/saq/tasks/purge_saq_observability_task.py`
- Cron: `0 3 * * *`
- Removes old runs and logs based on the retention policy
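The purge amounts to deleting runs and their logs past a retention window. A minimal sketch, assuming a 30-day retention and a raw-SQL client exposed on the job context (both assumptions; the actual task is in `app/saq/tasks/purge_saq_observability_task.py`):

```python
from datetime import datetime, timedelta, timezone

RETENTION_DAYS = 30  # assumed retention window, not the actual policy

async def purge_saq_observability(ctx):
    """Delete runs and logs older than the retention window."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)
    db = ctx["db"]  # placeholder for however the app exposes its DB client
    # Delete child log rows first so the foreign key on saq_run_logs.run_id holds.
    await db.execute(
        "DELETE FROM saq_run_logs WHERE run_id IN "
        "(SELECT id FROM saq_runs WHERE started_at < $1)",
        cutoff,
    )
    await db.execute("DELETE FROM saq_runs WHERE started_at < $1", cutoff)
```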
Troubleshooting
Jobs Not Appearing in Console
- Ensure the function is wrapped with `saq_traced`
- Check that it's registered in `settings["functions"]` or `settings["cron_jobs"]`
- Verify the function name matches what's expected
Logs Not Capturing
- Check that the `run_id` is valid (not -1)
- Verify the log handler is attached (look for `[saq_traced]` messages in the logs)
- Ensure logs are being flushed (check that `handler.flush_to_db()` is called)
Run Records Not Created
- Check database connection
- Verify that `DBClient.db_insert_objects` is working
- Look for errors in application logs with the `[saq_traced]` prefix