Orchestration
The Dataflow module uses a robust orchestration layer to manage the execution of ETL jobs. This ensures that massive data syncs (handling thousands of products) can run reliably without overwhelming the system or the database.
The Flow Service
The core orchestrator is the `Flow` class defined in `app/dataflow/flow_service.py`.
Lifecycle of a Flow
- Build: Resolves the necessary `Extract` client and `Transform` mappers based on the `DataFlow` configuration in the database.
- Run:
  - Generates a unique `flow_execution_id` (UUID).
  - Extracts data using the configured client.
  - Transforms data in parallel (using `asyncio.gather` and semaphores; sketched below).
  - Loads data into the database using batched upserts.
  - Archives old products that are no longer present in the source.
- Notification: Logs every major step (start, error, completion) to the `DataFlowTimeline` table for auditability.
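The real implementation lives in `app/dataflow/flow_service.py`. As a rough, hypothetical sketch of the run phase only (every name here except `asyncio` and `uuid` is an assumption, not the actual API):

```python
import asyncio
import uuid


class Flow:
    """Illustrative skeleton only; constructor arguments are hypothetical."""

    def __init__(self, extract_client, transformer, loader, max_concurrency: int = 10):
        self.extract_client = extract_client
        self.transformer = transformer
        self.loader = loader
        # Caps how many transforms run concurrently inside asyncio.gather.
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def run(self) -> None:
        flow_execution_id = uuid.uuid4()  # unique ID for this execution

        # Extract: pull raw records from the configured source.
        raw_records = await self.extract_client.fetch_all()

        # Transform: map records in parallel, bounded by the semaphore.
        async def transform_one(record):
            async with self.semaphore:
                return await self.transformer.map(record)

        products = await asyncio.gather(*(transform_one(r) for r in raw_records))

        # Load: batched upserts keep each database round trip small.
        for i in range(0, len(products), 500):
            await self.loader.upsert_batch(products[i:i + 500], flow_execution_id)

        # Archive: flag products absent from this execution's source data.
        await self.loader.archive_stale(flow_execution_id)
```

The semaphore is the important detail: without it, a 50k-product store would spawn 50k concurrent transform tasks inside `asyncio.gather`.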
SAQ & Background Jobs
We use SAQ (Simple Async Queue) for background job processing. This decouples the trigger (e.g., a cron schedule or API call) from the heavy lifting.
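At its simplest, the decoupling looks like this: a trigger enqueues a job by name, and a separate SAQ worker process executes it later. The sketch below uses SAQ's public API (`Queue.from_url`, `enqueue`), but the Redis URL and the task signature are placeholders:

```python
from saq import Queue

queue = Queue.from_url("redis://localhost:6379")  # placeholder URL


async def run_product_flow_job(ctx, *, merchant_id: int, data_flow_id: int):
    """Executed later by a SAQ worker, not by whoever enqueued it."""
    ...


async def trigger_sync(merchant_id: int, data_flow_id: int) -> None:
    # The trigger (cron tick, API call) only enqueues the job by name;
    # a worker process picks it up and does the heavy lifting.
    await queue.enqueue(
        "run_product_flow_job",
        merchant_id=merchant_id,
        data_flow_id=data_flow_id,
    )
```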
The Hourly Scheduler
The file `app/saq/tasks/product_updates_task.py` contains the `update_products_hourly` task, which runs on a cron schedule.
Responsibilities:
1. Queries all "active" merchants (Online status, Billable plan).
2. Iterates through each merchant's valid `DataFlow` configurations.
3. Enqueues a dedicated `run_product_flow_job` for each merchant, as sketched below.
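Put together, the scheduler body follows roughly this shape, reusing the `queue` from the sketch above. `get_active_merchants` and `valid_data_flows` are hypothetical stand-ins for the real queries, and the top-of-hour cron expression is an assumption:

```python
from saq import CronJob


async def update_products_hourly(ctx):
    # 1. Only merchants that are Online and on a billable plan.
    merchants = await get_active_merchants()  # hypothetical query helper

    for merchant in merchants:
        # 2. Skip merchants without a valid DataFlow configuration.
        for data_flow in merchant.valid_data_flows:  # hypothetical relation
            # 3. One job per merchant/flow pair.
            await queue.enqueue(
                "run_product_flow_job",
                merchant_id=merchant.id,
                data_flow_id=data_flow.id,
                timeout=10_800,  # 3-hour job timeout (see table below)
            )


# Registered in the SAQ worker settings; "0 * * * *" assumes top of the hour.
cron_jobs = [CronJob(update_products_hourly, cron="0 * * * *")]
```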
Distributed Concurrency Control
A critical requirement for our system is Database Stability. In the past, running multiple large import jobs simultaneously exhausted our database connection pool (limit: 100) and caused CPU spikes.
To solve this, we implemented Strict Sequential Processing using `RedisLock`.
1. Global Scheduler Lock
* Key: `update_products_hourly_global`
* Timeout: 0 (fail immediately if locked)
* Expiry: 6 hours
* Purpose: Ensures only one "scheduler" instance runs at a time. If the previous hour's job is still queuing tasks, the new run is skipped (sketched below).
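Our `RedisLock` wrapper aside, the fail-fast behavior maps naturally onto redis-py's built-in asyncio lock, as in this illustrative sketch (connection setup and helper names are placeholders):

```python
import redis.asyncio as redis

r = redis.Redis()  # placeholder connection


async def update_products_hourly(ctx):
    # timeout= is the lock's Redis expiry (6 h); blocking=False means
    # "Timeout: 0", i.e. fail immediately instead of waiting.
    lock = r.lock("update_products_hourly_global", timeout=6 * 3600, blocking=False)
    if not await lock.acquire():
        return  # previous hour's scheduler is still queuing; skip this run
    try:
        await enqueue_all_merchant_jobs()  # hypothetical helper (see above)
    finally:
        await lock.release()
```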
2. Execution Lock
* Key: `product_flow_execution_lock`
* Timeout: 4 hours (wait up to 4 hours to acquire)
* Expiry: 4 hours
* Purpose: This is the most critical lock. It forces strictly sequential execution of data flows across the entire SAQ cluster.
* Behavior (sketched below):
  * Worker A picks up Job 1 -> Acquires Lock -> Runs.
  * Worker B picks up Job 2 -> Tries Lock -> Waits...
  * Worker A finishes -> Releases Lock.
  * Worker B acquires Lock -> Runs.
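The execution lock is the same pattern with blocking enabled. Reusing the redis-py client `r` from the previous sketch, and with `build_and_run_flow` as a hypothetical stand-in for the real flow invocation:

```python
async def run_product_flow_job(ctx, *, merchant_id: int, data_flow_id: int):
    # Expiry 4 h (timeout=); wait up to 4 h to acquire (blocking_timeout=).
    lock = r.lock(
        "product_flow_execution_lock",
        timeout=4 * 3600,
        blocking=True,
        blocking_timeout=4 * 3600,
    )
    if not await lock.acquire():
        raise TimeoutError("execution lock not acquired within 4 hours")
    try:
        # Only one flow runs at a time across the whole SAQ cluster.
        await build_and_run_flow(merchant_id, data_flow_id)  # hypothetical
    finally:
        await lock.release()
```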
Why Sequential? While our infrastructure could scale to handle parallel imports, the database write pattern for products (heavy UPSERTs with complex relationships) creates significant lock contention. Sequential processing guarantees predictable performance and zero database "Connection Timeout" errors.
Timeouts & Configuration
Long-running jobs require carefully tuned timeouts to avoid being killed mid-process.
| Setting | Value | Reason |
|---|---|---|
| Job Timeout | 3 hours (10,800 s) | Large Shopify stores (50k+ products) can take over 2 hours to sync. |
| Lock Wait | 4 hours (14,400 s) | A worker must be willing to wait for the previous large job to finish. |
| Lock Expiry | 4 hours | Safety valve: if a worker crashes hard, the lock eventually releases. |
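One relationship worth making explicit: the lock wait must exceed the job timeout, otherwise a queued worker could give up while a long (but healthy) job ahead of it is still running. A constants block (illustrative names only) captures this:

```python
JOB_TIMEOUT_SECONDS = 3 * 60 * 60   # 10,800 s: SAQ kills the job after this
LOCK_WAIT_SECONDS = 4 * 60 * 60     # 14,400 s: max wait on the execution lock
LOCK_EXPIRY_SECONDS = 4 * 60 * 60   # safety valve if a worker crashes hard

# The lock wait must exceed the job timeout; otherwise a queued worker could
# time out on the lock while the job ahead of it is still legitimately running.
assert LOCK_WAIT_SECONDS > JOB_TIMEOUT_SECONDS
```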