Handling long-running ETL transactions (1-3 hours) requires specific strategies to prevent database connection drops and memory leaks.

The Problem

In a standard web request, a database session lives for milliseconds. In an ETL job:

1. Idle Timeouts: If the Python process is busy transforming data or waiting on an API for 5 minutes, the database server (or a firewall/proxy such as Supabase or Render) may kill the "idle" connection.
2. Memory Bloat: Loading thousands of ORM objects into a single session consumes massive RAM.
3. Stale Objects: When we commit a transaction, SQLAlchemy by default "expires" all loaded objects. Accessing them again triggers a fresh SQL query (the N+1 problem) or raises MissingGreenlet in async contexts.


The Solution

We implemented a robust session management strategy in app/dataflow/etl_service.py inside the _load_products method.

1. expire_on_commit=False

We explicitly create a session with this flag disabled:

batch_session = sessionmanager._sessionmaker(expire_on_commit=False)

Why?

* We cache data such as existing SKUs (existing_skus_cache) at the start of the job.
* We perform intermediate commits (see below).
* Without this flag, every attribute access after a commit (e.g., sku.id) would trigger a fresh DB query. With 10,000 SKUs, that means thousands of needless refresh queries hammering the database.
* Fixes sqlalchemy.exc.MissingGreenlet errors: expired attributes cannot be lazily refreshed outside an async context.
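A minimal, self-contained illustration of the behavior (a sketch using a synchronous SQLite session; the Sku model and column names here are stand-ins, not the project's actual model):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Sku(Base):
    __tablename__ = "skus"
    id = Column(Integer, primary_key=True)
    code = Column(String)

engine = create_engine("sqlite://")  # in-memory DB for the demo
Base.metadata.create_all(engine)

# With the default expire_on_commit=True, every attribute of `sku` would be
# expired after commit() and the access below would emit a refresh SELECT
# (or raise MissingGreenlet under an async session).
Session = sessionmaker(engine, expire_on_commit=False)
with Session() as session:
    sku = Sku(code="ABC-1")
    session.add(sku)
    session.commit()
    print(sku.code)  # served from the already-loaded object; no extra SQL
```

The same flag works identically with async_sessionmaker; the synchronous form above is only used so the example runs standalone.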

2. Keep-Alive Commits (Batching)

We force a database commit every 25 products:

COMMIT_INTERVAL = 25
# ... inside loop ...
if len(pending_ids) >= COMMIT_INTERVAL:
    await batch_session.commit()

Why?

* Heartbeat: Sending a COMMIT keeps the TCP connection to the database active, preventing "server closed the connection unexpectedly" errors.
* Transaction Size: Keeps the transaction log small. Committing 50,000 products in one transaction could bloat the database WAL (Write-Ahead Log) or hit a timeout.
* Progress: If a job fails at product 5000, the first 4975 are already safely committed.
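The batching logic can be sketched in isolation like this (a simplified stand-in: `commit` here represents `await batch_session.commit()`, and the helper name `load_in_batches` is hypothetical, not the project's code):

```python
COMMIT_INTERVAL = 25

def load_in_batches(products, commit):
    """Accumulate work and commit every COMMIT_INTERVAL items,
    plus one final commit for any leftover partial batch."""
    pending = []
    for product in products:
        pending.append(product)          # stand-in for session.add(...)
        if len(pending) >= COMMIT_INTERVAL:
            commit(list(pending))        # keep-alive heartbeat + durability
            pending.clear()
    if pending:                          # flush the final partial batch
        commit(list(pending))

# Record each batch size instead of hitting a real database.
commits = []
load_in_batches(range(103), lambda batch: commits.append(len(batch)))
print(commits)  # 103 products -> four full batches of 25, then 3
```

Note the final partial commit: without it, the last `len(products) % COMMIT_INTERVAL` items would never be persisted.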

3. Connection Pooling Settings

In app/core/database.py, we tune the connection pool for stability:

  • pool_recycle: Set to 300 seconds (5 minutes). This proactively recycles connections before the server typically kills them (often 10-60 minutes).
  • TCP Keepalives: We inject keepalives_idle, keepalives_interval, and keepalives_count into the driver connection arguments to prevent network-level drops (e.g., by Docker or Cloud Load Balancers). Note that the keepalives* names below are libpq parameters (as accepted by psycopg2); asyncpg does not take these keyword arguments directly and instead accepts the equivalent tcp_keepalives_* options via its server_settings dict.

connect_args={
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 10,
    "keepalives_count": 5
}
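Putting the pool settings together, an engine configuration might look like the following (a sketch, not the project's exact app/core/database.py: the URL is a placeholder, pool_pre_ping is an optional extra, and the server_settings spelling reflects how asyncpg accepts keepalive options):

```python
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",  # placeholder URL
    pool_recycle=300,     # recycle connections after 5 minutes
    pool_pre_ping=True,   # optionally validate a connection before handing it out
    connect_args={
        # asyncpg takes PostgreSQL server settings rather than libpq flags
        "server_settings": {
            "tcp_keepalives_idle": "30",
            "tcp_keepalives_interval": "10",
            "tcp_keepalives_count": "5",
        }
    },
)
```

With a psycopg2-based (synchronous) engine, the flat keepalives/keepalives_idle/keepalives_interval/keepalives_count keys shown above would be passed in connect_args directly instead.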