Source code for pydotorg.tasks.stats

"""Persistent job statistics tracking for SAQ tasks.

This module provides Redis-based persistent statistics tracking for SAQ jobs.
SAQ deletes completed jobs after a TTL (600s by default), so we track stats separately.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from redis.asyncio import Redis

logger = logging.getLogger(__name__)


class TaskStatsService:
    """Service for tracking persistent job statistics in Redis.

    Stores atomic counters for job completion/failure across application
    restarts. Uses Redis INCR for atomic updates and separate keys for
    total vs per-function stats.
    """

    def __init__(self, redis: Redis, namespace: str = "pydotorg") -> None:
        """Initialize the stats service.

        Args:
            redis: Redis client instance (from worker context)
            namespace: Redis key namespace prefix (default: "pydotorg")
        """
        self.redis = redis
        self.namespace = namespace

    def _key(self, *parts: str) -> str:
        """Build a namespaced Redis key.

        Args:
            *parts: Key components to join

        Returns:
            Namespaced Redis key string
        """
        return ":".join([self.namespace, "tasks", "stats", *parts])
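
    # For illustration (assuming the default "pydotorg" namespace), _key()
    # produces keys shaped like:
    #   pydotorg:tasks:stats:total:complete
    #   pydotorg:tasks:stats:func:<function_name>:complete
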
    async def increment_complete(self, function_name: str | None = None) -> None:
        """Increment completed job counter.

        Args:
            function_name: Optional function name for per-function stats
        """
        try:
            await self.redis.incr(self._key("total", "complete"))
            if function_name:
                await self.redis.incr(self._key("func", function_name, "complete"))
            logger.debug(f"Incremented complete counter for {function_name or 'total'}")
        except Exception:
            logger.exception(f"Failed to increment complete counter for {function_name}")

    async def increment_failed(self, function_name: str | None = None) -> None:
        """Increment failed job counter.

        Args:
            function_name: Optional function name for per-function stats
        """
        try:
            await self.redis.incr(self._key("total", "failed"))
            if function_name:
                await self.redis.incr(self._key("func", function_name, "failed"))
            logger.debug(f"Incremented failed counter for {function_name or 'total'}")
        except Exception:
            logger.exception(f"Failed to increment failed counter for {function_name}")

    async def increment_retried(self, function_name: str | None = None) -> None:
        """Increment retried job counter.

        Args:
            function_name: Optional function name for per-function stats
        """
        try:
            await self.redis.incr(self._key("total", "retried"))
            if function_name:
                await self.redis.incr(self._key("func", function_name, "retried"))
            logger.debug(f"Incremented retried counter for {function_name or 'total'}")
        except Exception:
            logger.exception(f"Failed to increment retried counter for {function_name}")

    async def get_stats(self) -> dict[str, int]:
        """Get total job statistics.

        Returns:
            Dict with complete, failed, and retried counts
        """
        try:
            complete = await self.redis.get(self._key("total", "complete"))
            failed = await self.redis.get(self._key("total", "failed"))
            retried = await self.redis.get(self._key("total", "retried"))
            return {
                "complete": int(complete) if complete else 0,
                "failed": int(failed) if failed else 0,
                "retried": int(retried) if retried else 0,
            }
        except Exception:
            logger.exception("Failed to get total stats")
            return {"complete": 0, "failed": 0, "retried": 0}

    async def get_function_stats(self, function_name: str) -> dict[str, int]:
        """Get statistics for a specific function.

        Args:
            function_name: Name of the task function

        Returns:
            Dict with complete, failed, and retried counts for this function
        """
        try:
            complete = await self.redis.get(self._key("func", function_name, "complete"))
            failed = await self.redis.get(self._key("func", function_name, "failed"))
            retried = await self.redis.get(self._key("func", function_name, "retried"))
            return {
                "complete": int(complete) if complete else 0,
                "failed": int(failed) if failed else 0,
                "retried": int(retried) if retried else 0,
            }
        except Exception:
            logger.exception(f"Failed to get stats for function {function_name}")
            return {"complete": 0, "failed": 0, "retried": 0}

    async def get_all_function_stats(self) -> dict[str, dict[str, int]]:
        """Get statistics for all functions.

        Returns:
            Dict mapping function names to their stats
        """
        try:
            pattern = self._key("func", "*", "complete")
            stats: dict[str, dict[str, int]] = {}
            # Matching keys look like "<namespace>:tasks:stats:func:<name>:complete",
            # so a well-formed key splits into six colon-separated parts and the
            # function name is the fifth (index 4).
            expected_key_parts = 6
            async for key in self.redis.scan_iter(match=pattern):
                key_str = key.decode() if isinstance(key, bytes) else key
                parts = key_str.split(":")
                if len(parts) >= expected_key_parts:
                    function_name = parts[4]
                    if function_name not in stats:
                        stats[function_name] = await self.get_function_stats(function_name)
            return stats
        except Exception:
            logger.exception("Failed to get all function stats")
            return {}

    async def reset_stats(self) -> None:
        """Reset all statistics counters (admin operation)."""
        try:
            pattern = self._key("*")
            keys_deleted = 0
            async for key in self.redis.scan_iter(match=pattern):
                await self.redis.delete(key)
                keys_deleted += 1
            logger.info(f"Reset task statistics ({keys_deleted} keys deleted)")
        except Exception:
            logger.exception("Failed to reset stats")


async def get_stats_service(ctx: dict[str, Any]) -> TaskStatsService | None:
    """Get or create stats service from worker context.

    Args:
        ctx: SAQ worker context dictionary

    Returns:
        TaskStatsService instance or None if Redis unavailable
    """
    redis = ctx.get("redis")
    if not redis:
        logger.warning("Redis not available in context, stats tracking disabled")
        return None
    if "stats_service" not in ctx:
        from pydotorg.config import settings

        namespace = getattr(settings, "redis_stats_namespace", "pydotorg")
        ctx["stats_service"] = TaskStatsService(redis, namespace=namespace)
    return ctx["stats_service"]
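

# A minimal wiring sketch (illustrative, not part of this module), assuming
# SAQ's worker hooks and Job API (``ctx["job"]``, ``job.function``,
# ``job.status``, and ``saq.job.Status``):
#
#     from saq.job import Status
#
#     async def after_process(ctx: dict[str, Any]) -> None:
#         stats = await get_stats_service(ctx)
#         if stats is None:
#             return
#         job = ctx["job"]
#         if job.status == Status.COMPLETE:
#             await stats.increment_complete(job.function)
#         elif job.status == Status.FAILED:
#             await stats.increment_failed(job.function)
#
# Because get_stats_service() caches the instance in ctx, every invocation of
# the hook reuses a single TaskStatsService per worker.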