Library Utilities
Shared library utilities and helpers.
Module Contents
Shared library utilities.
- class APIVersion
Bases: object
Represents an API version with deprecation metadata.
- __init__(major, minor, patch=0, deprecated=False, sunset_date=None)
- classmethod from_string(version_str)
Parse a version from a string such as 'v1' or '1.2.3'.
- Parameters:
version_str (str) – Version string in the format 'v1', '1', '1.2', or '1.2.3'
- Return type:
APIVersion
- Returns:
Parsed APIVersion instance
- Raises:
ValueError – If the version string is invalid
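The parsing rules described for from_string can be illustrated with a minimal sketch. VersionSketch is a hypothetical stand-in, not the library's class: its field names follow the documented __init__, while the default for minor, the regular expression, and the choice to treat missing components as 0 are assumptions.

```python
import re
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass(frozen=True)
class VersionSketch:
    """Hypothetical stand-in for APIVersion (fields mirror the documented __init__)."""

    major: int
    minor: int = 0  # assumption: defaulted here for brevity
    patch: int = 0
    deprecated: bool = False
    sunset_date: Optional[date] = None

    @classmethod
    def from_string(cls, version_str: str) -> "VersionSketch":
        # Optional leading "v", then one to three dot-separated integers.
        match = re.fullmatch(r"v?(\d+)(?:\.(\d+))?(?:\.(\d+))?", version_str.strip())
        if match is None:
            raise ValueError(f"Invalid version string: {version_str!r}")
        # Components that are absent from the string are treated as 0 (assumption).
        major, minor, patch = (int(part) if part else 0 for part in match.groups())
        return cls(major=major, minor=minor, patch=patch)


print(VersionSketch.from_string("v1"))
print(VersionSketch.from_string("1.2.3"))
```

Raising ValueError for anything that does not match keeps the sketch in line with the documented Raises entry.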
- class APIVersionMiddleware
Bases: MiddlewareProtocol
Middleware that negotiates the API version and adds version headers to the response.
Version negotiation order:
  1. Accept header (application/vnd.pydotorg.v1+json)
  2. URL path prefix (/api/v1/…)
  3. Query parameter (?api_version=1)
  4. Default version (v1)
Adds response headers:
  - X-API-Version: Current version used
  - X-API-Deprecated: true/false
  - X-API-Sunset-Date: ISO date if deprecated
- app: ASGIApp
- ACCEPT_PATTERN = re.compile('application/vnd\\.pydotorg\\.v(\\d+)\\+json')
- PATH_PATTERN = re.compile('^/api/v(\\d+)/')
- async __call__(scope, receive, send)
Process the request and add version headers to the response.
- Parameters:
scope (Union[HTTPScope,WebSocketScope]) – The ASGI scope
receive (Callable[...,Awaitable[Union[HTTPRequestEvent,HTTPDisconnectEvent,WebSocketConnectEvent,WebSocketReceiveEvent,WebSocketDisconnectEvent]]]) – The receive callable
send (Callable[[Union[HTTPResponseStartEvent,HTTPResponseBodyEvent,HTTPServerPushEvent,HTTPDisconnectEvent,WebSocketAcceptEvent,WebSocketSendEvent,WebSocketResponseStartEvent,WebSocketResponseBodyEvent,WebSocketCloseEvent]],Awaitable[None]]) – The send callable
- Return type:
None
- __init__(app)
Initialize the middleware.
- Parameters:
app (Callable[[Union[HTTPScope,WebSocketScope],Callable[...,Awaitable[Union[HTTPRequestEvent,HTTPDisconnectEvent,WebSocketConnectEvent,WebSocketReceiveEvent,WebSocketDisconnectEvent]]],Callable[[Union[HTTPResponseStartEvent,HTTPResponseBodyEvent,HTTPServerPushEvent,HTTPDisconnectEvent,WebSocketAcceptEvent,WebSocketSendEvent,WebSocketResponseStartEvent,WebSocketResponseBodyEvent,WebSocketCloseEvent]],Awaitable[None]]],Awaitable[None]]) – The ASGI application
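The negotiation order documented for APIVersionMiddleware (Accept header, then URL path prefix, then query parameter, then default) can be sketched as a plain function. The two regular expressions are copied from the class attributes; negotiate_major, its parameters, and the use of search versus match are assumptions for illustration, and the real middleware reads these values from the ASGI scope.

```python
import re
from urllib.parse import parse_qs

# Patterns as documented on APIVersionMiddleware.
ACCEPT_PATTERN = re.compile(r"application/vnd\.pydotorg\.v(\d+)\+json")
PATH_PATTERN = re.compile(r"^/api/v(\d+)/")


def negotiate_major(accept: str, path: str, query_string: str, default: int = 1) -> int:
    """Hypothetical helper: pick a major version using the documented precedence."""
    # 1. Accept header, e.g. application/vnd.pydotorg.v1+json
    accept_match = ACCEPT_PATTERN.search(accept)
    if accept_match:
        return int(accept_match.group(1))
    # 2. URL path prefix, e.g. /api/v1/...
    path_match = PATH_PATTERN.match(path)
    if path_match:
        return int(path_match.group(1))
    # 3. Query parameter, e.g. ?api_version=1
    values = parse_qs(query_string).get("api_version", [])
    if values and values[0].isdecimal():
        return int(values[0])
    # 4. Fall back to the default version.
    return default


print(negotiate_major("application/vnd.pydotorg.v2+json", "/api/v1/jobs/", "api_version=3"))
```

Because each step returns as soon as it matches, a versioned Accept header wins over both the path and the query string.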
- class APIVersionRegistry
Bases: object
Registry of supported API versions with deprecation metadata.
- get_default()
Get the default API version.
- Return type:
APIVersion
- Returns:
The default APIVersion
- Raises:
RuntimeError – If no default version is set
- get_version(major)
Get a version by its major version number.
- Parameters:
major (int) – The major version number
- Return type:
APIVersion | None
- Returns:
The APIVersion if found, None otherwise
- is_supported(version)
Check whether a version is supported.
- Parameters:
version (APIVersion) – The version to check
- Return type:
bool
- Returns:
True if the version is supported, False otherwise
- register(version, *, is_default=False)
Register a new API version.
- Parameters:
version (APIVersion) – The version to register (with deprecation metadata already set)
is_default (bool) – Whether this should be the default version
- Return type:
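As a rough illustration of the registry interface documented above, the sketch below keeps versions in a dictionary keyed by major number. RegistrySketch and Version are hypothetical stand-ins; storing one entry per major version and checking support by major number are assumptions, not confirmed details of APIVersionRegistry.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Version:
    """Hypothetical stand-in for APIVersion."""

    major: int
    minor: int = 0
    patch: int = 0


class RegistrySketch:
    """Hypothetical registry mirroring the documented method names."""

    def __init__(self) -> None:
        self._versions: Dict[int, Version] = {}
        self._default: Optional[Version] = None

    def register(self, version: Version, *, is_default: bool = False) -> None:
        # Assumption: one registered version per major number.
        self._versions[version.major] = version
        if is_default:
            self._default = version

    def get_version(self, major: int) -> Optional[Version]:
        return self._versions.get(major)

    def get_default(self) -> Version:
        if self._default is None:
            raise RuntimeError("No default API version is set")
        return self._default

    def is_supported(self, version: Version) -> bool:
        # Assumption: support is decided by major version only.
        return version.major in self._versions


registry = RegistrySketch()
registry.register(Version(1), is_default=True)
registry.register(Version(2))
print(registry.get_default(), registry.get_version(3))
```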
- add_version(major, minor=0, patch=0, *, is_default=False)
Add a new API version to the registry.
- Parameters:
- Return type:
APIVersion
- Returns:
The created APIVersion
Example
>>> add_version(2, 0, 0, is_default=True)
APIVersion(major=2, minor=0, patch=0)
API Versioning
API versioning utilities and decorators.
API versioning middleware with version negotiation support.
Provides version negotiation via Accept header, URL path, and query parameters. Includes deprecation warnings for old API versions.
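The deprecation warnings are surfaced through the response headers listed for APIVersionMiddleware (X-API-Version, X-API-Deprecated, X-API-Sunset-Date). A minimal sketch of building those headers follows; version_headers is a hypothetical helper, and the exact value format of X-API-Version ("v1" rather than "1") is an assumption.

```python
from datetime import date
from typing import Dict, Optional


def version_headers(
    major: int,
    deprecated: bool = False,
    sunset_date: Optional[date] = None,
) -> Dict[str, str]:
    """Hypothetical helper: build the documented version headers."""
    headers = {
        "X-API-Version": f"v{major}",  # assumption: "v<major>" format
        "X-API-Deprecated": "true" if deprecated else "false",
    }
    # The sunset date is only emitted for deprecated versions, as an ISO date.
    if deprecated and sunset_date is not None:
        headers["X-API-Sunset-Date"] = sunset_date.isoformat()
    return headers


print(version_headers(1, deprecated=True, sunset_date=date(2025, 12, 31)))
```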
- deprecate_version(major, sunset_date=None)
Mark an API version as deprecated.
- Parameters:
- Return type:
Example
>>> from datetime import date
>>> deprecate_version(1, sunset_date=date(2025, 12, 31))
Tasks
Shared task utilities.
Task queue helpers for SAQ background tasks.
This module provides utilities for enqueueing background tasks from service layers. It handles lazy imports to avoid circular dependencies and provides consistent error handling.
Example:
    from pydotorg.lib.tasks import enqueue_task

    async def approve_job(self, job_id: UUID) -> Job:
        job = await self.repository.update(job_id, status=JobStatus.APPROVED)
        # Enqueue background tasks - best effort
        await enqueue_task("index_job", job_id=str(job.id))
        await enqueue_task(
            "send_job_approved_email",
            to_email=job.creator.email,
            job_title=job.job_title,
            company_name=job.company_name,
            job_url=f"/jobs/{job.slug}/",
        )
        return job
- get_queue()
Get the SAQ queue lazily to avoid circular imports.
- Return type:
Queue
- Returns:
Configured SAQ Queue instance
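The lazy-import pattern that get_queue relies on can be shown in isolation: the import is moved into the function body so it runs on first call rather than when the module is loaded. In this sketch the standard-library queue.Queue is only a stand-in for the configured SAQ queue, and caching the instance with lru_cache is an assumption about how repeated calls might be handled.

```python
from functools import lru_cache


@lru_cache(maxsize=1)
def get_queue_sketch():
    """Hypothetical lazy accessor; the real helper returns a SAQ Queue."""
    # Importing inside the function defers the dependency until first use,
    # which is what breaks an import cycle between two modules.
    from queue import Queue  # stand-in for the application's queue module

    return Queue()


# Repeated calls return the same cached instance.
print(get_queue_sketch() is get_queue_sketch())
```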
- async enqueue_task(task_name, *, timeout=None, retries=3, **kwargs)
Enqueue a background task by name.
This is the primary helper for enqueueing tasks from service layers. It handles enqueue errors and returns the job key for tracking.
The task function must be registered in the worker's get_task_functions(). Task arguments should match the function signature (keyword-only arguments after ctx).
- Parameters:
- Return type:
str | None
- Returns:
Job key (str) if successful, None if enqueue failed
Example:
    # Enqueue email after user registration
    job_key = await enqueue_task(
        "send_verification_email",
        to_email=user.email,
        username=user.username,
        verification_link=verification_link,
    )
    if job_key:
        logger.info(f"Verification email queued: {job_key}")

    # Enqueue search indexing after job approval
    await enqueue_task("index_job", job_id=str(job.id))
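The best-effort contract described for enqueue_task (return the job key on success, None on failure) can be sketched against an in-memory queue. FakeQueue, FakeJob, and enqueue_task_sketch are hypothetical; passing the queue as an argument and forwarding timeout and retries as enqueue options are assumptions made so the example runs without a real SAQ queue.

```python
import asyncio
import logging
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


class FakeJob:
    """Hypothetical job object exposing a key."""

    def __init__(self, key: str) -> None:
        self.key = key


class FakeQueue:
    """Hypothetical in-memory stand-in for the task queue."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.jobs: List[Tuple[str, Dict[str, Any]]] = []

    async def enqueue(self, task_name: str, **kwargs: Any) -> FakeJob:
        if self.fail:
            raise ConnectionError("queue unavailable")
        self.jobs.append((task_name, kwargs))
        return FakeJob(key=f"job-{len(self.jobs)}")


async def enqueue_task_sketch(
    queue: FakeQueue,
    task_name: str,
    *,
    timeout: Optional[int] = None,
    retries: int = 3,
    **kwargs: Any,
) -> Optional[str]:
    options: Dict[str, Any] = {"retries": retries}
    if timeout is not None:
        options["timeout"] = timeout
    try:
        job = await queue.enqueue(task_name, **options, **kwargs)
    except Exception:
        # Best effort: log the failure and signal it with None instead of raising.
        logger.exception("Failed to enqueue task %s", task_name)
        return None
    return job.key


print(asyncio.run(enqueue_task_sketch(FakeQueue(), "index_job", job_id="42")))
```

Returning None rather than raising lets a service method finish its main work even when the queue is unreachable.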
- async enqueue_task_safe(task_name, *, timeout=None, retries=3, **kwargs)
Enqueue a task and return its success status together with an optional job key.
Variant of enqueue_task that returns a tuple for easier error handling in conditional flows.
- Parameters:
- Return type:
tuple[bool, str | None]
- Returns:
Tuple of (success: bool, job_key: str | None)
Example:
    success, job_key = await enqueue_task_safe("send_email", to_email="user@example.com")
    if not success:
        # Handle failure - maybe log or retry later
        pass
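The tuple-returning variant can be pictured as a thin wrapper around a function with enqueue_task's contract. In this sketch enqueue_task_safe_sketch takes the enqueue function as a parameter, and fake_enqueue is a hypothetical stand-in that fails for one task name; both are assumptions used to keep the example self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Tuple


async def fake_enqueue(task_name: str, **kwargs: Any) -> Optional[str]:
    """Hypothetical enqueue function: returns a key, or None on failure."""
    return None if task_name == "broken_task" else f"key-for-{task_name}"


async def enqueue_task_safe_sketch(
    enqueue: Callable[..., Awaitable[Optional[str]]],
    task_name: str,
    **kwargs: Any,
) -> Tuple[bool, Optional[str]]:
    job_key = await enqueue(task_name, **kwargs)
    # Success is defined as having received a job key.
    return job_key is not None, job_key


success, job_key = asyncio.run(
    enqueue_task_safe_sketch(fake_enqueue, "send_email", to_email="user@example.com")
)
print(success, job_key)
```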