Source code for pydotorg.domains.sponsors.services

"""Sponsors domain services for business logic."""

from __future__ import annotations

import datetime
from datetime import UTC
from typing import TYPE_CHECKING

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService

from pydotorg.domains.sponsors.models import (
    Contract,
    ContractStatus,
    LegalClause,
    Sponsor,
    Sponsorship,
    SponsorshipLevel,
    SponsorshipStatus,
)
from pydotorg.domains.sponsors.repositories import (
    ContractRepository,
    LegalClauseRepository,
    SponsorRepository,
    SponsorshipLevelRepository,
    SponsorshipRepository,
)

if TYPE_CHECKING:
    from uuid import UUID

    from pydotorg.domains.sponsors.schemas import (
        SponsorCreate,
        SponsorshipCreate,
        SponsorshipLevelCreate,
    )


class InvalidContractStatusError(Exception):
    """Signal that a requested contract status change is not permitted.

    Raised by the contract workflow methods when the contract's current
    status does not allow the requested transition.
    """
class SponsorshipLevelService(SQLAlchemyAsyncRepositoryService[SponsorshipLevel]):
    """Service for SponsorshipLevel business logic."""

    repository_type = SponsorshipLevelRepository
    match_fields = ["slug"]

    async def create_level(self, data: SponsorshipLevelCreate) -> SponsorshipLevel:
        """Create a new sponsorship level.

        When no slug is supplied but a name is, the slug is derived from the
        name via ``SponsorshipLevel.generate_slug``.

        Args:
            data: Sponsorship level creation data.

        Returns:
            The created sponsorship level instance.

        Raises:
            ValueError: If the slug (supplied or generated) already exists.
        """
        level_data = data.model_dump()

        slug = level_data.get("slug")
        if not slug and level_data.get("name"):
            slug = SponsorshipLevel.generate_slug(level_data["name"])
            level_data["slug"] = slug

        # Validate the slug that will actually be stored, so a slug generated
        # from the name is subject to the same uniqueness check as a supplied one.
        if slug and await self.repository.exists_by_slug(slug):
            msg = f"Sponsorship level with slug {slug} already exists"
            raise ValueError(msg)

        return await self.create(level_data)

    async def get_by_slug(self, slug: str) -> SponsorshipLevel | None:
        """Get a sponsorship level by its slug.

        Args:
            slug: The slug to search for.

        Returns:
            The sponsorship level if found, None otherwise.
        """
        return await self.repository.get_by_slug(slug)

    async def list_ordered(self, limit: int = 100, offset: int = 0) -> list[SponsorshipLevel]:
        """List sponsorship levels ordered by order field.

        Delegates to the repository's ``list_ordered`` query.

        Args:
            limit: Maximum number of levels to return.
            offset: Number of levels to skip.

        Returns:
            List of sponsorship levels ordered by order field.
        """
        return await self.repository.list_ordered(limit=limit, offset=offset)
class SponsorService(SQLAlchemyAsyncRepositoryService[Sponsor]):
    """Service for Sponsor business logic."""

    repository_type = SponsorRepository
    match_fields = ["slug"]

    async def create_sponsor(self, data: SponsorCreate) -> Sponsor:
        """Create a new sponsor.

        When no slug is supplied but a name is, the slug is derived from the
        name via ``Sponsor.generate_slug``.

        Args:
            data: Sponsor creation data.

        Returns:
            The created sponsor instance.

        Raises:
            ValueError: If the slug (supplied or generated) already exists.
        """
        sponsor_data = data.model_dump()

        slug = sponsor_data.get("slug")
        if not slug and sponsor_data.get("name"):
            slug = Sponsor.generate_slug(sponsor_data["name"])
            sponsor_data["slug"] = slug

        # Validate the slug that will actually be stored, so a slug generated
        # from the name is subject to the same uniqueness check as a supplied one.
        if slug and await self.repository.exists_by_slug(slug):
            msg = f"Sponsor with slug {slug} already exists"
            raise ValueError(msg)

        return await self.create(sponsor_data)

    async def get_by_slug(self, slug: str) -> Sponsor | None:
        """Get a sponsor by its slug.

        Args:
            slug: The slug to search for.

        Returns:
            The sponsor if found, None otherwise.
        """
        return await self.repository.get_by_slug(slug)

    async def list_with_active_sponsorships(
        self,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Sponsor]:
        """List sponsors with active sponsorships.

        Delegates to the repository's ``list_with_active_sponsorships`` query.

        Args:
            limit: Maximum number of sponsors to return.
            offset: Number of sponsors to skip.

        Returns:
            List of sponsors with active sponsorships.
        """
        return await self.repository.list_with_active_sponsorships(limit=limit, offset=offset)
class SponsorshipService(SQLAlchemyAsyncRepositoryService[Sponsorship]):
    """Service for Sponsorship business logic."""

    repository_type = SponsorshipRepository
    match_fields = ["sponsor_id", "level_id"]

    @staticmethod
    def _utc_today() -> datetime.date:
        """Return the current calendar date in UTC."""
        return datetime.datetime.now(tz=UTC).date()

    async def create_sponsorship(self, data: SponsorshipCreate) -> Sponsorship:
        """Create a new sponsorship.

        ``applied_on`` defaults to the current UTC date when not provided.

        Args:
            data: Sponsorship creation data.

        Returns:
            The created sponsorship instance.
        """
        sponsorship_data = data.model_dump()
        if not sponsorship_data.get("applied_on"):
            sponsorship_data["applied_on"] = self._utc_today()
        return await self.create(sponsorship_data)

    async def list_by_sponsor_id(
        self,
        sponsor_id: UUID,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Sponsorship]:
        """List sponsorships for a specific sponsor.

        Args:
            sponsor_id: The sponsor ID to filter by.
            limit: Maximum number of sponsorships to return.
            offset: Number of sponsorships to skip.

        Returns:
            List of sponsorships for the sponsor.
        """
        return await self.repository.list_by_sponsor_id(sponsor_id, limit=limit, offset=offset)

    async def list_by_level_id(
        self,
        level_id: UUID,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Sponsorship]:
        """List sponsorships for a specific level.

        Args:
            level_id: The level ID to filter by.
            limit: Maximum number of sponsorships to return.
            offset: Number of sponsorships to skip.

        Returns:
            List of sponsorships for the level.
        """
        return await self.repository.list_by_level_id(level_id, limit=limit, offset=offset)

    async def list_by_status(
        self,
        status: SponsorshipStatus,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Sponsorship]:
        """List sponsorships by status.

        Args:
            status: The status to filter by.
            limit: Maximum number of sponsorships to return.
            offset: Number of sponsorships to skip.

        Returns:
            List of sponsorships with the given status.
        """
        return await self.repository.list_by_status(status, limit=limit, offset=offset)

    async def list_active(self, limit: int = 100, offset: int = 0) -> list[Sponsorship]:
        """List active sponsorships.

        Args:
            limit: Maximum number of sponsorships to return.
            offset: Number of sponsorships to skip.

        Returns:
            List of active sponsorships.
        """
        return await self.repository.list_active(limit=limit, offset=offset)

    async def approve(self, sponsorship_id: UUID) -> Sponsorship:
        """Approve a sponsorship.

        Sets the status to APPROVED and stamps ``approved_on`` with the
        current UTC date.

        Args:
            sponsorship_id: The ID of the sponsorship to approve.

        Returns:
            The updated sponsorship instance.
        """
        # Keyword arguments keep the call independent of the positional
        # order of ``update``'s parameters.
        return await self.update(
            data={
                "status": SponsorshipStatus.APPROVED,
                "approved_on": self._utc_today(),
            },
            item_id=sponsorship_id,
        )

    async def reject(self, sponsorship_id: UUID) -> Sponsorship:
        """Reject a sponsorship.

        Sets the status to REJECTED and stamps ``rejected_on`` with the
        current UTC date.

        Args:
            sponsorship_id: The ID of the sponsorship to reject.

        Returns:
            The updated sponsorship instance.
        """
        return await self.update(
            data={
                "status": SponsorshipStatus.REJECTED,
                "rejected_on": self._utc_today(),
            },
            item_id=sponsorship_id,
        )

    async def finalize(self, sponsorship_id: UUID) -> Sponsorship:
        """Finalize a sponsorship.

        Sets the status to FINALIZED and stamps ``finalized_on`` with the
        current UTC date.

        Args:
            sponsorship_id: The ID of the sponsorship to finalize.

        Returns:
            The updated sponsorship instance.
        """
        return await self.update(
            data={
                "status": SponsorshipStatus.FINALIZED,
                "finalized_on": self._utc_today(),
            },
            item_id=sponsorship_id,
        )

    async def approve_with_renewal(
        self,
        sponsorship_id: UUID,
        start_date: datetime.date,
        end_date: datetime.date,
        sponsorship_fee: int | None = None,
        *,
        is_renewal: bool = False,
    ) -> Sponsorship:
        """Approve a sponsorship with full approval data.

        The sponsorship ``year`` is taken from ``start_date``. The fee is only
        written when an override is given, so an omitted fee leaves the stored
        value untouched.

        Args:
            sponsorship_id: The ID of the sponsorship to approve.
            start_date: The sponsorship start date.
            end_date: The sponsorship end date.
            sponsorship_fee: Optional sponsorship fee override.
            is_renewal: Whether this is a renewal of a previous sponsorship.

        Returns:
            The updated sponsorship instance.
        """
        update_data: dict[str, datetime.date | int | bool | SponsorshipStatus] = {
            "status": SponsorshipStatus.APPROVED,
            "approved_on": self._utc_today(),
            "start_date": start_date,
            "end_date": end_date,
            "year": start_date.year,
            "renewal": is_renewal,
        }
        if sponsorship_fee is not None:
            update_data["sponsorship_fee"] = sponsorship_fee
        return await self.update(data=update_data, item_id=sponsorship_id)

    async def get_previous_sponsorship(self, sponsorship: Sponsorship) -> Sponsorship | None:
        """Get the previous sponsorship for the same sponsor.

        Used to determine previous effective date for renewal contracts.

        Args:
            sponsorship: The current sponsorship.

        Returns:
            The previous sponsorship if found, None otherwise.
        """
        return await self.repository.get_previous_for_sponsor(
            sponsor_id=sponsorship.sponsor_id,
            current_year=sponsorship.year,
        )

    async def list_expiring_soon(
        self,
        days: int = 90,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Sponsorship]:
        """List sponsorships expiring within the given number of days.

        Useful for sending renewal reminders.

        Args:
            days: Number of days until expiration.
            limit: Maximum number of sponsorships to return.
            offset: Number of sponsorships to skip.

        Returns:
            List of sponsorships expiring within the given timeframe.
        """
        return await self.repository.list_expiring_soon(days=days, limit=limit, offset=offset)

    async def create_renewal(
        self,
        previous_sponsorship: Sponsorship,
        level_id: UUID | None = None,
    ) -> Sponsorship:
        """Create a renewal sponsorship based on a previous one.

        The new sponsorship starts in APPLIED status with ``renewal`` set and
        ``applied_on`` stamped with the current UTC date.

        Args:
            previous_sponsorship: The sponsorship being renewed.
            level_id: Optional new level ID (defaults to same level).

        Returns:
            The new renewal sponsorship instance.
        """
        renewal_level_id = level_id if level_id is not None else previous_sponsorship.level_id
        return await self.create(
            {
                "sponsor_id": previous_sponsorship.sponsor_id,
                "level_id": renewal_level_id,
                "status": SponsorshipStatus.APPLIED,
                "applied_on": self._utc_today(),
                "renewal": True,
            }
        )
class LegalClauseService(SQLAlchemyAsyncRepositoryService[LegalClause]):
    """Business-logic layer for LegalClause records."""

    repository_type = LegalClauseRepository
    match_fields = ["slug"]

    async def get_by_slug(self, slug: str) -> LegalClause | None:
        """Look up a single legal clause by slug.

        Args:
            slug: Slug of the clause to fetch.

        Returns:
            The matching legal clause, or None when the repository finds none.
        """
        clause = await self.repository.get_by_slug(slug)
        return clause

    async def list_active(self, limit: int = 100, offset: int = 0) -> list[LegalClause]:
        """Return a page of active legal clauses.

        Delegates to the repository's ``list_active`` query.

        Args:
            limit: Upper bound on the number of clauses returned.
            offset: How many clauses to skip before the page starts.

        Returns:
            The active legal clauses for the requested page.
        """
        active_clauses = await self.repository.list_active(limit=limit, offset=offset)
        return active_clauses
class ContractService(SQLAlchemyAsyncRepositoryService[Contract]):
    """Service for Contract business logic and workflow management."""

    repository_type = ContractRepository

    async def create_contract(self, sponsorship_id: UUID) -> Contract:
        """Create a new contract for a sponsorship.

        Initializes a draft contract with sponsor information from the sponsorship.

        Args:
            sponsorship_id: The sponsorship ID to create contract for.

        Returns:
            The created contract instance.

        Raises:
            ValueError: If no sponsorship exists with the given ID.
        """
        sponsorship = await self.repository.session.get(Sponsorship, sponsorship_id)
        if sponsorship is None:
            msg = f"Sponsorship {sponsorship_id} not found"
            raise ValueError(msg)

        # NOTE(review): this attribute access assumes the ``sponsor``
        # relationship is loadable here (e.g. eagerly loaded) - confirm
        # against the model's relationship configuration.
        sponsor = sponsorship.sponsor

        sponsor_info = f"{sponsor.name}"
        if sponsor.description:
            sponsor_info += f" - {sponsor.description}"

        # NOTE(review): assumes ``full_address`` is always a string - confirm.
        sponsor_contact = sponsor.full_address
        if sponsor.primary_phone:
            sponsor_contact += f" | Phone: {sponsor.primary_phone}"

        return await self.create(
            {
                "sponsorship_id": sponsorship_id,
                "sponsor_info": sponsor_info,
                "sponsor_contact": sponsor_contact,
                "status": ContractStatus.DRAFT,
            }
        )

    async def get_by_sponsorship_id(self, sponsorship_id: UUID) -> Contract | None:
        """Get a contract by sponsorship ID.

        Args:
            sponsorship_id: The sponsorship ID to search for.

        Returns:
            The contract if found, None otherwise.
        """
        return await self.repository.get_by_sponsorship_id(sponsorship_id)

    async def list_by_status(
        self,
        status: ContractStatus,
        limit: int = 100,
        offset: int = 0,
    ) -> list[Contract]:
        """List contracts by status.

        Args:
            status: The status to filter by.
            limit: Maximum number of contracts to return.
            offset: Number of contracts to skip.

        Returns:
            List of contracts with the given status.
        """
        return await self.repository.list_by_status(status, limit=limit, offset=offset)

    async def send_for_signature(
        self,
        contract_id: UUID,
        document_pdf: str = "",
        document_docx: str = "",
    ) -> Contract:
        """Send contract for signature.

        Moves the contract to AWAITING_SIGNATURE, records the unsigned
        documents, stamps ``sent_on`` with the current UTC date and bumps the
        revision by one.

        Args:
            contract_id: The contract ID.
            document_pdf: Path to the unsigned PDF document.
            document_docx: Path to the unsigned DOCX document.

        Returns:
            The updated contract instance.

        Raises:
            InvalidContractStatusError: If transition is not valid.
        """
        contract = await self.get(contract_id)
        if not contract.can_send:
            msg = f"Cannot send contract in {contract.status} status for signature"
            raise InvalidContractStatusError(msg)

        # Compute the next revision without mutating the loaded instance; the
        # new value is written through ``update`` together with the other fields.
        next_revision = contract.revision + 1

        # Keyword arguments keep the call independent of the positional
        # order of ``update``'s parameters.
        return await self.update(
            data={
                "status": ContractStatus.AWAITING_SIGNATURE,
                "document_pdf": document_pdf,
                "document_docx": document_docx,
                "sent_on": datetime.datetime.now(tz=UTC).date(),
                "revision": next_revision,
            },
            item_id=contract_id,
        )

    async def execute_contract(
        self,
        contract_id: UUID,
        signed_document: str = "",
    ) -> Contract:
        """Execute a contract.

        Marks the contract as executed and finalizes the associated sponsorship.
        The sponsorship is also locked, and the session is committed explicitly
        after the sponsorship changes.

        Args:
            contract_id: The contract ID.
            signed_document: Path to the signed document.

        Returns:
            The updated contract instance.

        Raises:
            InvalidContractStatusError: If transition is not valid.
        """
        contract = await self.get(contract_id)
        if not contract.can_execute:
            msg = f"Cannot execute contract in {contract.status} status"
            raise InvalidContractStatusError(msg)

        # Read the clock once so ``executed_on`` and ``finalized_on`` cannot
        # end up on different dates when the call straddles midnight UTC.
        today = datetime.datetime.now(tz=UTC).date()

        contract = await self.update(
            data={
                "status": ContractStatus.EXECUTED,
                "signed_document": signed_document,
                "executed_on": today,
            },
            item_id=contract_id,
        )

        # NOTE(review): assumes the ``sponsorship`` relationship is loadable
        # on the returned contract - confirm against the model configuration.
        sponsorship = contract.sponsorship
        sponsorship.status = SponsorshipStatus.FINALIZED
        sponsorship.finalized_on = today
        sponsorship.locked = True
        await self.repository.session.commit()

        return contract

    async def nullify_contract(self, contract_id: UUID) -> Contract:
        """Nullify a contract.

        Args:
            contract_id: The contract ID.

        Returns:
            The updated contract instance.

        Raises:
            InvalidContractStatusError: If transition is not valid.
        """
        contract = await self.get(contract_id)
        if not contract.can_nullify:
            msg = f"Cannot nullify contract in {contract.status} status"
            raise InvalidContractStatusError(msg)

        return await self.update(
            data={"status": ContractStatus.NULLIFIED},
            item_id=contract_id,
        )

    async def revert_to_draft(self, contract_id: UUID) -> Contract:
        """Revert a nullified contract back to draft.

        The transition is allowed only when DRAFT is among the contract's
        ``next_statuses``.

        Args:
            contract_id: The contract ID.

        Returns:
            The updated contract instance.

        Raises:
            InvalidContractStatusError: If transition is not valid.
        """
        contract = await self.get(contract_id)
        if ContractStatus.DRAFT not in contract.next_statuses:
            msg = f"Cannot revert contract in {contract.status} status to draft"
            raise InvalidContractStatusError(msg)

        return await self.update(
            data={"status": ContractStatus.DRAFT},
            item_id=contract_id,
        )

    async def update_benefits_and_clauses(
        self,
        contract_id: UUID,
        benefits_list: str,
        legal_clauses_text: str,
    ) -> Contract:
        """Update contract benefits list and legal clauses.

        Args:
            contract_id: The contract ID.
            benefits_list: Markdown formatted list of benefits.
            legal_clauses_text: Markdown formatted legal clauses.

        Returns:
            The updated contract instance.

        Raises:
            InvalidContractStatusError: If the contract is not a draft.
        """
        contract = await self.get(contract_id)
        if not contract.is_draft:
            msg = "Can only update benefits and clauses on draft contracts"
            raise InvalidContractStatusError(msg)

        return await self.update(
            data={
                "benefits_list": benefits_list,
                "legal_clauses_text": legal_clauses_text,
            },
            item_id=contract_id,
        )