Background Tasks

SAQ background task definitions for async processing.

Module Contents

Background tasks for litestar-pydotorg using SAQ.

This module provides access to all background tasks. To avoid circular imports, access the queue and settings through the worker module directly:

from pydotorg.tasks.worker import queue, settings_dict, get_task_functions

The ALL_TASKS and ALL_CRON_JOBS lists are populated lazily.

get_all_cron_jobs()

Get all cron jobs lazily to avoid circular imports.

Return type:

list[Any]

get_all_tasks()

Get all task functions lazily to avoid circular imports.

Return type:

list[Callable[..., Any]]

Cache Tasks

Cache warming and invalidation tasks.

Download Tasks

Download statistics and file processing tasks.

Download statistics tracking service and background tasks.

This module provides Redis-based download tracking with periodic aggregation into PostgreSQL. Downloads are counted in real time with atomic Redis INCR operations, and a background task flushes the counters to the database.

class DownloadStatsService

Bases: object

Service for tracking download statistics in Redis.

Uses atomic Redis counters for high-frequency download tracking. Stats are periodically flushed to PostgreSQL for durable storage.

__init__(redis, namespace='pydotorg')

Initialize the download stats service.

Parameters:
  • redis (Redis) – Redis client instance

  • namespace (str) – Redis key namespace prefix (default: “pydotorg”)

async track_download(file_id, release_id=None)

Track a single file download.

Parameters:
  • file_id (str) – UUID of the release file being downloaded

  • release_id (str | None) – Optional UUID of the release

Return type:

None

async get_file_downloads(file_id)

Get total download count for a file.

Parameters:

file_id (str) – UUID of the release file

Return type:

int

Returns:

Total download count

async get_file_downloads_daily(file_id, day=None)

Get download count for a file on a specific day.

Parameters:
  • file_id (str) – UUID of the release file

  • day (date | None) – Date to get count for (default: today)

Return type:

int

Returns:

Download count for the day

async get_release_downloads(release_id)

Get total download count for a release (all files).

Parameters:

release_id (str) – UUID of the release

Return type:

int

Returns:

Total download count for all files in release

async get_total_downloads()

Get total download count across all files.

Return type:

int

Returns:

Total download count

async get_daily_downloads(day=None)

Get download count for a specific day.

Parameters:

day (date | None) – Date to get count for (default: today)

Return type:

int

Returns:

Download count for the day

async get_top_downloads(limit=10)

Get top downloaded files.

Parameters:

limit (int) – Maximum number of results

Return type:

list[tuple[str, int]]

Returns:

List of (file_id, count) tuples sorted by count descending
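
Assuming per-file totals have already been read into a dict (the shape `get_all_file_stats` returns), the ranking step can be sketched with `heapq.nlargest`. Whether the actual service ranks in Python or with a Redis sorted set is not stated in this reference.

```python
import heapq


def top_downloads(file_stats: dict[str, int], limit: int = 10) -> list[tuple[str, int]]:
    """Return (file_id, count) pairs sorted by count, highest first."""
    # nlargest keeps at most `limit` items in a heap: O(n log limit).
    return heapq.nlargest(limit, file_stats.items(), key=lambda item: item[1])


stats = {"file-a": 120, "file-b": 45, "file-c": 300, "file-d": 45}
top_two = top_downloads(stats, limit=2)
```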

async get_all_file_stats()

Get download stats for all files.

Return type:

dict[str, int]

Returns:

Dict mapping file_id to download count

async get_stats_summary()

Get overall download statistics summary.

Return type:

dict[str, Any]

Returns:

Dict with total downloads, today’s downloads, and top files

async flush_to_database(session)

Flush Redis counters to PostgreSQL database.

This method aggregates daily stats and stores them in the database for long-term storage and analytics queries.

Parameters:

session (AsyncSession) – SQLAlchemy async session

Return type:

int

Returns:

Number of records flushed
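
One way to picture the flush is as turning daily Redis counters into rows before writing them through the given `AsyncSession`. The sketch covers only the parsing step and assumes a hypothetical `{namespace}:downloads:file:{file_id}:daily:{YYYY-MM-DD}` key layout; the database model and write logic are omitted because they are not documented here.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DailyStat:
    file_id: str
    day: date
    count: int


def parse_daily_counters(counters: dict[str, int], namespace: str = "pydotorg") -> list[DailyStat]:
    """Convert hypothetical daily counter keys into records ready to persist."""
    prefix = f"{namespace}:downloads:file:"
    records: list[DailyStat] = []
    for key, count in counters.items():
        if not key.startswith(prefix) or ":daily:" not in key:
            continue  # skip global and per-file totals
        file_id, day_str = key[len(prefix):].split(":daily:", 1)
        records.append(DailyStat(file_id=file_id, day=date.fromisoformat(day_str), count=count))
    return records


counters = {
    "pydotorg:downloads:total": 7,
    "pydotorg:downloads:file:file-a": 5,
    "pydotorg:downloads:file:file-a:daily:2024-05-01": 3,
    "pydotorg:downloads:file:file-b:daily:2024-05-01": 2,
}
rows = parse_daily_counters(counters)
```

Under these assumptions, the documented return value (number of records flushed) would correspond to `len(rows)`.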

async cleanup_old_daily_keys(days_to_keep=7)

Clean up old daily Redis keys after flushing to database.

Parameters:

days_to_keep (int) – Number of days of daily stats to retain in Redis

Return type:

int

Returns:

Number of keys deleted

async get_download_stats_service(ctx)

Get or create download stats service from worker context.

Parameters:

ctx (dict[str, Any]) – SAQ worker context dictionary

Return type:

DownloadStatsService | None

Returns:

DownloadStatsService instance or None if Redis unavailable

async flush_download_stats(ctx)

SAQ task to flush download stats from Redis to PostgreSQL.

This task should be scheduled to run periodically (e.g., every 15 minutes).

Parameters:

ctx (dict[str, Any]) – SAQ worker context

Return type:

dict[str, Any]

Returns:

Task result with flush count
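
The task wrapper around the service can be sketched as follows. `FakeService`, `flush_task`, the `ctx` keys, and the result keys (`status`, `flushed`) are all hypothetical. In SAQ, periodic scheduling is normally declared with `saq.CronJob`, for example `CronJob(flush_download_stats, cron="*/15 * * * *")` for a 15-minute interval; that registration is not part of the sketch because it needs a configured queue.

```python
import asyncio
from typing import Any


class FakeService:
    """Stand-in for DownloadStatsService with a fixed flush result."""

    async def flush_to_database(self, session: Any) -> int:
        return 42


async def flush_task(ctx: dict[str, Any]) -> dict[str, Any]:
    service = ctx.get("download_stats")
    if service is None:
        # No Redis: report a skip instead of raising on every cron tick.
        return {"status": "skipped", "flushed": 0}
    # Hypothetical: a real task would open an AsyncSession here.
    session = ctx.get("session")
    flushed = await service.flush_to_database(session)
    return {"status": "ok", "flushed": flushed}


ok_result = asyncio.run(flush_task({"download_stats": FakeService(), "session": None}))
skipped_result = asyncio.run(flush_task({}))
```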

async aggregate_download_stats(ctx)

SAQ task to aggregate download statistics for reporting.

This task updates materialized views or summary tables for fast analytics.

Parameters:

ctx (dict[str, Any]) – SAQ worker context

Return type:

dict[str, Any]

Returns:

Task result with aggregation status

Email Tasks

Async email sending tasks.

Event Tasks

Event processing and notification tasks.

Feed Tasks

RSS/Atom feed generation tasks.

Feed aggregation background tasks for SAQ.

async refresh_all_feeds(ctx)

Refresh all active feeds.

This task fetches and parses all active RSS feeds, creating or updating BlogEntry records for each feed item.

Parameters:

ctx (Mapping[str, Any]) – Task context containing dependencies (session_maker, etc.).

Return type:

dict[str, int]

Returns:

Dict with success_count and error_count.
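
The success/error bookkeeping can be sketched as below, assuming each feed is refreshed independently so that one failing feed does not abort the batch. `fetch_feed` is a hypothetical coroutine standing in for the real fetch-and-parse step, and feeds are plain dicts rather than the project's models.

```python
import asyncio
from typing import Any


async def fetch_feed(feed: dict[str, Any]) -> int:
    """Hypothetical fetch/parse step returning the number of entries."""
    if feed["url"].startswith("bad:"):
        raise ValueError(f"cannot fetch {feed['url']}")
    return 3


async def refresh_feeds(feeds: list[dict[str, Any]]) -> dict[str, int]:
    success_count = 0
    error_count = 0
    for feed in feeds:
        if not feed.get("is_active", True):
            continue  # only active feeds are refreshed
        try:
            await fetch_feed(feed)
        except Exception:
            error_count += 1  # isolate the failure and keep going
        else:
            success_count += 1
    return {"success_count": success_count, "error_count": error_count}


feeds = [
    {"url": "https://example.org/a.rss"},
    {"url": "bad:feed"},
    {"url": "https://example.org/c.rss", "is_active": False},
]
summary = asyncio.run(refresh_feeds(feeds))
```

Catching a broad `Exception` is deliberate here; a production task would likely also log each failure.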

async refresh_stale_feeds(ctx, *, max_age_hours=1)

Refresh only feeds that haven’t been updated recently.

This task is more efficient than refresh_all_feeds because it only processes feeds that have not been fetched within the last max_age_hours hours.

Parameters:
  • ctx (Mapping[str, Any]) – Task context containing dependencies.

  • max_age_hours (int) – Only refresh feeds not fetched in this many hours.

Return type:

dict[str, int]

Returns:

Dict with success_count, error_count, and cutoff_time.

async refresh_single_feed(ctx, *, feed_id)

Refresh a specific feed by ID.

This task is useful for on-demand feed refresh triggered by user actions or administrative tasks.

Parameters:
  • ctx (Mapping[str, Any]) – Task context containing dependencies.

  • feed_id (str) – UUID of the feed to refresh (as string).

Return type:

dict[str, Any]

Returns:

Dict with feed info and entry count.

Job Tasks

Job board processing tasks.

Search Tasks

Search index building and updating tasks.

Stats Tasks

Statistics collection and processing tasks.

Persistent job statistics tracking for SAQ tasks.

This module provides Redis-based persistent statistics tracking for SAQ jobs. SAQ deletes completed jobs once their TTL expires (600 seconds by default), so statistics are tracked separately.

class TaskStatsService

Bases: object

Service for tracking persistent job statistics in Redis.

Stores atomic counters for job completions and failures that persist across application restarts. Uses Redis INCR for atomic updates, with separate keys for total and per-function stats.

__init__(redis, namespace='pydotorg')

Initialize the stats service.

Parameters:
  • redis (Redis) – Redis client instance (from worker context)

  • namespace (str) – Redis key namespace prefix (default: “pydotorg”)

async increment_complete(function_name=None)

Increment completed job counter.

Parameters:

function_name (str | None) – Optional function name for per-function stats

Return type:

None

async increment_failed(function_name=None)

Increment failed job counter.

Parameters:

function_name (str | None) – Optional function name for per-function stats

Return type:

None

async increment_retried(function_name=None)

Increment retried job counter.

Parameters:

function_name (str | None) – Optional function name for per-function stats

Return type:

None

async get_stats()

Get total job statistics.

Return type:

dict[str, int]

Returns:

Dict with complete, failed, and retried counts

async get_function_stats(function_name)

Get statistics for a specific function.

Parameters:

function_name (str) – Name of the task function

Return type:

dict[str, int]

Returns:

Dict with complete, failed, and retried counts for this function

async get_all_function_stats()

Get statistics for all functions.

Return type:

dict[str, dict[str, int]]

Returns:

Dict mapping function names to their stats
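
Assuming per-function counters follow a hypothetical `{namespace}:task_stats:fn:{function_name}:{kind}` layout, building the nested mapping is a grouping step over the matching keys. Against a live Redis the keys would typically be collected incrementally with SCAN (redis-py's `scan_iter(match=...)`) rather than KEYS; this sketch works on an already-fetched dict.

```python
def group_function_stats(counters: dict[str, int], namespace: str = "pydotorg") -> dict[str, dict[str, int]]:
    """Group hypothetical per-function counter keys by function name."""
    prefix = f"{namespace}:task_stats:fn:"
    result: dict[str, dict[str, int]] = {}
    for key, value in counters.items():
        if not key.startswith(prefix):
            continue  # skip global totals
        function_name, kind = key[len(prefix):].rsplit(":", 1)
        stats = result.setdefault(function_name, {"complete": 0, "failed": 0, "retried": 0})
        stats[kind] = value
    return result


counters = {
    "pydotorg:task_stats:complete": 3,
    "pydotorg:task_stats:fn:refresh_all_feeds:complete": 2,
    "pydotorg:task_stats:fn:flush_download_stats:failed": 1,
}
grouped = group_function_stats(counters)
```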

async reset_stats()

Reset all statistics counters (admin operation).

Return type:

None

async get_stats_service(ctx)

Get or create stats service from worker context.

Parameters:

ctx (dict[str, Any]) – SAQ worker context dictionary

Return type:

TaskStatsService | None

Returns:

TaskStatsService instance or None if Redis unavailable

Sync Tasks

Data synchronization tasks.

Worker Tasks

Background worker management tasks.