"""Twitch live-stream status checks.""" import logging from dataclasses import dataclass from datetime import datetime, timedelta import httpx from app.config import settings logger = logging.getLogger(__name__) TWITCH_STREAMS_URL = "https://api.twitch.tv/helix/streams" TWITCH_TOKEN_URL = "https://id.twitch.tv/oauth2/token" @dataclass class TwitchLiveStatus: """Cached Twitch live status for a channel.""" channel_name: str is_live: bool checked_at: datetime title: str | None = None game_name: str | None = None started_at: str | None = None viewer_count: int | None = None reason: str | None = None def to_dict(self) -> dict: """Return a response-safe representation.""" return { "channel": self.channel_name, "is_live": self.is_live, "checked_at": self.checked_at.isoformat(), "title": self.title, "game_name": self.game_name, "started_at": self.started_at, "viewer_count": self.viewer_count, "reason": self.reason, } def _bearer_token(access_token: str) -> str: """Normalize a Twitch token for Helix Authorization.""" token = access_token.strip() if token.startswith("oauth:"): return token.removeprefix("oauth:") return token class TwitchLiveStatusService: """Small cached client for Twitch Helix stream status.""" def __init__(self, cache_seconds: int = 60): self.cache_duration = timedelta(seconds=max(1, cache_seconds)) self._cache: dict[str, TwitchLiveStatus] = {} self._app_access_token: str | None = None self._app_access_token_expires_at: datetime | None = None async def get_status( self, channel_name: str, force_refresh: bool = False, ) -> TwitchLiveStatus: """Return current live status for a channel, using a short cache.""" normalized_channel = channel_name.strip().lower().lstrip("#") now = datetime.utcnow() cached = self._cache.get(normalized_channel) if ( cached and not force_refresh and now - cached.checked_at < self.cache_duration ): return cached status = await self._fetch_status(normalized_channel, now) self._cache[normalized_channel] = status return status async def _fetch_status( self, channel_name: str, checked_at: datetime, ) -> TwitchLiveStatus: """Fetch live status from Twitch Helix.""" if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_ACCESS_TOKEN: return TwitchLiveStatus( channel_name=channel_name, is_live=False, checked_at=checked_at, reason="twitch_api_not_configured", ) headers = { "Client-ID": settings.TWITCH_CLIENT_ID, "Authorization": f"Bearer {_bearer_token(settings.TWITCH_ACCESS_TOKEN)}", } params = {"user_login": channel_name} try: async with httpx.AsyncClient(timeout=10) as client: response = await client.get( TWITCH_STREAMS_URL, headers=headers, params=params, ) if response.status_code == 401: app_access_token = await self._get_app_access_token(client) if app_access_token: headers["Authorization"] = f"Bearer {app_access_token}" response = await client.get( TWITCH_STREAMS_URL, headers=headers, params=params, ) response.raise_for_status() except Exception as e: logger.warning("Failed to check Twitch live status: %s", e) return TwitchLiveStatus( channel_name=channel_name, is_live=False, checked_at=checked_at, reason="twitch_api_error", ) streams = response.json().get("data", []) if not streams: return TwitchLiveStatus( channel_name=channel_name, is_live=False, checked_at=checked_at, reason="stream_offline", ) stream = streams[0] return TwitchLiveStatus( channel_name=channel_name, is_live=True, checked_at=checked_at, title=stream.get("title"), game_name=stream.get("game_name"), started_at=stream.get("started_at"), viewer_count=stream.get("viewer_count"), ) async def _get_app_access_token(self, client: httpx.AsyncClient) -> str | None: """Get a Twitch app access token for Helix reads.""" now = datetime.utcnow() if ( self._app_access_token and self._app_access_token_expires_at and now < self._app_access_token_expires_at ): return self._app_access_token if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_CLIENT_SECRET: return None try: response = await client.post( TWITCH_TOKEN_URL, data={ "client_id": settings.TWITCH_CLIENT_ID, "client_secret": settings.TWITCH_CLIENT_SECRET, "grant_type": "client_credentials", }, ) response.raise_for_status() except Exception as e: logger.warning("Failed to get Twitch app access token: %s", e) return None payload = response.json() access_token = payload.get("access_token") expires_in = int(payload.get("expires_in", 0)) if not access_token: return None self._app_access_token = access_token self._app_access_token_expires_at = now + timedelta( seconds=max(60, expires_in - 60) ) return access_token