Source code for pydotorg.domains.users.services

"""User domain services for business logic."""

from __future__ import annotations

from typing import TYPE_CHECKING

from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService

from pydotorg.domains.users.api_keys import APIKey
from pydotorg.domains.users.api_keys import APIKeyService as APIKeyGenerator
from pydotorg.domains.users.models import Membership, User, UserGroup
from pydotorg.domains.users.repositories import (
    APIKeyRepository,
    MembershipRepository,
    UserGroupRepository,
    UserRepository,
)
from pydotorg.domains.users.security import hash_password

if TYPE_CHECKING:
    from uuid import UUID

    from pydotorg.domains.users.schemas import APIKeyCreate, UserCreate


[docs] class UserService(SQLAlchemyAsyncRepositoryService[User]): """Service for User business logic.""" repository_type = UserRepository match_fields = ["email", "username"]
[docs] async def create_user(self, data: UserCreate) -> User: """Create a new user with hashed password. Args: data: User creation data including plain-text password. Returns: The created user instance. Raises: ValueError: If email or username already exists. """ if await self.repository.exists_by_email(data.email): msg = f"User with email {data.email} already exists" raise ValueError(msg) if await self.repository.exists_by_username(data.username): msg = f"User with username {data.username} already exists" raise ValueError(msg) user_data = data.model_dump(exclude={"password"}) user_data["password_hash"] = hash_password(data.password) return await self.create(user_data)
[docs] async def get_by_email(self, email: str) -> User | None: """Get a user by email address. Args: email: The email address to search for. Returns: The user if found, None otherwise. """ return await self.repository.get_by_email(email)
[docs] async def get_by_username(self, username: str) -> User | None: """Get a user by username. Args: username: The username to search for. Returns: The user if found, None otherwise. """ return await self.repository.get_by_username(username)
[docs] async def deactivate(self, user_id: UUID) -> User: """Deactivate a user account. Args: user_id: The ID of the user to deactivate. Returns: The updated user instance. """ user = await self.get(user_id) user.is_active = False return await self.update(user_id, {"is_active": False})
[docs] async def reactivate(self, user_id: UUID) -> User: """Reactivate a user account. Args: user_id: The ID of the user to reactivate. Returns: The updated user instance. """ user = await self.get(user_id) user.is_active = True return await self.update(user_id, {"is_active": True})
[docs] class MembershipService(SQLAlchemyAsyncRepositoryService[Membership]): """Service for Membership business logic.""" repository_type = MembershipRepository match_fields = ["user_id"]
[docs] async def get_by_user_id(self, user_id: UUID) -> Membership | None: """Get a membership by user ID. Args: user_id: The user ID to search for. Returns: The membership if found, None otherwise. """ return await self.repository.get_by_user_id(user_id)
[docs] async def create_for_user(self, user_id: UUID, membership_data: dict) -> Membership: """Create a membership for a specific user. Args: user_id: The ID of the user to create membership for. membership_data: The membership data. Returns: The created membership instance. Raises: ValueError: If user already has a membership. """ existing = await self.get_by_user_id(user_id) if existing: msg = f"User {user_id} already has a membership" raise ValueError(msg) membership_data["user_id"] = user_id return await self.create(membership_data)
[docs] class UserGroupService(SQLAlchemyAsyncRepositoryService[UserGroup]): """Service for UserGroup business logic.""" repository_type = UserGroupRepository match_fields = ["name"]
[docs] async def list_approved(self, limit: int = 100, offset: int = 0) -> list[UserGroup]: """List approved user groups. Args: limit: Maximum number of groups to return. offset: Number of groups to skip. Returns: List of approved user groups. """ return await self.repository.list_approved(limit=limit, offset=offset)
[docs] async def list_trusted(self, limit: int = 100, offset: int = 0) -> list[UserGroup]: """List trusted user groups. Args: limit: Maximum number of groups to return. offset: Number of groups to skip. Returns: List of trusted user groups. """ return await self.repository.list_trusted(limit=limit, offset=offset)
[docs] async def approve(self, group_id: UUID) -> UserGroup: """Approve a user group. Args: group_id: The ID of the group to approve. Returns: The updated group instance. """ return await self.update(group_id, {"approved": True})
[docs] async def revoke_approval(self, group_id: UUID) -> UserGroup: """Revoke approval of a user group. Args: group_id: The ID of the group to revoke approval. Returns: The updated group instance. """ return await self.update(group_id, {"approved": False})
[docs] async def mark_trusted(self, group_id: UUID) -> UserGroup: """Mark a user group as trusted. Args: group_id: The ID of the group to mark as trusted. Returns: The updated group instance. """ return await self.update(group_id, {"trusted": True})
[docs] async def revoke_trust(self, group_id: UUID) -> UserGroup: """Revoke trust of a user group. Args: group_id: The ID of the group to revoke trust. Returns: The updated group instance. """ return await self.update(group_id, {"trusted": False})
[docs] class APIKeyService(SQLAlchemyAsyncRepositoryService[APIKey]): """Service for API key management.""" repository_type = APIKeyRepository match_fields = ["key_hash"]
[docs] async def create_key(self, user_id: UUID, data: APIKeyCreate) -> tuple[APIKey, str]: """Create a new API key for a user. Args: user_id: The user ID to create the key for. data: API key creation data. Returns: Tuple of (APIKey instance, raw key string). """ api_key, raw_key = APIKeyGenerator.create_key( user_id=user_id, name=data.name, description=data.description, expires_in_days=data.expires_in_days, ) created = await self.create(api_key.__dict__) return created, raw_key
[docs] async def list_by_user(self, user_id: UUID) -> list[APIKey]: """List all API keys for a user. Args: user_id: The user ID. Returns: List of API keys. """ return await self.repository.list_by_user(user_id)
[docs] async def list_active_by_user(self, user_id: UUID) -> list[APIKey]: """List active API keys for a user. Args: user_id: The user ID. Returns: List of active API keys. """ return await self.repository.list_active_by_user(user_id)
[docs] async def revoke(self, key_id: UUID) -> APIKey: """Revoke an API key. Args: key_id: The API key ID. Returns: The revoked API key. """ return await self.update(key_id, {"is_active": False})
[docs] async def revoke_all_for_user(self, user_id: UUID) -> int: """Revoke all API keys for a user. Args: user_id: The user ID. Returns: Number of keys revoked. """ return await self.repository.revoke_all_for_user(user_id)