Background Tasks
SAQ background task definitions for async processing.
Module Contents
Background tasks for litestar-pydotorg using SAQ.
This module provides access to all background tasks. To avoid circular imports,
access the queue and settings through the worker module directly:
from pydotorg.tasks.worker import queue, settings_dict, get_task_functions
The ALL_TASKS and ALL_CRON_JOBS lists are populated lazily.
-
get_all_cron_jobs()[source]
Get all cron jobs lazily to avoid circular imports.
- Return type:
list[Any]
-
get_all_tasks()[source]
Get all task functions lazily to avoid circular imports.
- Return type:
list[Callable[..., Any]]
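The lazy-population pattern can be sketched as follows. This is a minimal illustration, not the module's actual code: `sample_task` is a hypothetical stand-in for the real task imports.

```python
from typing import Any, Callable

# Module-level registry, intentionally empty at import time.
ALL_TASKS: list[Callable[..., Any]] = []

async def sample_task(ctx: dict[str, Any]) -> str:
    """Hypothetical task standing in for the real task functions."""
    return "ok"

def get_all_tasks() -> list[Callable[..., Any]]:
    # Populate on first call; the real module would import its task
    # submodules here, deferring them until initialisation is complete
    # and thereby avoiding circular imports.
    if not ALL_TASKS:
        ALL_TASKS.extend([sample_task])
    return ALL_TASKS

print(len(get_all_tasks()))  # 1
```

Because the task imports happen inside the function body, importing this module never triggers the worker module (or vice versa) at import time.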
Cache Tasks
Cache warming and invalidation tasks.
Download Tasks
Download statistics and file processing tasks.
Download statistics tracking service and background tasks.
This module provides Redis-based download tracking with periodic PostgreSQL aggregation.
Uses atomic Redis INCR for real-time counting with background flush to database.
-
class DownloadStatsService[source]
Bases: object
Service for tracking download statistics in Redis.
Uses atomic Redis counters for high-frequency download tracking.
Stats are periodically flushed to PostgreSQL for durable storage.
-
__init__(redis, namespace='pydotorg')[source]
Initialize the download stats service.
- Parameters:
- redis – Redis client used for counter storage
- namespace (str) – Namespace prefix for Redis keys (default: 'pydotorg')
-
async track_download(file_id, release_id=None)[source]
Track a single file download.
- Parameters:
- file_id (str) – UUID of the release file
- release_id (str | None) – UUID of the release, if known
- Return type:
None
-
async get_file_downloads(file_id)[source]
Get total download count for a file.
- Parameters:
file_id (str) – UUID of the release file
- Return type:
int
- Returns:
Total download count
-
async get_file_downloads_daily(file_id, day=None)[source]
Get download count for a file on a specific day.
- Parameters:
- file_id (str) – UUID of the release file
- day (date | None) – Date to get count for (default: today)
- Return type:
int
- Returns:
Download count for the day
-
async get_release_downloads(release_id)[source]
Get total download count for a release (all files).
- Parameters:
release_id (str) – UUID of the release
- Return type:
int
- Returns:
Total download count for all files in release
-
async get_total_downloads()[source]
Get total download count across all files.
- Return type:
int
- Returns:
Total download count
-
async get_daily_downloads(day=None)[source]
Get download count for a specific day.
- Parameters:
day (date | None) – Date to get count for (default: today)
- Return type:
int
- Returns:
Download count for the day
-
async get_top_downloads(limit=10)[source]
Get top downloaded files.
- Parameters:
limit (int) – Maximum number of results
- Return type:
list[tuple[str, int]]
- Returns:
List of (file_id, count) tuples sorted by count descending
-
async get_all_file_stats()[source]
Get download stats for all files.
- Return type:
dict[str, int]
- Returns:
Dict mapping file_id to download count
-
async get_stats_summary()[source]
Get overall download statistics summary.
- Return type:
dict[str, Any]
- Returns:
Dict with total downloads, today’s downloads, and top files
-
async flush_to_database(session)[source]
Flush Redis counters to PostgreSQL database.
This method aggregates daily stats and stores them in the database
for long-term storage and analytics queries.
- Parameters:
session (AsyncSession) – SQLAlchemy async session
- Return type:
int
- Returns:
Number of records flushed
-
async cleanup_old_daily_keys(days_to_keep=7)[source]
Clean up old daily Redis keys after flushing to database.
- Parameters:
days_to_keep (int) – Number of days of daily stats to retain in Redis
- Return type:
int
- Returns:
Number of keys deleted
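The atomic-INCR counting scheme behind `track_download` and `get_file_downloads` can be sketched as below. The key layout and the in-memory `FakeRedis` stand-in are assumptions for illustration; the real service uses an actual Redis connection (whose `GET` returns bytes rather than ints).

```python
import asyncio
from datetime import date

class FakeRedis:
    """In-memory stand-in exposing only the commands this sketch needs."""
    def __init__(self) -> None:
        self.data: dict[str, int] = {}
    async def incr(self, key: str) -> int:
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]
    async def get(self, key: str):
        return self.data.get(key)

class DownloadCounter:
    """Sketch of the per-file / per-day counter layout (assumed key names)."""
    def __init__(self, redis, namespace: str = "pydotorg") -> None:
        self.redis = redis
        self.ns = namespace
    def _file_key(self, file_id: str) -> str:
        return f"{self.ns}:downloads:file:{file_id}"
    def _daily_key(self, day: date) -> str:
        return f"{self.ns}:downloads:day:{day.isoformat()}"
    async def track_download(self, file_id: str) -> None:
        # Two atomic INCRs: the per-file total and today's total.
        await self.redis.incr(self._file_key(file_id))
        await self.redis.incr(self._daily_key(date.today()))
    async def get_file_downloads(self, file_id: str) -> int:
        return int(await self.redis.get(self._file_key(file_id)) or 0)

async def demo() -> int:
    counter = DownloadCounter(FakeRedis())
    for _ in range(3):
        await counter.track_download("abc-123")
    return await counter.get_file_downloads("abc-123")

result = asyncio.run(demo())
print(result)  # 3
```

Because `INCR` is atomic on the Redis server, concurrent download requests never lose counts, and no read-modify-write locking is needed on the application side.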
-
async get_download_stats_service(ctx)[source]
Get or create download stats service from worker context.
- Parameters:
ctx (dict[str, Any]) – SAQ worker context dictionary
- Return type:
DownloadStatsService | None
- Returns:
DownloadStatsService instance or None if Redis unavailable
-
async flush_download_stats(ctx)[source]
SAQ task to flush download stats from Redis to PostgreSQL.
This task should be scheduled to run periodically (e.g., every 15 minutes).
- Parameters:
ctx (dict[str, Any]) – SAQ worker context
- Return type:
dict[str, Any]
- Returns:
Task result with flush count
-
async aggregate_download_stats(ctx)[source]
SAQ task to aggregate download statistics for reporting.
This task updates materialized views or summary tables for fast analytics.
- Parameters:
ctx (dict[str, Any]) – SAQ worker context
- Return type:
dict[str, Any]
- Returns:
Task result with aggregation status
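The overall shape of a flush task like `flush_download_stats` might look like the sketch below. The `FlushService` stub, the context keys, and the result dict layout are assumptions, not the project's actual code; the real task obtains the service via `get_download_stats_service` and a real database session.

```python
import asyncio
from typing import Any

class FlushService:
    """Stub that pretends to move two counter records to the database."""
    async def flush_to_database(self, session: Any) -> int:
        return 2  # number of records "flushed"

async def flush_download_stats(ctx: dict[str, Any]) -> dict[str, Any]:
    service = ctx.get("download_stats_service")
    if service is None:
        # Redis unavailable: report a no-op rather than failing the job.
        return {"flushed": 0, "skipped": True}
    flushed = await service.flush_to_database(ctx.get("db_session"))
    return {"flushed": flushed, "skipped": False}

result = asyncio.run(flush_download_stats({"download_stats_service": FlushService()}))
print(result)  # {'flushed': 2, 'skipped': False}
```

Returning a structured dict (instead of raising when Redis is down) lets the scheduler record the outcome and retry on the next periodic run.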
Email Tasks
Async email sending tasks.
Event Tasks
Event processing and notification tasks.
Feed Tasks
RSS/Atom feed generation tasks.
Feed aggregation background tasks for SAQ.
-
async refresh_all_feeds(ctx)[source]
Refresh all active feeds.
This task fetches and parses all active RSS feeds, creating or updating
BlogEntry records for each feed item.
- Parameters:
ctx (Mapping[str, Any]) – Task context containing dependencies (session_maker, etc.).
- Return type:
dict[str, int]
- Returns:
Dict with success_count and error_count.
-
async refresh_stale_feeds(ctx, *, max_age_hours=1)[source]
Refresh only feeds that haven’t been updated recently.
This task is more efficient than refresh_all_feeds as it only processes
feeds that are older than the specified max_age.
- Parameters:
ctx (Mapping[str, Any]) – Task context containing dependencies.
max_age_hours (int) – Only refresh feeds not fetched in this many hours.
- Return type:
dict[str, int]
- Returns:
Dict with success_count, error_count, and cutoff_time.
-
async refresh_single_feed(ctx, *, feed_id)[source]
Refresh a specific feed by ID.
This task is useful for on-demand feed refresh triggered by user actions
or administrative tasks.
- Parameters:
ctx (Mapping[str, Any]) – Task context containing dependencies.
feed_id (str) – UUID of the feed to refresh (as string).
- Return type:
dict[str, Any]
- Returns:
Dict with feed info and entry count.
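The staleness cutoff that makes `refresh_stale_feeds` cheaper than `refresh_all_feeds` can be sketched as below. The `stale_feed_ids` helper and its feed mapping are hypothetical; the real task would apply the same cutoff in a database query.

```python
from datetime import datetime, timedelta, timezone

def stale_feed_ids(feeds: dict, max_age_hours: int = 1, now=None) -> list[str]:
    """Return ids of feeds last fetched before the cutoff (or never)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=max_age_hours)
    return [
        fid
        for fid, last_fetched in feeds.items()
        if last_fetched is None or last_fetched < cutoff
    ]

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
feeds = {
    "feed-a": now - timedelta(hours=2),      # older than cutoff -> stale
    "feed-b": now - timedelta(minutes=10),   # recently fetched -> skipped
    "feed-c": None,                          # never fetched -> stale
}
print(stale_feed_ids(feeds, max_age_hours=1, now=now))  # ['feed-a', 'feed-c']
```

Only the stale subset is fetched and parsed, so running the task frequently costs little when most feeds are already fresh.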
Job Tasks
Job board processing tasks.
Search Tasks
Search index building and updating tasks.
Stats Tasks
Statistics collection and processing tasks.
Persistent job statistics tracking for SAQ tasks.
This module provides Redis-based persistent statistics tracking for SAQ jobs.
SAQ deletes completed jobs after TTL (600s default), so we track stats separately.
-
class TaskStatsService[source]
Bases: object
Service for tracking persistent job statistics in Redis.
Stores atomic counters for job completion/failure across application restarts.
Uses Redis INCR for atomic updates and separate keys for total vs per-function stats.
-
__init__(redis, namespace='pydotorg')[source]
Initialize the stats service.
- Parameters:
- redis – Redis client used for counter storage
- namespace (str) – Namespace prefix for Redis keys (default: 'pydotorg')
-
async increment_complete(function_name=None)[source]
Increment completed job counter.
- Parameters:
function_name (str | None) – Optional function name for per-function stats
- Return type:
None
-
async increment_failed(function_name=None)[source]
Increment failed job counter.
- Parameters:
function_name (str | None) – Optional function name for per-function stats
- Return type:
None
-
async increment_retried(function_name=None)[source]
Increment retried job counter.
- Parameters:
function_name (str | None) – Optional function name for per-function stats
- Return type:
None
-
async get_stats()[source]
Get total job statistics.
- Return type:
dict[str, int]
- Returns:
Dict with complete, failed, and retried counts
-
async get_function_stats(function_name)[source]
Get statistics for a specific function.
- Parameters:
function_name (str) – Name of the task function
- Return type:
dict[str, int]
- Returns:
Dict with complete, failed, and retried counts for this function
-
async get_all_function_stats()[source]
Get statistics for all functions.
- Return type:
dict[str, dict[str, int]]
- Returns:
Dict mapping function names to their stats
-
async reset_stats()[source]
Reset all statistics counters (admin operation).
- Return type:
None
-
async get_stats_service(ctx)[source]
Get or create stats service from worker context.
- Parameters:
ctx (dict[str, Any]) – SAQ worker context dictionary
- Return type:
TaskStatsService | None
- Returns:
TaskStatsService instance or None if Redis unavailable
Sync Tasks
Data synchronization tasks.
Worker Tasks
Background worker management tasks.