The Load layer is the most complex part of the pipeline. It handles Upserts (Update if exists, Insert if new) and ensures data integrity.

The Load Class

The load logic is defined in app/dataflow/etl_service.py, specifically in the _load_products method.

Optimizations

1. Complexity-Based Batching

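We do not batch by a simple item count (e.g., 100 products). A product with 100 variants costs far more to load than a product with a single variant (101 units versus 2 under the formula below), so a fixed product count is a poor proxy for the work in a batch.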

The Algorithm:

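MAX_BATCH_COMPLEXITY = 100

# Each product costs 1 unit, plus 1 unit per variant.
doc_complexity = 1 + len(variants)
# Flush first if adding this product would push the batch over the limit.
if current_batch_complexity + doc_complexity > MAX_BATCH_COMPLEXITY:
    flush_batch()
    current_batch_complexity = 0  # assumed: the counter restarts after a flush
current_batch_complexity += doc_complexity

This keeps each batch at or below the complexity limit, unless a single product exceeds the limit on its own, which keeps transactions short enough to avoid timeouts.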
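The batching rule can be sketched as a small generator. This is a minimal illustration, not the code in _load_products: the function name batch_by_complexity and the product dictionaries with a "variants" key are assumptions made for the example.

```python
from typing import Iterable, Iterator

MAX_BATCH_COMPLEXITY = 100


def batch_by_complexity(
    products: Iterable[dict], max_complexity: int = MAX_BATCH_COMPLEXITY
) -> Iterator[list[dict]]:
    """Yield batches whose summed complexity stays within max_complexity.

    Hypothetical helper: each product is assumed to be a dict with a
    "variants" list. A product that exceeds the limit on its own is
    emitted as a batch of one.
    """
    batch: list[dict] = []
    batch_complexity = 0
    for product in products:
        doc_complexity = 1 + len(product.get("variants", []))
        # Flush before adding if this product would push the batch over budget.
        if batch and batch_complexity + doc_complexity > max_complexity:
            yield batch
            batch, batch_complexity = [], 0
        batch.append(product)
        batch_complexity += doc_complexity
    if batch:
        yield batch  # final partial batch
```

With two products of 59 variants each (60 units apiece) followed by a product with no variants (1 unit), the first product is flushed alone and the other two share the second batch.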

2. Anti-N+1 Strategy (Prefetching)

Before processing a batch, we prefetch everything the batch needs in bulk instead of querying per item:

# 1. Get all Product IDs for source_ids in this batch
existing_products_map = {source_id: db_id, ...}

# 2. Get all SKU records for these products
existing_skus_cache = {(product_id, sku_source_id): sku_obj, ...}

When upserting, we look each item up in these in-memory dictionaries instead of running SELECT * FROM skus WHERE ... for every single item. This reduces database queries by more than 95%.
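A rough sketch of the prefetch step, using the standard-library sqlite3 module as a stand-in for the real database layer. The function name prefetch_batch and the schema (products(id, source_id) and skus(id, product_id, source_id)) are assumptions for illustration. The cache here maps to SKU ids rather than full SKU objects to keep the example short.

```python
import sqlite3


def prefetch_batch(conn: sqlite3.Connection, source_ids: list[str]):
    """Build both lookup maps for one batch using two bulk queries.

    Hypothetical schema: products(id, source_id) and
    skus(id, product_id, source_id).
    """
    if not source_ids:
        return {}, {}
    marks = ",".join("?" * len(source_ids))
    # 1. Map source_id -> database id for products that already exist.
    existing_products_map = dict(
        conn.execute(
            f"SELECT source_id, id FROM products WHERE source_id IN ({marks})",
            source_ids,
        )
    )
    product_ids = list(existing_products_map.values())
    if not product_ids:
        return existing_products_map, {}
    marks = ",".join("?" * len(product_ids))
    # 2. Map (product_id, sku_source_id) -> SKU id for those products.
    existing_skus_cache = {
        (product_id, sku_source_id): sku_id
        for sku_id, product_id, sku_source_id in conn.execute(
            f"SELECT id, product_id, source_id FROM skus WHERE product_id IN ({marks})",
            product_ids,
        )
    }
    return existing_products_map, existing_skus_cache
```

During the upsert, a lookup such as `(product_id, sku_source_id) in existing_skus_cache` then decides between update and insert without touching the database.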

3. Batch Attribute Insertion

Inserting attributes (Color, Size, Material) is the slowest part because there are so many of them.

* Old Way: Loop through variants -> Loop through attributes -> Insert.
* New Way: Collect all new attributes for the entire batch into a generic list, new_product_attributes. Run a single db_insert_objects(new_product_attributes) at the end of the transaction.
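The collect-then-insert pattern might look like the sketch below. The signature of db_insert_objects is not shown in this document, so sqlite3's executemany stands in for it. The product_attributes table, its columns, and the shape of the batch dictionaries are assumptions for illustration.

```python
import sqlite3


def insert_attributes_in_bulk(conn: sqlite3.Connection, batch: list[dict]) -> int:
    """Collect every new attribute in the batch, then insert them in one call.

    Hypothetical input shape: each product has a "variants" list, and each
    variant has a "sku_id" and an "attributes" dict of name -> value.
    """
    new_product_attributes = []
    for product in batch:
        for variant in product["variants"]:
            for name, value in variant["attributes"].items():
                # Only collect here; nothing is written inside the loops.
                new_product_attributes.append((variant["sku_id"], name, value))
    # One bulk call instead of one INSERT round trip per attribute.
    conn.executemany(
        "INSERT INTO product_attributes (sku_id, name, value) VALUES (?, ?, ?)",
        new_product_attributes,
    )
    return len(new_product_attributes)
```

The loops still visit every attribute, but the database is contacted once per batch rather than once per attribute.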

4. Deferred Embedding Generation

We use AI embeddings to power our semantic product search. Generating these requires calling the OpenAI API.

* Optimization: We do not generate embeddings inside the main DB transaction loop.
* Process:
  1. Load ALL products to DB.
  2. Commit transactions.
  3. After the loop finishes, call ProductSearcher.upload_product_embeddings(all_loaded_ids).
* Result: No slow API call runs while a transaction is open, which keeps database lock time minimal.
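The ordering can be sketched as follows, again with sqlite3 as a stand-in. The upload_embeddings callable represents ProductSearcher.upload_product_embeddings, whose real signature is not shown here. The function name load_then_embed and the products(id, title) table are assumptions for illustration.

```python
import sqlite3
from typing import Callable, Iterable


def load_then_embed(
    conn: sqlite3.Connection,
    batches: Iterable[list[dict]],
    upload_embeddings: Callable[[list[int]], None],
) -> list[int]:
    """Load all batches in short transactions, then upload embeddings once."""
    all_loaded_ids: list[int] = []
    for batch in batches:
        with conn:  # commits on success, rolls back on error
            for product in batch:
                cur = conn.execute(
                    "INSERT INTO products (title) VALUES (?)", (product["title"],)
                )
                all_loaded_ids.append(cur.lastrowid)
    # Every batch is committed by now, so the slow API call holds no DB locks.
    upload_embeddings(all_loaded_ids)
    return all_loaded_ids
```

The design choice is that a slow or failing embedding call can delay search freshness but cannot stall or roll back the load itself.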

5. Archiving

What happens if a merchant deletes a product? Such a product simply stops appearing in the source data, so the run never touches it.

* We track every product_id we successfully touched during the run.
* At the end, we query: SELECT id FROM products WHERE merchant_id=X AND id NOT IN (touched_ids).
* We bulk update these to status = 'archived'.
* Chunking: This is done in chunks of 5000 to avoid locking the table for long periods.
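A hedged sketch of chunked archiving with sqlite3. It differs from the query above in one respect: instead of passing touched_ids to NOT IN, it filters untouched ids in Python, which avoids a very large parameter list. The function name archive_untouched and the products(id, merchant_id, status) columns are assumptions for illustration.

```python
import sqlite3

CHUNK_SIZE = 5000


def archive_untouched(
    conn: sqlite3.Connection,
    merchant_id: int,
    touched_ids: set[int],
    chunk_size: int = CHUNK_SIZE,
) -> int:
    """Archive the merchant's products that this run did not touch."""
    rows = conn.execute(
        "SELECT id FROM products WHERE merchant_id = ?", (merchant_id,)
    ).fetchall()
    stale_ids = [row[0] for row in rows if row[0] not in touched_ids]
    for start in range(0, len(stale_ids), chunk_size):
        chunk = stale_ids[start:start + chunk_size]
        with conn:  # one short transaction per chunk
            conn.executemany(
                "UPDATE products SET status = 'archived' WHERE id = ?",
                [(product_id,) for product_id in chunk],
            )
    return len(stale_ids)
```

Committing after each chunk means locks are released between chunks, so other writers are not blocked for the duration of the whole archive pass.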