Session Management
Long-running ETL jobs (1-3 hours) require specific session-management strategies to prevent dropped database connections and memory bloat.
The Problem
In a standard web request, a database session lives for milliseconds. In an ETL job:
1. Idle Timeouts: If the Python process is busy transforming data or waiting on an external API for 5 minutes, the database server (or an intermediary such as a connection pooler, firewall, or managed platform like Supabase/Render) may kill the "idle" connection.
2. Memory Bloat: Loading thousands of ORM objects into a single session consumes massive RAM.
3. Stale Objects: If we commit a transaction, SQLAlchemy by default "expires" all loaded objects. Accessing them again either triggers a fresh SQL query (the N+1 problem) or, in async contexts, raises MissingGreenlet (see the sketch just below).
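To make point 3 concrete, here is a minimal sketch of the failure mode under the default expire_on_commit=True. The Product model, engine URL, and function name are hypothetical stand-ins, not taken from the codebase:

```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Product(Base):  # hypothetical model, for illustration only
    __tablename__ = "products"
    id: Mapped[int] = mapped_column(primary_key=True)
    sku: Mapped[str]

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/etl")  # placeholder URL
Session = async_sessionmaker(engine)  # expire_on_commit defaults to True

async def demo(product_id: int) -> None:
    async with Session() as session:
        product = await session.get(Product, product_id)
        await session.commit()  # the commit expires every loaded attribute
        print(product.sku)      # expired attribute -> implicit refresh -> MissingGreenlet
```

Reading product.sku after the commit forces SQLAlchemy to re-load the row; in an async session that implicit IO is exactly what raises MissingGreenlet.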
The Solution
We implemented a robust session management strategy in app/dataflow/etl_service.py inside the _load_products method.
1. expire_on_commit=False
We explicitly create a session with this flag disabled:
batch_session = sessionmanager._sessionmaker(expire_on_commit=False)
Why?
* We cache things like "Existing SKUs" (existing_skus_cache) at the start of the job.
* We perform intermediate commits (see below).
* Without this flag, reading sku.id after a commit would trigger a fresh SELECT to refresh the expired object. With 10,000 SKUs that means flooding the database with redundant queries (see the sketch below).
* Fixes: sqlalchemy.exc.MissingGreenlet errors.
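A minimal sketch of the same idea, using a plain async_sessionmaker rather than the project's sessionmanager wrapper; Product and engine are the hypothetical stand-ins from the sketch above, and the function and cache names are illustrative:

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

# 'engine' and 'Product' as defined in the earlier sketch (hypothetical).
Session = async_sessionmaker(engine, expire_on_commit=False)

async def sync_products(rows: list[dict]) -> None:
    async with Session() as batch_session:
        # Build the SKU cache once, at the start of the job.
        products = (await batch_session.execute(select(Product))).scalars().all()
        existing_skus_cache = {p.sku for p in products}

        batch_session.add_all(
            Product(sku=row["sku"])
            for row in rows
            if row["sku"] not in existing_skus_cache
        )
        await batch_session.commit()

        # With expire_on_commit=False, the objects loaded above are not expired
        # by the commit: reading p.id or p.sku here issues no new SQL and
        # cannot raise MissingGreenlet.
        ids_after_commit = [p.id for p in products]
```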
2. Keep-Alive Commits (Batching)
We force a database commit every 25 products:
COMMIT_INTERVAL = 25

# ... inside loop ...
if len(pending_ids) >= COMMIT_INTERVAL:
    await batch_session.commit()
Why?
* Heartbeat: Sending a COMMIT signal keeps the TCP connection to the database active, preventing "server closed the connection unexpectedly" errors.
* Transaction Size: Keeps each transaction small. If we tried to commit 50,000 products in one go, the database's WAL (Write-Ahead Log) might grow too large, or the transaction might time out.
* Progress: If a job fails at product 5000, the first 4975 are already safely committed (see the loop sketch below).
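Put together, the batching loop might look like the sketch below; COMMIT_INTERVAL and pending_ids mirror the snippet above, while the function signature and Product model are illustrative:

```python
COMMIT_INTERVAL = 25

async def load_products(batch_session, rows: list[dict]) -> None:
    pending_ids: list[str] = []
    for row in rows:
        batch_session.add(Product(sku=row["sku"]))
        pending_ids.append(row["sku"])

        if len(pending_ids) >= COMMIT_INTERVAL:
            # Intermediate commit: persists this batch, keeps the transaction
            # small, and doubles as a heartbeat on the DB connection.
            await batch_session.commit()
            pending_ids.clear()

    # Commit whatever is left over after the loop.
    if pending_ids:
        await batch_session.commit()
```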
3. Connection Pooling Settings
In app/core/database.py, we tune the connection pool for stability:
* pool_recycle: Set to 300 seconds (5 minutes). This proactively recycles connections before the server kills them (server-side idle timeouts are often 10-60 minutes).
* TCP Keepalives: We inject keepalives_idle, keepalives_interval, and keepalives_count into the asyncpg connection arguments to prevent network-level drops (e.g., by Docker or cloud load balancers).
connect_args={
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 10,
    "keepalives_count": 5
}
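For orientation, a sketch of where these settings plug in, assuming the engine is created with SQLAlchemy's create_async_engine; the URL is a placeholder, and the exact connect_args keys the driver accepts should be checked against its documentation rather than copied from here:

```python
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db-host/etl",  # placeholder URL
    pool_recycle=300,    # proactively recycle connections after 5 minutes
    pool_pre_ping=True,  # optional extra guard: test a connection before reuse
    # Driver-level options such as the TCP keepalive settings shown above are
    # passed via connect_args; which keys are accepted depends on the driver.
    # connect_args={...},
)
```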