Database Recipes

Common database patterns and query recipes for litestar-pydotorg.

Basic Queries

Get by ID

async def get_by_id(session: AsyncSession, model_id: UUID) -> Model | None:
    return await session.get(Model, model_id)

Get All with Pagination

from sqlalchemy import select

async def list_all(
    session: AsyncSession,
    *,
    limit: int = 100,
    offset: int = 0,
) -> Sequence[Model]:
    query = select(Model).limit(limit).offset(offset)
    result = await session.execute(query)
    return result.scalars().all()

Get with Filters

async def list_filtered(
    session: AsyncSession,
    *,
    is_active: bool | None = None,
    category: str | None = None,
) -> Sequence[Model]:
    query = select(Model)

    if is_active is not None:
        query = query.where(Model.is_active == is_active)

    if category is not None:
        query = query.where(Model.category == category)

    result = await session.execute(query)
    return result.scalars().all()

Relationship Loading

Eager Loading (Prevent N+1)

from sqlalchemy.orm import selectinload, joinedload

async def get_with_relations(
    session: AsyncSession,
    user_id: UUID,
) -> User | None:
    query = (
        select(User)
        .where(User.id == user_id)
        .options(
            selectinload(User.posts),
            selectinload(User.comments),
            joinedload(User.profile),
        )
    )
    result = await session.execute(query)
    return result.scalar_one_or_none()

Lazy Loading (When Needed)

async def get_user_posts(
    session: AsyncSession,
    user: User,
) -> Sequence[Post]:
    # Explicitly load relationship
    await session.refresh(user, ["posts"])
    return user.posts

Subquery Loading for Collections

from sqlalchemy.orm import subqueryload

async def list_users_with_posts(session: AsyncSession) -> Sequence[User]:
    query = (
        select(User)
        .options(subqueryload(User.posts))
        .where(User.is_active == True)
    )
    result = await session.execute(query)
    return result.scalars().unique().all()

Advanced Queries

Search with ILIKE

async def search(
    session: AsyncSession,
    query_text: str,
    *,
    limit: int = 20,
) -> Sequence[Model]:
    query = (
        select(Model)
        .where(Model.name.ilike(f"%{query_text}%"))
        .limit(limit)
    )
    result = await session.execute(query)
    return result.scalars().all()

Full-Text Search (PostgreSQL)

from sqlalchemy import func

async def full_text_search(
    session: AsyncSession,
    search_query: str,
) -> Sequence[Model]:
    query = (
        select(Model)
        .where(
            func.to_tsvector('english', Model.title + ' ' + Model.content)
            .match(search_query)
        )
        .order_by(
            func.ts_rank(
                func.to_tsvector('english', Model.title + ' ' + Model.content),
                func.plainto_tsquery('english', search_query)
            ).desc()
        )
    )
    result = await session.execute(query)
    return result.scalars().all()

Aggregate Queries

from sqlalchemy import func

async def get_statistics(session: AsyncSession) -> dict:
    query = select(
        func.count(Model.id).label("total"),
        func.count(Model.id).filter(Model.is_active == True).label("active"),
        func.avg(Model.price).label("avg_price"),
        func.max(Model.created_at).label("latest"),
    )
    result = await session.execute(query)
    row = result.one()

    return {
        "total": row.total,
        "active": row.active,
        "avg_price": float(row.avg_price or 0),
        "latest": row.latest,
    }

Group By with Having

async def get_category_counts(
    session: AsyncSession,
    min_count: int = 5,
) -> list[dict]:
    query = (
        select(
            Model.category,
            func.count(Model.id).label("count"),
        )
        .group_by(Model.category)
        .having(func.count(Model.id) >= min_count)
        .order_by(func.count(Model.id).desc())
    )
    result = await session.execute(query)

    return [
        {"category": row.category, "count": row.count}
        for row in result
    ]

Date Range Queries

from datetime import datetime, timedelta

async def get_recent(
    session: AsyncSession,
    days: int = 7,
) -> Sequence[Model]:
    since = datetime.utcnow() - timedelta(days=days)
    query = (
        select(Model)
        .where(Model.created_at >= since)
        .order_by(Model.created_at.desc())
    )
    result = await session.execute(query)
    return result.scalars().all()


async def get_by_date_range(
    session: AsyncSession,
    start_date: datetime,
    end_date: datetime,
) -> Sequence[Model]:
    query = (
        select(Model)
        .where(Model.created_at >= start_date)
        .where(Model.created_at <= end_date)
        .order_by(Model.created_at)
    )
    result = await session.execute(query)
    return result.scalars().all()

Transactions

Basic Transaction

async def transfer_funds(
    session: AsyncSession,
    from_id: UUID,
    to_id: UUID,
    amount: float,
) -> None:
    async with session.begin():
        from_account = await session.get(Account, from_id)
        to_account = await session.get(Account, to_id)

        if from_account.balance < amount:
            raise ValueError("Insufficient funds")

        from_account.balance -= amount
        to_account.balance += amount
        # Commits automatically on success

Nested Transactions (Savepoints)

async def process_order(session: AsyncSession, order: Order) -> None:
    async with session.begin_nested():
        try:
            # Process payment
            await process_payment(session, order)

            # Update inventory
            await update_inventory(session, order)

            # Send confirmation
            await send_confirmation(order)
        except PaymentError:
            # Rollback just this savepoint
            raise

Manual Transaction Control

async def bulk_import(session: AsyncSession, items: list[dict]) -> int:
    imported = 0

    try:
        for item in items:
            model = Model(**item)
            session.add(model)
            imported += 1

            # Flush periodically for large imports
            if imported % 100 == 0:
                await session.flush()

        await session.commit()
        return imported
    except Exception:
        await session.rollback()
        raise

Bulk Operations

Bulk Insert

async def bulk_insert(
    session: AsyncSession,
    items: list[dict],
) -> None:
    models = [Model(**item) for item in items]
    session.add_all(models)
    await session.flush()

Bulk Insert with Returning

from sqlalchemy.dialects.postgresql import insert

async def bulk_insert_returning(
    session: AsyncSession,
    items: list[dict],
) -> list[UUID]:
    stmt = insert(Model).values(items).returning(Model.id)
    result = await session.execute(stmt)
    return [row.id for row in result]

Bulk Update

from sqlalchemy import update

async def bulk_update(
    session: AsyncSession,
    updates: dict[UUID, dict],
) -> int:
    updated = 0
    for model_id, values in updates.items():
        stmt = (
            update(Model)
            .where(Model.id == model_id)
            .values(**values)
        )
        result = await session.execute(stmt)
        updated += result.rowcount
    await session.flush()
    return updated

Upsert (Insert or Update)

from sqlalchemy.dialects.postgresql import insert

async def upsert(
    session: AsyncSession,
    data: dict,
) -> Model:
    stmt = insert(Model).values(**data)
    stmt = stmt.on_conflict_do_update(
        index_elements=[Model.external_id],
        set_={
            "name": stmt.excluded.name,
            "updated_at": func.now(),
        }
    ).returning(Model)

    result = await session.execute(stmt)
    return result.scalar_one()

Soft Delete

Model with Soft Delete

from datetime import datetime

class SoftDeleteMixin:
    deleted_at: Mapped[datetime | None] = mapped_column(default=None)

    @property
    def is_deleted(self) -> bool:
        return self.deleted_at is not None


class Product(Base, SoftDeleteMixin):
    __tablename__ = "products"
    # ... other fields

Query Non-Deleted

async def list_active(session: AsyncSession) -> Sequence[Product]:
    query = select(Product).where(Product.deleted_at.is_(None))
    result = await session.execute(query)
    return result.scalars().all()

Soft Delete Operation

async def soft_delete(session: AsyncSession, product_id: UUID) -> None:
    product = await session.get(Product, product_id)
    if product:
        product.deleted_at = datetime.utcnow()
        await session.flush()

Restore Soft-Deleted

async def restore(session: AsyncSession, product_id: UUID) -> None:
    query = (
        select(Product)
        .where(Product.id == product_id)
        .where(Product.deleted_at.is_not(None))
    )
    result = await session.execute(query)
    product = result.scalar_one_or_none()

    if product:
        product.deleted_at = None
        await session.flush()

Connection Pool Management

Check Pool Status

from pydotorg.core.database import engine

def get_pool_status() -> dict:
    pool = engine.pool
    return {
        "size": pool.size(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
        "invalid": pool.invalidatedcount(),
    }

Configure Pool

# In config.py
engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,           # Minimum connections
    max_overflow=10,       # Additional connections allowed
    pool_timeout=30,       # Wait time for connection
    pool_recycle=1800,     # Recycle connections after 30 minutes
    pool_pre_ping=True,    # Verify connections before use
)

See Also