Overview

The SAQ (Simple Async Queue) module provides a robust job queue system for handling asynchronous tasks in the application. It includes a comprehensive observability system that tracks job runs, captures logs, and provides a web-based admin console for monitoring and managing jobs.

SAQ Console

The SAQ Console is a web-based admin interface located at /admin/saq that provides real-time monitoring and management of SAQ jobs. It offers:

  • Job Metrics Dashboard: Overview of completed, failed, queued, running, and scheduled jobs
  • Run History: Visual timeline of job executions grouped by function name
  • Log Viewer: Detailed logs for each job run with search and filtering capabilities
  • Job Enqueueing: Manual job execution with parameter configuration and scheduling
  • Job Cancellation: Ability to cancel queued jobs

Access Control

The SAQ Console is restricted to admin users only. Access is enforced via the _enforce_admin_access function in app/api/admin/routes.py, which checks if the authenticated user's phone number is in the allowed admin list:

_ADMIN_PHONES_DIGITS = {
    "41766753653",
    "12028077258",
    "14159608265",
}

Observability System

Task Run Registration

All SAQ tasks are automatically tracked through the saq_traced decorator in app/saq/observability.py. This decorator:

  1. Creates Run Records: When a job starts executing, a new SaqRun record is created in the database with status "running"
  2. Captures Logs: Attaches a custom log handler (_SaqRunLogHandler) to capture all logs during execution
  3. Updates Status: On completion or failure, updates the run record with final status, duration, and error information

Key Design Decisions

  • No Pre-Queued Records: Run records are created when the job actually starts executing, not when enqueued. This avoids race conditions and ensures accurate tracking.
  • Context-Based Log Capture: Uses Python's contextvars to associate logs with the correct run ID, even in concurrent scenarios.
  • Log Buffering: Logs are buffered in memory and flushed to the database after job completion to minimize database writes during execution.

The saq_traced Decorator

The decorator is applied to all SAQ functions in app/saq/saq.py:

_trace_worker = saq_traced(trigger_type="worker")

# Wrap worker functions
answer_notification_job = _trace_worker(answer_notification_job)
send_campaign_messages_job = _trace_worker(send_campaign_messages_job)
# ... etc

Parameters:

  • trigger_type: Either "worker" (manual/enqueued) or "scheduled" (cron)
  • cron: Optional cron expression for scheduled jobs
  • queue_name: Optional queue name

What it does:

  1. Extracts the SAQ job ID from the context
  2. Creates a SaqRun record with status "running"
  3. Sets up log capture via _SaqRunLogHandler
  4. Executes the wrapped function
  5. Updates the run record with completion status and duration
  6. Flushes buffered logs to the database
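The six steps can be sketched roughly as follows. This is a simplified, in-memory stand-in: the `RUNS` dict replaces the real database writes, and log flushing is omitted (the actual decorator in app/saq/observability.py persists a `SaqRun` record and attaches `_SaqRunLogHandler`):

```python
import asyncio
import contextvars
import functools
import time
import traceback

# In-memory stand-in for the saq_runs table (illustrative only).
RUNS: dict[int, dict] = {}
_next_id = iter(range(1, 1_000_000))

# Context variable associating emitted logs with the active run.
_current_run_id: contextvars.ContextVar[int] = contextvars.ContextVar(
    "saq_run_id", default=-1)

def saq_traced(trigger_type="worker", cron=None, queue_name=None):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx, **kwargs):
            # 1-2. Extract the SAQ job ID and create a "running" run record.
            run_id = next(_next_id)
            RUNS[run_id] = {
                "status": "running",
                "function_name": func.__name__,
                "saq_job_id": (ctx or {}).get("job_id"),
                "trigger_type": trigger_type,
                "cron": cron,
                "queue_name": queue_name,
            }
            # 3. Point the context variable at this run so a log handler
            #    can attribute captured records to it.
            token = _current_run_id.set(run_id)
            started = time.monotonic()
            try:
                # 4. Execute the wrapped function.
                result = await func(ctx, **kwargs)
                RUNS[run_id]["status"] = "complete"
                return result
            except Exception as exc:
                # 5. On failure, record the error message and traceback.
                RUNS[run_id].update(status="failed", error_message=str(exc),
                                    error_traceback=traceback.format_exc())
                raise
            finally:
                # 5-6. Record duration; the real decorator also flushes
                #      buffered logs to the database here.
                RUNS[run_id]["duration_ms"] = int(
                    (time.monotonic() - started) * 1000)
                _current_run_id.reset(token)
        return wrapper
    return decorator
```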

Log Capture Mechanism

The _SaqRunLogHandler class:

  • Filters Logs: Only captures logs when _current_run_id context variable matches the handler's run ID
  • Buffers in Memory: Stores logs in memory during execution to avoid excessive database writes
  • Enforces Limits: Truncates logs if they exceed 10,000 lines or 2MB
  • Chronological Ordering: Ensures logs are stored in chronological order with stable sequencing

Logs are stored in the saq_run_logs table with:

  • run_id: Foreign key to saq_runs
  • ts: Timestamp
  • level: Log level (INFO, ERROR, etc.)
  • message: Log message
  • extra: Additional metadata (logger name, file path, line number, function name)
  • seq: Sequence number for ordering

Job Discovery

The SAQ Console automatically discovers available jobs through the /api/admin/saq/available-jobs endpoint, which:

  1. Imports the SAQ Module: Dynamically imports app.saq.saq to access registered functions
  2. Discovers Worker Jobs:
     • From the settings["functions"] list
     • Functions ending with _job in the module namespace
  3. Discovers Scheduled Jobs: From the settings["cron_jobs"] list
  4. Extracts Metadata: Uses Python's inspect module to extract:
     • Function signature and parameters
     • Parameter types and default values
     • Function documentation
     • Cron expressions (for scheduled jobs)

Job Registration

Jobs are registered in app/saq/saq.py:

Worker Jobs (in settings["functions"]):

settings = {
    "functions": [
        send_campaign_messages_job,
        request_notification_job,
        # ... etc
    ],
}

Scheduled Jobs (in settings["cron_jobs"]):

cron_jobs = [
    CronJob(update_products_hourly, cron="30 12 * * *", timeout=10800),
    CronJob(calculate_campaign_overages, cron="15 * * * *", timeout=600),
    # ... etc
]

Database Models

SaqRun

Tracks individual job executions:

  • id: Primary key
  • status: "running", "complete", "failed", "queued", "cancelled"
  • function_name: Name of the executed function
  • saq_job_id: SAQ's internal job ID (from Redis)
  • trigger_type: "worker" or "scheduled"
  • cron: Cron expression (for scheduled jobs)
  • started_at, finished_at: Execution timestamps
  • duration_ms: Execution duration in milliseconds
  • args: JSONB field storing function arguments
  • result: JSONB field storing function return value (if stored)
  • error_message, error_traceback: Error information for failed jobs
  • queue_name: Optional queue name
  • scheduled_for: For scheduled jobs, when they're scheduled to run

SaqRunLog

Stores log entries for each run:

  • id: Primary key
  • run_id: Foreign key to saq_runs
  • ts: Timestamp
  • level: Log level
  • message: Log message
  • extra: Additional metadata (JSONB)
  • seq: Sequence number for ordering
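Shaped as plain dataclasses, the two records look roughly like this (a sketch for orientation only; the real models are ORM classes, but the field names follow the lists above):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

@dataclass
class SaqRun:
    """One job execution (sketch of a saq_runs row)."""
    id: int
    status: str                    # "running", "complete", "failed", ...
    function_name: str
    trigger_type: str              # "worker" or "scheduled"
    saq_job_id: Optional[str] = None
    cron: Optional[str] = None
    queue_name: Optional[str] = None
    started_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    duration_ms: Optional[int] = None
    scheduled_for: Optional[datetime] = None
    args: dict[str, Any] = field(default_factory=dict)   # JSONB
    result: Optional[Any] = None                         # JSONB
    error_message: Optional[str] = None
    error_traceback: Optional[str] = None

@dataclass
class SaqRunLog:
    """One log line attached to a run (sketch of a saq_run_logs row)."""
    id: int
    run_id: int                    # foreign key to saq_runs
    ts: datetime
    level: str
    message: str
    seq: int                       # sequence number for ordering
    extra: dict[str, Any] = field(default_factory=dict)  # JSONB
```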

API Endpoints

All SAQ admin endpoints are in app/api/admin/routes.py and require admin authentication:

GET /api/admin/saq/overview

Returns metrics for the last 24 hours and current state:

  • last_24h.completed: Count of completed jobs
  • last_24h.failed: Count of failed jobs
  • current.queued: Currently queued jobs
  • current.running: Currently running jobs
  • current.scheduled: Future scheduled jobs (from both DB and Redis)
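An illustrative response shape (all values made up):

```python
# Hypothetical payload from GET /api/admin/saq/overview, matching the
# field names listed above.
overview_response = {
    "last_24h": {"completed": 412, "failed": 3},
    "current": {"queued": 2, "running": 1, "scheduled": 5},
}
```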

GET /api/admin/saq/runs

Paginated list of job runs with filtering:

  • Query params: start, n, status, function_name, q (error search)
  • Returns: {total: int, runs: SaqRunRow[]}

GET /api/admin/saq/runs/{run_id}

Get detailed information about a specific run, including:

  • Full run metadata
  • args and result JSONB fields
  • error_traceback for failed jobs

GET /api/admin/saq/runs/{run_id}/logs

Paginated logs for a specific run:

  • Query params: cursor (seq > cursor), limit, q (message search)
  • Returns: {run_id: int, logs: SaqLogLine[], next_cursor: int}

POST /api/admin/saq/enqueue

Manually enqueue a job:

  • Body: {job_name: str, job_args?: dict, scheduled_epoch?: int}
  • Returns: {job_key: str, status: "queued"}
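For example, to schedule a run of `send_campaign_messages_job` about ten minutes out, the request body might be built like this (the `campaign_id` argument is hypothetical; field names follow the spec above):

```python
import json
import time

# Build an enqueue request body per the spec above (values illustrative).
body = {
    "job_name": "send_campaign_messages_job",
    "job_args": {"campaign_id": 42},            # hypothetical job parameter
    "scheduled_epoch": int(time.time()) + 600,  # run ~10 minutes from now
}
payload = json.dumps(body)
```

Omitting `scheduled_epoch` enqueues the job for immediate execution.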

POST /api/admin/saq/jobs/{job_key}/cancel

Cancel a queued job:

  • Updates the job status to "cancelled" in the database
  • Aborts the job in Redis/SAQ

GET /api/admin/saq/available-jobs

List all available jobs that can be enqueued:

  • Returns: {available_jobs: SaqAvailableJob[], total_jobs: int}
  • Each job includes: name, parameters, doc, category, cron, timeout

Frontend Implementation

The SAQ Console frontend is implemented in frontend/app/admin/saq/page.tsx:

Key Features

  1. Overview Dashboard: Displays metrics cards for completed, failed, queued, running, and scheduled jobs
  2. Run History: Groups runs by function name, showing visual timeline of executions
  3. Run Detail Sheet: Side panel showing:
     • Run metadata (created, started, duration)
     • Error messages and tracebacks
     • Searchable, filterable logs with wrap/copy functionality
  4. Job Enqueue Dialog:
     • Job picker with search
     • Parameter template generation
     • JSON args editor
     • Immediate or scheduled execution
  5. Filtering: Filter runs by status, function name, or error message

State Management

The component uses React hooks for state:

  • overview: Current metrics
  • runs: List of job runs
  • runDetail: Selected run details
  • logs: Log entries for selected run
  • availableJobs: List of jobs that can be enqueued

API Integration

The frontend uses Next.js API routes in frontend/app/api/admin/saq/ that proxy to the backend:

  • /api/admin/saq/overview → /api/v1/admin/saq/overview
  • /api/admin/saq/runs → /api/v1/admin/saq/runs
  • /api/admin/saq/runs/[run_id] → /api/v1/admin/saq/runs/[run_id]
  • /api/admin/saq/runs/[run_id]/logs → /api/v1/admin/saq/runs/[run_id]/logs
  • /api/admin/saq/enqueue → /api/v1/admin/saq/enqueue
  • /api/admin/saq/jobs/[job_key]/cancel → /api/v1/admin/saq/jobs/[job_key]/cancel
  • /api/admin/saq/available-jobs → /api/v1/saq/available-jobs

Adding a New SAQ Job

To add a new SAQ job that appears in the console:

  1. Create the function in app/saq/functions.py or app/saq/tasks/:

async def my_new_job(ctx, param1: str, param2: int = 10):
    """Job documentation here."""
    # Job implementation
    pass

  2. Wrap with saq_traced in app/saq/saq.py:

my_new_job = _trace_worker(my_new_job)

  3. Register in settings:

settings = {
    "functions": [
        # ... existing jobs
        my_new_job,
    ],
}

  4. For scheduled jobs, also add to cron_jobs:

cron_jobs = [
    # ... existing cron jobs
    CronJob(my_new_job, cron="0 * * * *", timeout=300),
]

The job will automatically appear in the SAQ Console's available jobs list and be tracked when executed.

Maintenance

Log Cleanup

A scheduled job purge_saq_observability runs daily at 3:00 AM UTC to clean up old observability data:

  • Location: app/saq/tasks/purge_saq_observability_task.py
  • Cron: 0 3 * * *
  • Removes old runs and logs based on retention policy
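The selection step of such a pass might look like this (the 30-day window and the helper name are assumptions; the real task in app/saq/tasks/purge_saq_observability_task.py applies its own retention policy and deletes via the database):

```python
from datetime import datetime, timedelta, timezone

RETENTION_DAYS = 30  # assumed retention window; the real policy may differ

def select_expired(runs: list[dict], now: datetime) -> list[int]:
    """Return IDs of runs that finished before the retention cutoff (sketch).

    Deleting a run would also remove its saq_run_logs rows.
    """
    cutoff = now - timedelta(days=RETENTION_DAYS)
    return [r["id"] for r in runs
            if r.get("finished_at") and r["finished_at"] < cutoff]
```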

Troubleshooting

Jobs Not Appearing in Console

  • Ensure the function is wrapped with saq_traced
  • Check that it's registered in settings["functions"] or settings["cron_jobs"]
  • Verify the function name matches what's expected

Logs Not Capturing

  • Check that run_id is valid (not -1)
  • Verify the log handler is attached (check logs for [saq_traced] messages)
  • Ensure logs are being flushed (check handler.flush_to_db() is called)

Run Records Not Created

  • Check database connection
  • Verify DBClient.db_insert_objects is working
  • Look for errors in application logs with [saq_traced] prefix