Bulk Seed with Variance Report

What

Bulk-seed 1000+ records idempotently across five tables (products, accounts, transactions, conversations, messages) by creating a fresh async service per parent entity, committing per parent, and finally emitting a variance table that contrasts expected vs actual counts so partial failures are obvious at a glance.

Project Context

In full_project_context_updated.txt -> app/seed_database.py, products are committed one-at-a-time because accounts need their FK to resolve. Accounts commit per customer so one customer crash does not abort the rest of the batch. Transactions commit per account for the same reason. The expected-vs-actual report at the end tells the operator that, e.g., 94 of 100 customers succeeded and 6 failed silently — far better than seeing a single generic success line over a half-empty database.

How

Per-parent commit, fresh service per loop iteration

async def seed_products() -> list[int]:
    product_ids, products_failed = [], 0
    for data in SAMPLE_PRODUCTS:
        # Fresh service (and session) per product, so one failure cannot poison the next insert.
        async with ProductService() as service:
            try:
                product = await service.repo.create(data)
                await service.commit()  # commit each product individually
                product_ids.append(product.id)
            except Exception as e:
                products_failed += 1
                try:
                    await service.rollback()
                except Exception:
                    pass  # best-effort rollback; the session closes on exit anyway
                logger.warning(f"Skipped product {data.get('name')}: {str(e)[:200]}")
    return product_ids


async def seed_accounts(customer_ids, product_ids):
    account_ids, accounts_failed = [], 0  # collected across all customers
    for idx, customer_id in enumerate(customer_ids):
        # Fresh service per customer: one customer's failure stays local.
        async with AccountService() as service:
            num_accounts = 2 + (idx % 2)  # 2 or 3 accounts per customer
            for j in range(num_accounts):
                try:
                    account_data = {
                        "account_number": f"ACC{customer_id}{j:02d}",
                        "customer_id": customer_id,
                        "product_id": product_ids[j % len(product_ids)],
                        # ...
                    }
                    account = await service.repo.create(account_data)
                    account_ids.append(account.id)
                except Exception:
                    accounts_failed += 1
                    continue
            await service.commit()  # one commit per customer
    return account_ids
  • async with per parent entity guarantees the session closes on partial failure, freeing the connection back to the pool.
  • service.commit() after each parent's create calls groups that parent's inserts into one transaction. Because the parent row was committed earlier in its own transaction, a failed child insert cannot roll it back; the parent stays.
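The per-parent commit pattern can be tried without the project's services. The sketch below is a stand-in, not the code in app/seed_database.py: it uses the standard-library sqlite3 module, and the child table and the seed_children_per_parent helper are hypothetical names chosen for illustration.

```python
import sqlite3


def seed_children_per_parent(conn, parents):
    """Insert each parent's children in their own transaction.

    `parents` maps a parent id to a list of child names.
    Returns (ok_parent_ids, failed_parent_ids).
    """
    ok, failed = [], []
    for parent_id, children in parents.items():
        try:
            with conn:  # commits on success, rolls back if the block raises
                for name in children:
                    conn.execute(
                        "INSERT INTO child (parent_id, name) VALUES (?, ?)",
                        (parent_id, name),
                    )
            ok.append(parent_id)
        except sqlite3.IntegrityError:
            failed.append(parent_id)  # only this parent's rows were rolled back
    return ok, failed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE child (parent_id INTEGER, name TEXT NOT NULL UNIQUE)")

# Parent 2 reuses the name "a", which violates the UNIQUE constraint.
parents = {1: ["a", "b"], 2: ["c", "a"], 3: ["d"]}
ok, failed = seed_children_per_parent(conn, parents)
count = conn.execute("SELECT COUNT(*) FROM child").fetchone()[0]
print(ok, failed, count)
```

Parent 2 loses both of its rows, including the valid "c", while parents 1 and 3 keep theirs. That is the failure radius the per-parent commit is meant to enforce.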

End-of-run variance report

def render_variance_report(expected, actual):
    for entity in ("customers", "products", "accounts",
                   "transactions", "conversations", "messages"):
        e, a = expected[entity], actual[entity]
        logger.info(f"{entity:<14} expected={e:,} actual={a:,} "
                    f"variance={a - e:+,}")
    success_rate = actual["total"] / expected["total"] * 100
    if success_rate >= 99.0:
        logger.info(f"PASSED — {success_rate:.2f}% success")
    elif success_rate >= 95.0:
        logger.info(f"WARNING — {success_rate:.2f}% success")
    else:
        logger.info(f"FAILED — {success_rate:.2f}% success")
  • Logging str(e)[:200] trims the exception message (not the traceback) to its first 200 characters to avoid spamming the log; the start of a long DB driver error usually already identifies the cause.
  • The variance report converts silent partial failure into an explicit (expected, actual, variance) table so CI can grep for FAILED and fail the run.

Top-to-Bottom Code Walkthrough (app/seed_database.py)

app/seed_database.py is a long script with consistent structure: imports at the top, constants in the middle, per-entity async seeders, an end-of-run variance report, and a CLI shim. Here is the walkthrough.

Imports: asyncio, logging, datetime/timedelta/timezone, Decimal for monetary amounts (never do money arithmetic with floats; Decimal preserves exact decimal precision), and Faker from faker for realistic UK data via Faker("en_GB"). From sqlalchemy: text (raw SQL wrapper) and delete (bulk DELETE). From sqlalchemy.ext.asyncio: AsyncSession, used for type hints. From app.database: AsyncSessionLocal (the session factory), engine (the async engine), Base (the declarative base). From app.services: a long list of services because each entity has its own service layer (CustomerService, ProductService, AccountService, TransactionService, ConversationService, MessageService, SecurityService). From app.models: the enums AccountType, AccountStatus, MessageRole, ConversationChannel, and the FAQ model.
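A two-line comparison shows why Decimal is used for amounts. The values are illustrative only and do not come from the seed data.

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
float_total = 0.1 + 0.2
float_matches = float_total == 0.3

# Decimal built from strings keeps the exact decimal digits.
decimal_total = Decimal("0.10") + Decimal("0.20")
decimal_matches = decimal_total == Decimal("0.30")

print(float_matches, decimal_matches)  # False True
```

Constructing Decimal from a string matters here: Decimal(0.1) would inherit the float's rounding error.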

logging.basicConfig(level=logging.INFO) is used because the script runs as a standalone CLI (python -m app.seed_database --clear --customers 200) and doesn’t go through setup_logging() in app/logger.py. This is intentional: the seed script shouldn’t depend on app-level log config, so it logs straight to the console. The module-level logger = logging.getLogger(__name__) follows the standard pattern; when the script runs as the main module, that logger is named "__main__".

fake = Faker("en_GB") instantiates the UK-locale Faker. The locale is chosen because the bank's customers are in the UK; US-style phone numbers and addresses would look wrong in the seeded data.

async def create_all_tables() -> None: is idempotent DDL bootstrapping. async with engine.begin() as conn: opens a transaction that lasts the entire block and commits on exit; Postgres DDL is transactional, so a failure part-way through rolls back every CREATE TABLE. await conn.run_sync(Base.metadata.create_all) hands the synchronous create_all a sync-style connection adapted onto the async driver (SQLAlchemy does this with greenlets, not a worker thread). It is idempotent because create_all defaults to checkfirst=True: it checks which tables already exist and emits CREATE TABLE only for the missing ones.

def generate_customers(count, default_pwd_hash) returns a list of dicts that match the Customer model fields. Note that fake.random_element([True, False, False, False]) deliberately marks roughly 25% of customers as VIP: the draw is still random, but weighted 1-in-4 rather than 50/50, as a hand-tuned prior. The hashed_password field is filled with a pre-hashed default so the same password is not re-hashed for every customer. bcrypt is intentionally slow (around 300 ms per hash, depending on the cost factor), so hashing 100 times in a loop would take about 30 seconds.
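Both ideas, the 1-in-4 VIP draw and hashing the default password once, can be sketched with the standard library alone. This is an illustration under assumptions, not the project's generate_customers: random.Random stands in for Faker, hashlib.pbkdf2_hmac stands in for the bcrypt-based SecurityService, and the customer_id format and is_vip field name are hypothetical.

```python
import hashlib
import random


def hash_password(password: str) -> str:
    # Stand-in for the project's bcrypt hashing (assumption for this sketch).
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), b"demo-salt", 100_000)
    return digest.hex()


def generate_customers(count, default_pwd_hash, rng):
    """Build customer dicts; every row reuses the same pre-computed hash."""
    return [
        {
            "customer_id": f"CUST{i:05d}",  # hypothetical ID format
            "is_vip": rng.choice([True, False, False, False]),  # P(True) = 1/4
            "hashed_password": default_pwd_hash,
        }
        for i in range(count)
    ]


default_hash = hash_password("password123")  # hashed once, outside the loop
customers = generate_customers(10_000, default_hash, random.Random(42))
vip_share = sum(c["is_vip"] for c in customers) / len(customers)
print(f"VIP share: {vip_share:.1%}")
```

With 10,000 rows the VIP share lands close to 25%, and the slow hash runs exactly once regardless of count.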

The constants block holds five typed lists. SAMPLE_PRODUCTS: 8 financial products (mortgages, savings, credit, loans, current accounts), each with name/type/description/interest_rate/features/requirements/is_active. SAMPLE_FAQS: 6 hand-crafted Q&A pairs covering onboarding, contact, fees, rates, safety, mobile; these are deliberately not auto-generated because their content is regulatory-sensitive. REALISTIC_MERCHANTS: 28 merchants familiar to UK customers (Tesco, Sainsbury’s, Starbucks, Netflix etc.) used for transaction merchant names. TRANSACTION_CATEGORIES: 16 standard spending categories. SAMPLE_CONVERSATIONS: 3 example conversations with channel + intent metadata.

async def seed_customers(count) is the first seeder. security = SecurityService(); default_hash = security.get_password_hash("password123") pre-hashes the default password once. Then async with CustomerService() as service: opens a service lifecycle. Inside the for-loop, try: customer = await service.create_customer(**data); customer_ids.append(customer.id) creates the row, and a progress line is logged for every 10th customer; on exception the loop calls service.rollback(), logs, and continues. await service.commit() commits at the end. The per-iteration try/except keeps a single bad row from killing the whole batch. One caveat: a plain session rollback discards every uncommitted row, so this isolation holds only if create_customer persists each customer on its own (for example through its own commit or a savepoint).

async def seed_faqs(session) is a one-shot Q&A insert — await session.execute(delete(FAQ)) clears existing; then loops SAMPLE_FAQS, adds each to the session, and commits at the end.

async def seed_products() is the FK-root seeder. The critical detail is that the function loops over SAMPLE_PRODUCTS, opens a fresh async with ProductService() as service per product, and commits each successful product individually. Why per-product commit: accounts.product_id is a FOREIGN KEY, and the account seeder runs in separate sessions that cannot see uncommitted product rows, so every account insert would fail its FK check. Committing per product also means one bad product cannot roll back the others, so the set of valid FK targets grows steadily as the loop progresses.

async def seed_accounts(customer_ids, product_ids) follows the per-customer pattern. Inside async with AccountService() as service:, a nested async with AsyncSessionLocal() as session: reads the Customer.customer_id field (the external string ID, not the DB primary key). For each account creation attempt, service.repo.create(account_data) writes the row; on failure the loop increments accounts_failed and continues with the next account. await service.commit() after the inner loop commits all accounts for this customer atomically: one customer’s worth, not the whole batch.

async def seed_transactions(account_ids) is the biggest-volume seeder. It works per account, with a fresh TransactionService() and 10-20 transactions each. The date arithmetic datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=days_ago) builds a naive UTC timestamp. This is needed because Transaction.date was declared as a naive DateTime column in the initial schema, and mixing aware and naive datetimes fails without explicit conversion: Python raises TypeError when subtracting or comparing them, and Postgres drivers such as asyncpg typically reject aware values for a TIMESTAMP WITHOUT TIME ZONE column. Dropping tzinfo after datetime.now(timezone.utc) is the usual replacement for the deprecated datetime.utcnow(), which returned the same naive UTC value.
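The naive-UTC expression can be checked in isolation. The helper name naive_utc_days_ago is hypothetical; the expression inside it is the one quoted above.

```python
from datetime import datetime, timedelta, timezone


def naive_utc_days_ago(days_ago: int) -> datetime:
    """Return a naive datetime holding the UTC wall-clock time `days_ago` days back."""
    return datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=days_ago)


ts = naive_utc_days_ago(3)
print(ts.tzinfo)  # None: the value carries no timezone

# Mixing the naive value with an aware one is what the conversion avoids.
try:
    datetime.now(timezone.utc) - ts  # aware minus naive
    mixing_failed = False
except TypeError:
    mixing_failed = True
print(mixing_failed)  # True
```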

async def seed_conversations(customer_ids) creates 3 conversation records, one per customer.

async def seed_messages(conversation_ids) writes 2 messages per conversation (one customer + one agent reply each).

async def clear_database() runs raw DELETE FROM table statements in FK-safe order: transactions, accounts, products, messages, conversations, customers. The order matters — children before parents.
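A children-before-parents order does not have to be maintained by hand; it can be derived from the FK graph. The sketch below uses the standard-library graphlib. The FK_PARENTS map is an assumption reconstructed from this walkthrough (including conversations referencing customers), not something read from the models, and delete_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# table -> tables it references through foreign keys (assumed from the walkthrough)
FK_PARENTS = {
    "customers": set(),
    "products": set(),
    "accounts": {"customers", "products"},
    "transactions": {"accounts"},
    "conversations": {"customers"},
    "messages": {"conversations"},
}


def delete_order(fk_parents):
    """Return tables ordered so every child is deleted before its parents."""
    # static_order() yields referenced tables (parents) first: the insert order.
    insert_order = list(TopologicalSorter(fk_parents).static_order())
    return insert_order[::-1]  # reversed: the delete order


order = delete_order(FK_PARENTS)
print(order)
```

Several orders are valid, so the exact output can differ from the hand-written list while still satisfying the same constraint. TopologicalSorter also raises CycleError if two tables reference each other, which a hand-written list would not catch.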

async def seed_all(clear_first, customer_count) is the orchestrator. Steps are explicit: create tables, optionally clear, then sequentially seed each entity. After seeding, the variance-report loop computes the expected count per entity, the actual count, the variance and the success-rate percentage, and logs a formatted table. The final block decides PASSED/WARNING/FAILED from the overall success rate, matching the render_variance_report excerpt above: at least 99% is PASSED, at least 95% but below 99% is WARNING, and below 95% is FAILED. CI can grep for these strings.
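As a worked example of an expected count, the accounts figure follows directly from the 2 + (idx % 2) rule in seed_accounts. How the script itself computes expected values is not shown here, so treat this as a sketch; the helper names and the actual figure of 235 are hypothetical.

```python
def accounts_for(idx: int) -> int:
    """Accounts created for the customer at position idx: 2 for even, 3 for odd."""
    return 2 + (idx % 2)


def expected_accounts(customer_count: int) -> int:
    return sum(accounts_for(idx) for idx in range(customer_count))


expected = expected_accounts(100)  # 50 customers x 2 + 50 customers x 3 = 250
actual = 235                       # hypothetical count read back from the database
variance = actual - expected       # -15
success_rate = actual / expected * 100

print(f"{'accounts':<14} expected={expected:,} actual={actual:,} variance={variance:+,}")
print(f"{success_rate:.2f}% success")
```

With these numbers the accounts row alone sits at 94%, which falls below the 95% line used in the render_variance_report excerpt.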

The __main__ block parses sys.argv — --clear toggles clear_first=True; --customers N parses the integer for customer_count; default customer_count=100. Then asyncio.run(seed_all(...)) kicks off the entire flow.
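The same two flags can also be expressed with argparse. This is an alternative sketch, not the script's own sys.argv handling; parse_args and the help strings are illustrative.

```python
import argparse


def parse_args(argv=None):
    """Parse the seed CLI flags; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(prog="app.seed_database")
    parser.add_argument(
        "--clear", action="store_true",
        help="delete existing rows before seeding",
    )
    parser.add_argument(
        "--customers", type=int, default=100,
        help="number of customers to seed (default: 100)",
    )
    return parser.parse_args(argv)


args = parse_args(["--clear", "--customers", "200"])
defaults = parse_args([])
print(args.clear, args.customers, defaults.clear, defaults.customers)
```

argparse rejects a non-integer --customers value with a usage message, which manual sys.argv parsing has to handle itself.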

Common Pitfalls

Sharing a single AsyncSession across all inserts means one bad row forces a rollback of the whole batch: a single constraint violation discards every uncommitted insert made earlier in that session. Always re-scope per parent.

Reporting a generic success line for any completed run lies. Only print the success banner after the variance table shows every entity within tolerance.

Real-World Interview Prep

Q1: How would you seed 1M rows in under 60 seconds in Postgres?

A: Don’t loop INSERT; use COPY. Three options ordered by throughput (the rates are rough orders of magnitude that depend on hardware, row width, and indexes): (1) COPY ... FROM STDIN, for example through asyncpg's Connection.copy_records_to_table: ~300k rows/sec. (2) INSERT ... VALUES (...), (...), (...) with multi-row VALUES in chunks of 1000: ~30k rows/sec. (3) INSERT per row via the ORM: ~3k rows/sec, never do this at scale. For the FCA seed, the relational structure (products -> accounts -> transactions) means you commit per parent entity group so that FK targets exist and failures stay isolated; the trade-off is correctness over throughput. For pure flat-table seeding, batch into a single COPY and run it inside one transaction.
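Option (2), chunked batches inside one transaction, can be sketched without a Postgres server. The example below uses the standard-library sqlite3 with executemany as a stand-in; it does not demonstrate COPY, and the merchant table, chunked, and bulk_insert are hypothetical names.

```python
import sqlite3
from itertools import islice


def chunked(rows, size):
    """Yield lists of up to `size` items from any iterable."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


def bulk_insert(conn, rows, chunk_size=1000):
    """Insert rows in chunks, all inside a single transaction."""
    inserted = 0
    with conn:  # one commit for the whole flat-table load
        for chunk in chunked(rows, chunk_size):
            conn.executemany("INSERT INTO merchant (name) VALUES (?)", chunk)
            inserted += len(chunk)
    return inserted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE merchant (name TEXT)")

rows = ((f"merchant-{i}",) for i in range(2500))  # generator: never fully in memory
inserted = bulk_insert(conn, rows)
stored = conn.execute("SELECT COUNT(*) FROM merchant").fetchone()[0]
print(inserted, stored)
```

Chunking keeps memory flat for large inputs; the single surrounding transaction is what avoids paying a commit per row.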

Q2: Walk through the FK ordering for an accounts/transactions bulk seed.

A: Three layers. (1) Insert products first because accounts.product_id references them. Commit each product because later inserts need its ID. (2) Insert accounts per customer with customer_id and product_id set; commit per customer so a single FK violation doesn’t roll back the whole batch. (3) Insert transactions referencing account_id. Commit-per-account keeps the failure radius small. Never rely on metadata.create_all ordering alone: it orders table creation, not your inserts, and concurrent inserts can still hit a missing parent. Use BEGIN; SET CONSTRAINTS ALL DEFERRED; ... COMMIT; if you want a fully atomic load, which works only if the schema declares the constraints DEFERRABLE.
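Deferred FK checking is easy to observe locally. The sketch below uses SQLite through the standard-library sqlite3 as a stand-in for Postgres: the constraint is declared DEFERRABLE INITIALLY DEFERRED in the table definition instead of being switched with SET CONSTRAINTS, and the product and account tables are simplified, hypothetical versions of the real ones.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit BEGIN/COMMIT
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.execute("CREATE TABLE product (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE account ("
    " id INTEGER PRIMARY KEY,"
    " product_id INTEGER NOT NULL"
    " REFERENCES product(id) DEFERRABLE INITIALLY DEFERRED)"
)

# Child before parent: allowed, because the FK is checked at COMMIT.
conn.execute("BEGIN")
conn.execute("INSERT INTO account (id, product_id) VALUES (1, 10)")
conn.execute("INSERT INTO product (id) VALUES (10)")
conn.execute("COMMIT")

# A child whose parent never arrives is rejected when the transaction commits.
conn.execute("BEGIN")
conn.execute("INSERT INTO account (id, product_id) VALUES (2, 99)")
try:
    conn.execute("COMMIT")
    orphan_rejected = False
except sqlite3.IntegrityError:
    conn.execute("ROLLBACK")  # the failed COMMIT leaves the transaction open
    orphan_rejected = True

accounts = conn.execute("SELECT COUNT(*) FROM account").fetchone()[0]
print(accounts, orphan_rejected)
```

The trade-off against per-parent commits is the failure radius: with a deferred, fully atomic load, one orphan row fails the entire transaction.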

Q3: Why is “variance report” better than a boolean success/fail for batch jobs?

A: A boolean hides partial success. Suppose 94 of 100 customers seed successfully and 6 fail on conflicts: if you only check the exit code, you never see that the run was 94% successful. A variance table (expected=100, actual=94, variance=-6) tells the operator WHICH entity failed and by how much. The next step is then obvious: “re-run the missing 6”, versus “re-run the whole batch, hope nothing else broke”. CI can also gate on success_rate < 99.0 to fail the deploy, which is much more defensive than trusting exit 0.
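A CI gate along these lines might turn the counts into an exit code and a list of what to re-run. This is a sketch under assumptions: shortfalls and exit_code are hypothetical helpers, the counts are made up, and the 99.0 threshold is the one mentioned in the answer above.

```python
def shortfalls(expected, actual):
    """Map each under-filled entity to the number of missing rows."""
    return {
        entity: expected[entity] - actual.get(entity, 0)
        for entity in expected
        if actual.get(entity, 0) < expected[entity]
    }


def exit_code(expected, actual, threshold=99.0):
    """Return 0 when the overall success rate meets the threshold, else 1."""
    total_expected = sum(expected.values())
    if total_expected == 0:
        return 1  # nothing expected: treat as a misconfigured run
    total_actual = sum(actual.get(entity, 0) for entity in expected)
    rate = total_actual / total_expected * 100
    return 0 if rate >= threshold else 1


expected = {"customers": 100, "accounts": 250}
actual = {"customers": 94, "accounts": 235}  # hypothetical counts after a run

missing = shortfalls(expected, actual)
code = exit_code(expected, actual)
print(missing, code)
```

In a real script the last step would be sys.exit(code), so the pipeline fails on the exit status and the shortfall map tells the operator what to re-seed.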
