Gate chat interaction on Twitch live status
This commit is contained in:
184
app/twitch/live.py
Normal file
184
app/twitch/live.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Twitch live-stream status checks."""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import httpx
|
||||
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
TWITCH_STREAMS_URL = "https://api.twitch.tv/helix/streams"
|
||||
TWITCH_TOKEN_URL = "https://id.twitch.tv/oauth2/token"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TwitchLiveStatus:
|
||||
"""Cached Twitch live status for a channel."""
|
||||
|
||||
channel_name: str
|
||||
is_live: bool
|
||||
checked_at: datetime
|
||||
title: str | None = None
|
||||
game_name: str | None = None
|
||||
started_at: str | None = None
|
||||
viewer_count: int | None = None
|
||||
reason: str | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Return a response-safe representation."""
|
||||
return {
|
||||
"channel": self.channel_name,
|
||||
"is_live": self.is_live,
|
||||
"checked_at": self.checked_at.isoformat(),
|
||||
"title": self.title,
|
||||
"game_name": self.game_name,
|
||||
"started_at": self.started_at,
|
||||
"viewer_count": self.viewer_count,
|
||||
"reason": self.reason,
|
||||
}
|
||||
|
||||
|
||||
def _bearer_token(access_token: str) -> str:
|
||||
"""Normalize a Twitch token for Helix Authorization."""
|
||||
token = access_token.strip()
|
||||
if token.startswith("oauth:"):
|
||||
return token.removeprefix("oauth:")
|
||||
return token
|
||||
|
||||
|
||||
class TwitchLiveStatusService:
|
||||
"""Small cached client for Twitch Helix stream status."""
|
||||
|
||||
def __init__(self, cache_seconds: int = 60):
|
||||
self.cache_duration = timedelta(seconds=max(1, cache_seconds))
|
||||
self._cache: dict[str, TwitchLiveStatus] = {}
|
||||
self._app_access_token: str | None = None
|
||||
self._app_access_token_expires_at: datetime | None = None
|
||||
|
||||
async def get_status(
|
||||
self,
|
||||
channel_name: str,
|
||||
force_refresh: bool = False,
|
||||
) -> TwitchLiveStatus:
|
||||
"""Return current live status for a channel, using a short cache."""
|
||||
normalized_channel = channel_name.strip().lower().lstrip("#")
|
||||
now = datetime.utcnow()
|
||||
cached = self._cache.get(normalized_channel)
|
||||
if (
|
||||
cached
|
||||
and not force_refresh
|
||||
and now - cached.checked_at < self.cache_duration
|
||||
):
|
||||
return cached
|
||||
|
||||
status = await self._fetch_status(normalized_channel, now)
|
||||
self._cache[normalized_channel] = status
|
||||
return status
|
||||
|
||||
async def _fetch_status(
|
||||
self,
|
||||
channel_name: str,
|
||||
checked_at: datetime,
|
||||
) -> TwitchLiveStatus:
|
||||
"""Fetch live status from Twitch Helix."""
|
||||
if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_ACCESS_TOKEN:
|
||||
return TwitchLiveStatus(
|
||||
channel_name=channel_name,
|
||||
is_live=False,
|
||||
checked_at=checked_at,
|
||||
reason="twitch_api_not_configured",
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Client-ID": settings.TWITCH_CLIENT_ID,
|
||||
"Authorization": f"Bearer {_bearer_token(settings.TWITCH_ACCESS_TOKEN)}",
|
||||
}
|
||||
params = {"user_login": channel_name}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
response = await client.get(
|
||||
TWITCH_STREAMS_URL,
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
if response.status_code == 401:
|
||||
app_access_token = await self._get_app_access_token(client)
|
||||
if app_access_token:
|
||||
headers["Authorization"] = f"Bearer {app_access_token}"
|
||||
response = await client.get(
|
||||
TWITCH_STREAMS_URL,
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
logger.warning("Failed to check Twitch live status: %s", e)
|
||||
return TwitchLiveStatus(
|
||||
channel_name=channel_name,
|
||||
is_live=False,
|
||||
checked_at=checked_at,
|
||||
reason="twitch_api_error",
|
||||
)
|
||||
|
||||
streams = response.json().get("data", [])
|
||||
if not streams:
|
||||
return TwitchLiveStatus(
|
||||
channel_name=channel_name,
|
||||
is_live=False,
|
||||
checked_at=checked_at,
|
||||
reason="stream_offline",
|
||||
)
|
||||
|
||||
stream = streams[0]
|
||||
return TwitchLiveStatus(
|
||||
channel_name=channel_name,
|
||||
is_live=True,
|
||||
checked_at=checked_at,
|
||||
title=stream.get("title"),
|
||||
game_name=stream.get("game_name"),
|
||||
started_at=stream.get("started_at"),
|
||||
viewer_count=stream.get("viewer_count"),
|
||||
)
|
||||
|
||||
async def _get_app_access_token(self, client: httpx.AsyncClient) -> str | None:
|
||||
"""Get a Twitch app access token for Helix reads."""
|
||||
now = datetime.utcnow()
|
||||
if (
|
||||
self._app_access_token
|
||||
and self._app_access_token_expires_at
|
||||
and now < self._app_access_token_expires_at
|
||||
):
|
||||
return self._app_access_token
|
||||
|
||||
if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_CLIENT_SECRET:
|
||||
return None
|
||||
|
||||
try:
|
||||
response = await client.post(
|
||||
TWITCH_TOKEN_URL,
|
||||
data={
|
||||
"client_id": settings.TWITCH_CLIENT_ID,
|
||||
"client_secret": settings.TWITCH_CLIENT_SECRET,
|
||||
"grant_type": "client_credentials",
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
logger.warning("Failed to get Twitch app access token: %s", e)
|
||||
return None
|
||||
|
||||
payload = response.json()
|
||||
access_token = payload.get("access_token")
|
||||
expires_in = int(payload.get("expires_in", 0))
|
||||
if not access_token:
|
||||
return None
|
||||
|
||||
self._app_access_token = access_token
|
||||
self._app_access_token_expires_at = now + timedelta(
|
||||
seconds=max(60, expires_in - 60)
|
||||
)
|
||||
return access_token
|
||||
Reference in New Issue
Block a user