Source code for pydotorg.lib.tasks

"""Task queue helpers for SAQ background tasks.

This module provides utilities for enqueueing background tasks from
service layers. It handles lazy imports to avoid circular dependencies
and provides consistent error handling.

Example::

    from pydotorg.lib.tasks import enqueue_task


    async def approve_job(self, job_id: UUID) -> Job:
        job = await self.repository.update(job_id, status=JobStatus.APPROVED)

        # Enqueue background tasks - best effort
        await enqueue_task("index_job", job_id=str(job.id))
        await enqueue_task(
            "send_job_approved_email",
            to_email=job.creator.email,
            job_title=job.job_title,
            company_name=job.company_name,
            job_url=f"/jobs/{job.slug}/",
        )

        return job
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Final

if TYPE_CHECKING:
    from saq import Queue

logger = logging.getLogger(__name__)

DEFAULT_RETRIES: Final = 3


[docs] def get_queue() -> Queue: """Get the SAQ queue lazily to avoid circular imports. Returns: Configured SAQ Queue instance """ from pydotorg.tasks.worker import queue return queue
[docs] async def enqueue_task( task_name: str, *, timeout: int | None = None, retries: int = DEFAULT_RETRIES, **kwargs: Any, ) -> str | None: """Enqueue a background task by name. This is the primary helper for enqueueing tasks from service layers. Provides error handling and returns the job key for tracking. The task function must be registered in the worker's get_task_functions(). Task arguments should match the function signature (keyword-only args after ctx). Args: task_name: Name of the task function to enqueue (must be registered in worker). timeout: Optional task timeout in seconds. retries: Number of retry attempts (default: 3). **kwargs: Task-specific arguments passed to the task function. Returns: Job key (str) if successful, None if enqueue failed Example:: # Enqueue email after user registration job_key = await enqueue_task( "send_verification_email", to_email=user.email, username=user.username, verification_link=verification_link, ) if job_key: logger.info(f"Verification email queued: {job_key}") # Enqueue search indexing after job approval await enqueue_task("index_job", job_id=str(job.id)) """ queue = get_queue() try: job_kwargs = kwargs.copy() if timeout: job_kwargs["timeout"] = timeout if retries != DEFAULT_RETRIES: job_kwargs["retries"] = retries job = await queue.enqueue(task_name, **job_kwargs) if job is None: logger.warning( "Task enqueue returned None (may already be enqueued)", extra={"task_name": task_name, "kwargs": kwargs}, ) return None except Exception: logger.exception( "Failed to enqueue task", extra={"task_name": task_name, "kwargs": kwargs}, ) return None else: return job.key
[docs] async def enqueue_task_safe( task_name: str, *, timeout: int | None = None, retries: int = DEFAULT_RETRIES, **kwargs: Any, ) -> tuple[bool, str | None]: """Enqueue a task and return success status with optional job key. Variant of enqueue_task that returns a tuple for easier error handling in conditional flows. Args: task_name: Name of the task function to enqueue timeout: Optional task timeout in seconds retries: Number of retry attempts **kwargs: Task-specific arguments Returns: Tuple of (success: bool, job_key: str | None) Example:: success, job_key = await enqueue_task_safe("send_email", to_email="user@example.com") if not success: # Handle failure - maybe log or retry later pass """ job_key = await enqueue_task(task_name, timeout=timeout, retries=retries, **kwargs) return (job_key is not None, job_key)