Overview

The Dataflow module is the central ETL (Extract, Transform, Load) engine for Cosmo. It handles the ingestion of product data from various merchant platforms (Shopify, GreenIT, Whop, etc.), normalizes it, and stores it in our database.

It is designed for high concurrency, fault tolerance, and scalability.

Architecture

```mermaid
graph LR
    A[Merchant API] -->|Extract| B(Raw Documents)
    B -->|Transform| C(ProductSchema Objects)
    C -->|Load| D[(Postgres DB)]
```
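The three stages above can be sketched end to end as plain functions. This is a minimal, self-contained illustration only: the raw document fields and the `ProductSchema` shape shown here are assumptions, not the actual Cosmo models.

```python
from dataclasses import dataclass

# Hypothetical normalized shape -- the real ProductSchema lives in the app, not here.
@dataclass
class ProductSchema:
    merchant_id: int
    title: str
    price_cents: int

def extract(merchant_api_payload: list[dict]) -> list[dict]:
    # Extract: pull raw documents out of a merchant platform response.
    return merchant_api_payload

def transform(raw_docs: list[dict], merchant_id: int) -> list[ProductSchema]:
    # Transform: normalize platform-specific fields into ProductSchema objects.
    return [
        ProductSchema(
            merchant_id=merchant_id,
            title=doc["name"].strip(),
            price_cents=int(round(float(doc["price"]) * 100)),
        )
        for doc in raw_docs
    ]

def load(products: list[ProductSchema], db: list) -> None:
    # Load: persist normalized rows (a plain list stands in for Postgres here).
    db.extend(products)

raw = [{"name": " Blue Mug ", "price": "12.50"}]
db: list[ProductSchema] = []
load(transform(extract(raw), merchant_id=7), db)
```

After the pipeline runs, `db` holds one normalized `ProductSchema` row with the title trimmed and the price converted to integer cents.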

Key Components

- Flow (app.dataflow.flow_service): orchestrates a single run; built from a DataFlow record.
- DataFlow: the database record describing a flow for a given merchant.
- DataFlowTimeline: the table that records extract, transform, and load events for each run.

Quick Start

Running a Flow Manually

You can trigger a flow via the flow_service:

```python
from app.dataflow.flow_service import Flow
from app.clients.db_client import DBClient
# NOTE: adjust this import to wherever the DataFlow model actually lives.
from app.dataflow.models import DataFlow

# The awaits below must run inside an async context (e.g. asyncio.run(main())).

# 1. Fetch the DataFlow record
data_flow = await DBClient.get(DataFlow, id=1)

# 2. Initialize and run
flow = Flow(data_flow=data_flow, merchant_id=data_flow.merchant_id)
await flow.build()
await flow.run()
```
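The build-then-run call order matters: build() must complete before run(). A runnable toy sketch of that two-phase pattern, with a stand-in Flow class (the real one lives in app.dataflow.flow_service and does far more):

```python
import asyncio

# Toy stand-in for the real Flow class, only to show the build-then-run order.
class Flow:
    def __init__(self, data_flow: dict, merchant_id: int) -> None:
        self.data_flow = data_flow
        self.merchant_id = merchant_id
        self.steps: list[str] = []
        self.built = False

    async def build(self) -> None:
        # build() assembles the pipeline steps before anything executes.
        self.steps = ["extract", "transform", "load"]
        self.built = True

    async def run(self) -> list[str]:
        # run() executes the previously built steps in order.
        if not self.built:
            raise RuntimeError("call build() before run()")
        return [step for step in self.steps]

async def main() -> list[str]:
    flow = Flow(data_flow={"id": 1}, merchant_id=7)
    await flow.build()
    return await flow.run()

executed = asyncio.run(main())
```

Calling run() before build() raises immediately rather than executing an empty pipeline, which is the usual reason for splitting construction and execution into two awaited phases.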

Monitoring

Check the DataFlowTimeline table for execution logs. Every run generates a unique flow_execution_id that ties together all extract, transform, and load events.
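Since every event carries a flow_execution_id, reconstructing a single run is a group-by over that column. A hedged sketch with in-memory rows (the column names here are assumptions about the DataFlowTimeline schema):

```python
from collections import defaultdict

# Hypothetical rows as they might come back from DataFlowTimeline.
timeline_rows = [
    {"flow_execution_id": "abc-1", "event": "extract", "status": "ok"},
    {"flow_execution_id": "abc-1", "event": "transform", "status": "ok"},
    {"flow_execution_id": "abc-1", "event": "load", "status": "error"},
    {"flow_execution_id": "def-2", "event": "extract", "status": "ok"},
]

def events_by_execution(rows: list[dict]) -> dict[str, list[str]]:
    # Tie all extract/transform/load events back to their run.
    grouped: dict[str, list[str]] = defaultdict(list)
    for row in rows:
        grouped[row["flow_execution_id"]].append(row["event"])
    return dict(grouped)

grouped = events_by_execution(timeline_rows)
```

Grouping this way makes it easy to spot runs that stopped partway: execution "abc-1" above reached the load step and failed, while "def-2" only logged an extract event.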