## Overview

The Dataflow module is Cosmo's central ETL (Extract, Transform, Load) engine. It ingests product data from merchant platforms such as Shopify, GreenIT, and Whop, normalizes it, and stores it in our database.

It is designed for high concurrency, fault tolerance, and scalability.
## Architecture

```mermaid
graph LR
    A[Merchant API] -->|Extract| B(Raw Documents)
    B -->|Transform| C(ProductSchema Objects)
    C -->|Load| D[(Postgres DB)]
```
## Key Components

- **Orchestration & Concurrency**: How we manage jobs, prevent overlapping runs, and coordinate workers with Redis distributed locks (see the lock sketch after this list).
- **Extraction**: How we fetch data from Shopify (Bulk API), Whop (async), and other platforms.
- **Transformation**: Mappers that convert external payloads to our internal schema.
- **Loading & Optimization**: Smart batching, N+1 query prevention, and database upsert logic (see the upsert sketch after this list).
- **Session Management**: How we prevent database timeouts during long runs.
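
For a sense of how overlap prevention works, here is a minimal sketch of acquiring a per-flow Redis lock with `redis-py`'s asyncio client. The key scheme (`dataflow:lock:{flow_id}`) and the helper name are illustrative assumptions; the actual lock helper in the orchestration code may differ.

```python
import uuid

import redis.asyncio as redis


async def try_acquire_flow_lock(r: redis.Redis, flow_id: int, ttl_s: int = 3600) -> str | None:
    """Acquire a per-flow lock so overlapping runs are skipped.

    Returns a lock token on success, or None if another worker holds the lock.
    """
    token = str(uuid.uuid4())
    # SET with NX + EX is atomic: only one worker can create the key,
    # and the TTL guarantees the lock expires if the worker dies mid-run.
    acquired = await r.set(f"dataflow:lock:{flow_id}", token, nx=True, ex=ttl_s)
    return token if acquired else None
```

Releasing the lock should compare the stored token before deleting (typically via a small Lua script), so a worker never releases a lock that has already expired and been re-acquired by someone else.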
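On the load side, a hedged sketch of the batched upsert, using SQLAlchemy's PostgreSQL `INSERT ... ON CONFLICT` support. The `Product` model, its `external_id` unique key, and the updated columns are assumptions for illustration; the real conflict target and column set live in the Loading component.

```python
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

from app.models import Product  # assumed import path and model


async def upsert_products(session: AsyncSession, rows: list[dict]) -> None:
    """Batch-upsert product rows in a single statement to avoid N+1 round trips."""
    stmt = insert(Product).values(rows)
    stmt = stmt.on_conflict_do_update(
        index_elements=["external_id"],  # assumed unique constraint
        set_={
            "title": stmt.excluded.title,
            "price": stmt.excluded.price,
        },
    )
    await session.execute(stmt)
```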
## Quick Start

### Running a Flow Manually

You can trigger a flow via the `flow_service` module:
```python
from app.clients.db_client import DBClient
from app.dataflow.flow_service import Flow
from app.models import DataFlow  # adjust to wherever the DataFlow model lives

# Run inside an async context (e.g. a worker task or an asyncio REPL).

# 1. Fetch the DataFlow record
data_flow = await DBClient.get(DataFlow, id=1)

# 2. Initialize and run
flow = Flow(data_flow=data_flow, merchant_id=data_flow.merchant_id)
await flow.build()
await flow.run()
```
## Monitoring

Check the `DataFlowTimeline` table for execution logs. Every run generates a unique `flow_execution_id` that ties together all extract, transform, and load events.
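
If you need to pull the timeline programmatically rather than browsing the table, a query along these lines works. It assumes `DataFlowTimeline` is a SQLAlchemy model with a `created_at` timestamp column and that an async session is available; adjust the names to the actual schema.

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models import DataFlowTimeline  # assumed import path


async def get_timeline_events(session: AsyncSession, flow_execution_id: str):
    """Fetch every event recorded for a single flow execution, oldest first."""
    result = await session.execute(
        select(DataFlowTimeline)
        .where(DataFlowTimeline.flow_execution_id == flow_execution_id)
        .order_by(DataFlowTimeline.created_at)  # assumed timestamp column
    )
    return result.scalars().all()
```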