"""Task queue helpers for SAQ background tasks.
This module provides utilities for enqueueing background tasks from
service layers. It handles lazy imports to avoid circular dependencies
and provides consistent error handling.
Example::
from pydotorg.lib.tasks import enqueue_task
async def approve_job(self, job_id: UUID) -> Job:
job = await self.repository.update(job_id, status=JobStatus.APPROVED)
# Enqueue background tasks - best effort
await enqueue_task("index_job", job_id=str(job.id))
await enqueue_task(
"send_job_approved_email",
to_email=job.creator.email,
job_title=job.job_title,
company_name=job.company_name,
job_url=f"/jobs/{job.slug}/",
)
return job
"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Final

if TYPE_CHECKING:
    from saq import Queue

logger = logging.getLogger(__name__)

DEFAULT_RETRIES: Final = 3


def get_queue() -> Queue:
    """Get the SAQ queue lazily to avoid circular imports.

    Returns:
        Configured SAQ Queue instance.
    """
    from pydotorg.tasks.worker import queue

    return queue
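

# Illustrative only: a sketch of the import target above, assuming SAQ's
# ``Queue.from_url`` constructor and a hypothetical Redis URL (the real
# definition lives in pydotorg.tasks.worker and may differ). The import is
# deferred because the worker module registers task functions that import
# the service layers, which in turn import this module.
#
#     # pydotorg/tasks/worker.py (hypothetical shape)
#     from saq import Queue
#
#     queue = Queue.from_url("redis://localhost:6379/0")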


async def enqueue_task(
    task_name: str,
    *,
    timeout: int | None = None,
    retries: int = DEFAULT_RETRIES,
    **kwargs: Any,
) -> str | None:
    """Enqueue a background task by name.

    This is the primary helper for enqueueing tasks from service layers. It
    provides error handling and returns the job key for tracking.

    The task function must be registered in the worker's get_task_functions(),
    and the task arguments must match the function's signature (keyword-only
    arguments after ctx).

    Args:
        task_name: Name of the task function to enqueue (must be registered
            in the worker).
        timeout: Optional task timeout in seconds.
        retries: Number of retry attempts (default: 3).
        **kwargs: Task-specific arguments passed to the task function.

    Returns:
        Job key (str) if successful, None if the enqueue failed.

    Example::

        # Enqueue email after user registration
        job_key = await enqueue_task(
            "send_verification_email",
            to_email=user.email,
            username=user.username,
            verification_link=verification_link,
        )
        if job_key:
            logger.info(f"Verification email queued: {job_key}")

        # Enqueue search indexing after job approval
        await enqueue_task("index_job", job_id=str(job.id))
    """
    queue = get_queue()
    try:
        job_kwargs = kwargs.copy()
        # SAQ job options travel alongside the task's own kwargs. Compare
        # against None so an explicit timeout of 0 is still passed through.
        if timeout is not None:
            job_kwargs["timeout"] = timeout
        if retries != DEFAULT_RETRIES:
            job_kwargs["retries"] = retries
        job = await queue.enqueue(task_name, **job_kwargs)
        if job is None:
            logger.warning(
                "Task enqueue returned None (may already be enqueued)",
                extra={"task_name": task_name, "kwargs": kwargs},
            )
            return None
    except Exception:
        logger.exception(
            "Failed to enqueue task",
            extra={"task_name": task_name, "kwargs": kwargs},
        )
        return None
    else:
        return job.key
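

# Illustrative only: a sketch of the worker-side counterpart, assuming SAQ's
# convention of a ``ctx`` dict as the first argument followed by keyword-only
# task arguments, as described in the docstring above. ``index_job`` and
# ``get_task_functions`` mirror names mentioned above; the real definitions
# live in pydotorg.tasks.worker.
#
#     async def index_job(ctx: dict, *, job_id: str) -> None:
#         """Re-index a single job posting in the search backend."""
#         ...
#
#     def get_task_functions() -> list:
#         # enqueue_task("index_job", ...) resolves tasks by function name
#         return [index_job]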


async def enqueue_task_safe(
    task_name: str,
    *,
    timeout: int | None = None,
    retries: int = DEFAULT_RETRIES,
    **kwargs: Any,
) -> tuple[bool, str | None]:
    """Enqueue a task and return success status with optional job key.

    Variant of enqueue_task that returns a tuple for easier error handling
    in conditional flows.

    Args:
        task_name: Name of the task function to enqueue.
        timeout: Optional task timeout in seconds.
        retries: Number of retry attempts.
        **kwargs: Task-specific arguments.

    Returns:
        Tuple of (success: bool, job_key: str | None).

    Example::

        success, job_key = await enqueue_task_safe("send_email", to_email="user@example.com")
        if not success:
            # Handle failure - maybe log or retry later
            pass
    """
    job_key = await enqueue_task(task_name, timeout=timeout, retries=retries, **kwargs)
    return (job_key is not None, job_key)
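

# Illustrative only: one way a service layer might act on the tuple from
# enqueue_task_safe, degrading gracefully when the queue is unavailable.
# The task name and surrounding variables here are hypothetical.
#
#     success, job_key = await enqueue_task_safe(
#         "send_password_reset_email",
#         to_email=user.email,
#         reset_link=reset_link,
#     )
#     if not success:
#         # Don't block the request; the user can retry from the UI
#         logger.warning("Password reset email not queued for %s", user.email)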