Loading
The Load layer is the most complex part of the pipeline. It handles upserts (update the row if it exists, insert it if it is new) and ensures data integrity.
The Load Class
Defined in app/dataflow/etl_service.py, specifically in the _load_products method.
Optimizations
1. Complexity-Based Batching
We do not batch simply by count (e.g., 100 products per batch): a single product with 100 variants is roughly as expensive to process as 100 products with one variant each.
The Algorithm:
doc_complexity = 1 + len(variants)
MAX_BATCH_COMPLEXITY = 100
if current_batch_complexity + doc_complexity > MAX_BATCH_COMPLEXITY:
    flush_batch()
This ensures that a batch never becomes too heavy to process, preventing transaction timeouts.
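As a minimal sketch, the batching loop might look like this (iter_batches and the products dict shape are illustrative here, not the actual _load_products code):

```python
MAX_BATCH_COMPLEXITY = 100

def iter_batches(products, max_complexity=MAX_BATCH_COMPLEXITY):
    """Yield batches whose summed complexity never exceeds the cap."""
    batch, batch_complexity = [], 0
    for product in products:
        # One unit for the product row itself, plus one per variant.
        doc_complexity = 1 + len(product["variants"])
        if batch and batch_complexity + doc_complexity > max_complexity:
            yield batch  # flush: the next product would push us over the cap
            batch, batch_complexity = [], 0
        batch.append(product)
        batch_complexity += doc_complexity
    if batch:
        yield batch  # flush the final partial batch
```

Note that an oversized product (more than 100 variants) still gets its own single-item batch rather than being dropped.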
2. Anti-N+1 Strategy (Prefetching)
Before processing a batch, we fetch all the relevant data up front, with one query per table:
# 1. Get all Product IDs for source_ids in this batch
existing_products_map = {source_id: db_id, ...}
# 2. Get all SKU records for these products
existing_skus_cache = {(product_id, sku_source_id): sku_obj, ...}
When upserting, we check these in-memory dictionaries instead of running SELECT * FROM skus WHERE ... for every single item. This reduces the number of database queries by 95%+.
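For illustration, assuming SQLAlchemy-style Product and Sku models (the model names and session usage are assumptions, not the actual code), the prefetch step could look like this:

```python
from sqlalchemy import select

def prefetch_batch(session, merchant_id, source_ids):
    """Build both lookup caches for a batch using exactly two queries."""
    # Query 1: every existing product matching the batch's source_ids.
    products = session.execute(
        select(Product).where(
            Product.merchant_id == merchant_id,
            Product.source_id.in_(source_ids),
        )
    ).scalars().all()
    existing_products_map = {p.source_id: p.id for p in products}

    # Query 2: every existing SKU belonging to those products.
    skus = session.execute(
        select(Sku).where(
            Sku.product_id.in_(list(existing_products_map.values()))
        )
    ).scalars().all()
    existing_skus_cache = {(s.product_id, s.source_id): s for s in skus}

    return existing_products_map, existing_skus_cache
```

Every per-item existence check afterwards is a dictionary lookup rather than a database round trip.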
3. Batch Attribute Insertion
Inserting attributes (Color, Size, Material) is the slowest part because there are so many of them.
* Old Way: Loop through variants -> loop through attributes -> insert one row at a time.
* New Way: Collect all new attributes for the entire batch into a single list, new_product_attributes, and run one db_insert_objects(new_product_attributes) call at the end of the transaction (see the sketch below).
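A condensed sketch of the new way (the ProductAttribute model and the input dict shapes are assumptions for illustration; db_insert_objects is the bulk-insert helper named above):

```python
def collect_and_insert_attributes(batch, existing_skus_cache):
    """Accumulate attribute rows across the whole batch, then insert once."""
    new_product_attributes = []
    for product in batch:
        for variant in product["variants"]:
            sku = existing_skus_cache[(product["db_id"], variant["source_id"])]
            for name, value in variant["attributes"].items():
                new_product_attributes.append(
                    ProductAttribute(sku_id=sku.id, name=name, value=value)
                )
    # One bulk insert for the whole batch instead of one INSERT per attribute.
    db_insert_objects(new_product_attributes)
```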
4. Deferred Embedding Generation
We use AI embeddings to power our semantic product search. Generating these requires calling the OpenAI API.
* Optimization: We do not generate embeddings inside the main DB transaction loop.
* Process:
1. Load ALL products to the DB.
2. Commit the transactions.
3. After the loop finishes, call ProductSearcher.upload_product_embeddings(all_loaded_ids).
This keeps database lock time minimal: the slow OpenAI calls only happen after every transaction has committed (sketched below).
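Schematically (load_batch here is a hypothetical stand-in for the per-batch upsert logic described above):

```python
def load_all(batches, searcher):
    """Run every batch to completion before generating any embeddings."""
    all_loaded_ids = []
    for batch in batches:
        # Upsert and commit each batch; no external API calls happen in here.
        all_loaded_ids.extend(load_batch(batch))

    # The OpenAI round trips start only after every transaction has
    # committed, so they never extend database lock time.
    searcher.upload_product_embeddings(all_loaded_ids)
```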
5. Archiving
What happens if a merchant deletes a product?
* We track every product_id we successfully touched during the run.
* At the end, we query: SELECT id FROM products WHERE merchant_id=X AND id NOT IN (touched_ids).
* We bulk update these to status = 'archived'.
* Chunking: The bulk update runs in chunks of 5000 rows so we never hold a long lock on the whole table (see the sketch below).
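A sketch of the archiving pass, again assuming SQLAlchemy-style models (Product and the session are illustrative):

```python
from sqlalchemy import select, update

CHUNK_SIZE = 5000

def archive_untouched(session, merchant_id, touched_ids):
    """Archive every product this run never touched, in small chunks."""
    stale_ids = session.execute(
        select(Product.id).where(
            Product.merchant_id == merchant_id,
            Product.id.not_in(touched_ids),
        )
    ).scalars().all()

    for start in range(0, len(stale_ids), CHUNK_SIZE):
        chunk = stale_ids[start : start + CHUNK_SIZE]
        session.execute(
            update(Product)
            .where(Product.id.in_(chunk))
            .values(status="archived")
        )
        session.commit()  # commit per chunk so locks are released quickly
```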