"""Twitch live-stream status checks."""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
|
|
import httpx
|
|
|
|
from app.config import settings
|
|
|
|
# Logger named after this module.
logger = logging.getLogger(__name__)

# Helix streams endpoint, queried with a channel login to learn whether it is live.
TWITCH_STREAMS_URL = "https://api.twitch.tv/helix/streams"

# OAuth2 token endpoint, used for the client-credentials grant.
TWITCH_TOKEN_URL = "https://id.twitch.tv/oauth2/token"
|
|
|
|
|
|
@dataclass
|
|
class TwitchLiveStatus:
|
|
"""Cached Twitch live status for a channel."""
|
|
|
|
channel_name: str
|
|
is_live: bool
|
|
checked_at: datetime
|
|
title: str | None = None
|
|
game_name: str | None = None
|
|
started_at: str | None = None
|
|
viewer_count: int | None = None
|
|
reason: str | None = None
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Return a response-safe representation."""
|
|
return {
|
|
"channel": self.channel_name,
|
|
"is_live": self.is_live,
|
|
"checked_at": self.checked_at.isoformat(),
|
|
"title": self.title,
|
|
"game_name": self.game_name,
|
|
"started_at": self.started_at,
|
|
"viewer_count": self.viewer_count,
|
|
"reason": self.reason,
|
|
}
|
|
|
|
|
|
def _bearer_token(access_token: str) -> str:
    """Normalize a configured Twitch token for a Helix ``Authorization`` header.

    The result is the bare token, ready to be placed after ``"Bearer "``.
    Surrounding whitespace is removed, and a leading ``Bearer `` scheme and/or
    ``oauth:`` prefix is stripped. Both prefixes are matched case-insensitively,
    so a value stored as ``"Bearer abc"`` no longer yields a doubled
    ``"Bearer Bearer abc"`` header.

    Args:
        access_token: Token string as configured, with or without a prefix.

    Returns:
        The token with whitespace and known prefixes removed. A token that has
        no prefix is returned stripped but otherwise unchanged.
    """
    token = access_token.strip()
    # Order matters: "Bearer oauth:abc" must lose both prefixes.
    for prefix in ("bearer ", "oauth:"):
        if token[: len(prefix)].lower() == prefix:
            # Also drop whitespace left behind the prefix ("oauth: abc").
            token = token[len(prefix):].lstrip()
    return token
|
|
|
|
|
|
class TwitchLiveStatusService:
    """Small cached client for Twitch Helix stream status.

    Results are cached per normalized channel name for ``cache_duration``.
    Whatever ``_fetch_status`` returns is cached, including offline and error
    statuses. An app access token obtained through the client-credentials
    grant is kept on the instance and reused until shortly before it expires.
    """

    def __init__(self, cache_seconds: int = 60):
        """Create the service.

        Args:
            cache_seconds: How long a fetched status is reused. Values below 1
                are clamped to 1 second.
        """
        self.cache_duration = timedelta(seconds=max(1, cache_seconds))
        self._cache: dict[str, TwitchLiveStatus] = {}
        self._app_access_token: str | None = None
        self._app_access_token_expires_at: datetime | None = None

    async def get_status(
        self,
        channel_name: str,
        force_refresh: bool = False,
    ) -> TwitchLiveStatus:
        """Return current live status for a channel, using a short cache.

        Args:
            channel_name: Channel login. Surrounding whitespace, letter case
                and leading ``#`` characters are ignored.
            force_refresh: When true, skip the cache and query Twitch again.

        Returns:
            The cached status if it is younger than ``cache_duration`` and no
            refresh was forced, otherwise a freshly fetched status.
        """
        normalized_channel = channel_name.strip().lower().lstrip("#")
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
        # kept because ``checked_at`` is serialized with isoformat(), and an
        # aware datetime would add a UTC offset to that output.
        now = datetime.utcnow()

        cached = self._cache.get(normalized_channel)
        if (
            cached
            and not force_refresh
            and now - cached.checked_at < self.cache_duration
        ):
            return cached

        status = await self._fetch_status(normalized_channel, now)
        self._cache[normalized_channel] = status
        return status

    async def _fetch_status(
        self,
        channel_name: str,
        checked_at: datetime,
    ) -> TwitchLiveStatus:
        """Fetch live status from Twitch Helix.

        The request is first made with the configured access token. On a 401
        it is retried once with an app access token, if one can be obtained.

        Args:
            channel_name: Normalized channel login, sent as ``user_login``.
            checked_at: Timestamp stored on the returned status.

        Returns:
            A status with ``is_live=True`` and the stream details when Twitch
            reports a stream. Otherwise ``is_live=False`` with ``reason`` set
            to ``"twitch_api_not_configured"``, ``"twitch_api_error"`` or
            ``"stream_offline"``. This method does not raise for HTTP or
            payload problems; they are logged and reported as
            ``"twitch_api_error"``.
        """
        if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_ACCESS_TOKEN:
            return TwitchLiveStatus(
                channel_name=channel_name,
                is_live=False,
                checked_at=checked_at,
                reason="twitch_api_not_configured",
            )

        headers = {
            "Client-ID": settings.TWITCH_CLIENT_ID,
            "Authorization": f"Bearer {_bearer_token(settings.TWITCH_ACCESS_TOKEN)}",
        }
        params = {"user_login": channel_name}

        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.get(
                    TWITCH_STREAMS_URL,
                    headers=headers,
                    params=params,
                )
                if response.status_code == 401:
                    app_access_token = await self._get_app_access_token(client)
                    if app_access_token:
                        headers["Authorization"] = f"Bearer {app_access_token}"
                        response = await client.get(
                            TWITCH_STREAMS_URL,
                            headers=headers,
                            params=params,
                        )
                        if response.status_code == 401:
                            # The app token was rejected as well. Forget it so
                            # the next check requests a fresh one instead of
                            # reusing it until its computed expiry.
                            self._app_access_token = None
                            self._app_access_token_expires_at = None
                response.raise_for_status()

            # Parsing stays inside the try so that a non-JSON or unexpectedly
            # shaped body is reported as an API error, not raised to callers.
            streams = response.json().get("data") or []
            stream = streams[0] if streams else None
            details = None
            if stream is not None:
                details = {
                    "title": stream.get("title"),
                    "game_name": stream.get("game_name"),
                    "started_at": stream.get("started_at"),
                    "viewer_count": stream.get("viewer_count"),
                }
        except Exception as e:
            logger.warning("Failed to check Twitch live status: %s", e)
            return TwitchLiveStatus(
                channel_name=channel_name,
                is_live=False,
                checked_at=checked_at,
                reason="twitch_api_error",
            )

        if details is None:
            # An empty "data" list means the channel is not live.
            return TwitchLiveStatus(
                channel_name=channel_name,
                is_live=False,
                checked_at=checked_at,
                reason="stream_offline",
            )

        return TwitchLiveStatus(
            channel_name=channel_name,
            is_live=True,
            checked_at=checked_at,
            **details,
        )

    async def _get_app_access_token(self, client: httpx.AsyncClient) -> str | None:
        """Get a Twitch app access token for Helix reads.

        A previously obtained token is reused while it has not reached its
        stored expiry. Otherwise a new one is requested with the
        client-credentials grant.

        Args:
            client: Open HTTP client used for the token request.

        Returns:
            The access token, or ``None`` when the client id or secret is not
            configured, the request fails, or the response has no token.
        """
        now = datetime.utcnow()
        if (
            self._app_access_token
            and self._app_access_token_expires_at
            and now < self._app_access_token_expires_at
        ):
            return self._app_access_token

        if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_CLIENT_SECRET:
            return None

        try:
            response = await client.post(
                TWITCH_TOKEN_URL,
                data={
                    "client_id": settings.TWITCH_CLIENT_ID,
                    "client_secret": settings.TWITCH_CLIENT_SECRET,
                    "grant_type": "client_credentials",
                },
            )
            response.raise_for_status()
            # Read the payload here so a malformed body is handled like any
            # other token-request failure.
            payload = response.json()
            access_token = payload.get("access_token")
            raw_expires_in = payload.get("expires_in", 0)
        except Exception as e:
            logger.warning("Failed to get Twitch app access token: %s", e)
            return None

        if not access_token:
            return None

        try:
            expires_in = int(raw_expires_in)
        except (TypeError, ValueError):
            # A missing or garbled lifetime should not discard a usable token;
            # fall back to the minimum caching window below.
            expires_in = 0

        self._app_access_token = access_token
        # Expire one minute early, but always cache for at least one minute.
        self._app_access_token_expires_at = now + timedelta(
            seconds=max(60, expires_in - 60)
        )
        return access_token
|