"""Twitch chat client for sending and receiving messages over IRC.""" import asyncio import contextlib import logging import ssl from collections.abc import Awaitable, Callable from dataclasses import dataclass from typing import Optional from app.config import settings logger = logging.getLogger(__name__) TWITCH_IRC_HOST = "irc.chat.twitch.tv" TWITCH_IRC_PORT = 6697 @dataclass class TwitchChatMessage: """Parsed Twitch chat message.""" username: str display_name: str content: str is_moderator: bool = False def _normalize_token(access_token: str) -> str: """Return a Twitch IRC PASS token with the expected oauth: prefix.""" token = access_token.strip() if token.startswith("oauth:"): return token return f"oauth:{token}" def _channel_name(channel_name: str) -> str: """Normalize a channel name for IRC commands.""" return channel_name.strip().lower().lstrip("#") def _parse_tags(raw_tags: str) -> dict[str, str]: """Parse IRCv3 tags from a Twitch message.""" tags = {} for raw_tag in raw_tags.split(";"): key, _, value = raw_tag.partition("=") tags[key] = ( value.replace(r"\s", " ") .replace(r"\:", ";") .replace(r"\\", "\\") .replace(r"\r", "\r") .replace(r"\n", "\n") ) return tags def parse_privmsg(line: str) -> TwitchChatMessage | None: """Parse a Twitch IRC PRIVMSG line into a chat message.""" tags: dict[str, str] = {} if line.startswith("@"): raw_tags, _, line = line.partition(" ") tags = _parse_tags(raw_tags[1:]) if not line.startswith(":"): return None prefix, _, rest = line[1:].partition(" ") command, _, params = rest.partition(" ") if command != "PRIVMSG": return None _, _, content = params.partition(" :") username = prefix.split("!", 1)[0] display_name = tags.get("display-name") or username badges = tags.get("badges", "") is_moderator = tags.get("mod") == "1" or "moderator/" in badges if not username or not content: return None return TwitchChatMessage( username=username, display_name=display_name, content=content, is_moderator=is_moderator, ) class TwitchIRCClient: """Small Twitch IRC client for one channel.""" def __init__( self, channel_name: str, bot_username: str, access_token: str, on_message: Callable[[TwitchChatMessage], Awaitable[None]] | None = None, ): self.channel_name = _channel_name(channel_name) self.bot_username = bot_username.strip().lower() self.access_token = _normalize_token(access_token) self.on_message = on_message self.connected = False self._reader: asyncio.StreamReader | None = None self._writer: asyncio.StreamWriter | None = None self._stop_event = asyncio.Event() self._last_error: str | None = None @property def last_error(self) -> str | None: """Return the most recent connection/listener error.""" return self._last_error async def connect(self) -> None: """Open an IRC connection and join the configured channel.""" ssl_context = ssl.create_default_context() self._reader, self._writer = await asyncio.open_connection( TWITCH_IRC_HOST, TWITCH_IRC_PORT, ssl=ssl_context, ) await self._send_raw(f"PASS {self.access_token}") await self._send_raw(f"NICK {self.bot_username}") await self._send_raw("CAP REQ :twitch.tv/tags twitch.tv/commands") await self._send_raw(f"JOIN #{self.channel_name}") await self._wait_for_login() self.connected = True self._last_error = None logger.info("Connected to Twitch chat for #%s", self.channel_name) async def disconnect(self) -> None: """Close the active IRC connection.""" self._stop_event.set() self.connected = False if self._writer: self._writer.close() with contextlib.suppress(Exception): await self._writer.wait_closed() self._reader = None self._writer = None async def run(self) -> None: """Run the chat listener with retry on connection loss.""" backoff_seconds = 5 while not self._stop_event.is_set(): try: await self.connect() await self._listen() except asyncio.CancelledError: raise except Exception as e: self.connected = False self._last_error = str(e) logger.warning("Twitch chat listener disconnected: %s", e) finally: await self._close_writer() if not self._stop_event.is_set(): await asyncio.sleep(backoff_seconds) async def send_message(self, message: str) -> bool: """Send a chat message to the configured channel.""" if not self.connected or not self._writer: return False await self._send_raw(f"PRIVMSG #{self.channel_name} :{message}") logger.info("Sent Twitch chat message to #%s", self.channel_name) return True def status(self) -> dict: """Return connection status safe for API responses.""" return { "configured": True, "connected": self.connected, "channel": self.channel_name, "bot_username": self.bot_username, "last_error": self._last_error, } async def _listen(self) -> None: if not self._reader: raise RuntimeError("Twitch IRC reader is not connected") while not self._stop_event.is_set(): line = await self._read_line() if await self._handle_control_line(line): continue message = parse_privmsg(line) if message and self.on_message: await self.on_message(message) async def _wait_for_login(self) -> None: """Wait briefly for Twitch to accept or reject IRC authentication.""" while True: line = await asyncio.wait_for(self._read_line(), timeout=10) if await self._handle_control_line(line): continue if "Login authentication failed" in line: raise PermissionError("Twitch IRC authentication failed") if " 001 " in line or "GLOBALUSERSTATE" in line or " JOIN #" in line: return async def _read_line(self) -> str: if not self._reader: raise RuntimeError("Twitch IRC reader is not connected") raw_line = await self._reader.readline() if not raw_line: raise ConnectionError("Twitch IRC connection closed") return raw_line.decode("utf-8", errors="replace").strip() async def _handle_control_line(self, line: str) -> bool: if line.startswith("PING"): await self._send_raw("PONG :tmi.twitch.tv") return True return False async def _send_raw(self, command: str) -> None: if not self._writer: raise RuntimeError("Twitch IRC writer is not connected") self._writer.write(f"{command}\r\n".encode("utf-8")) await self._writer.drain() async def _close_writer(self) -> None: self.connected = False if self._writer: self._writer.close() with contextlib.suppress(Exception): await self._writer.wait_closed() self._reader = None self._writer = None _active_client: TwitchIRCClient | None = None def set_active_client(client: TwitchIRCClient | None) -> None: """Register the process-wide Twitch chat client used for outbound messages.""" global _active_client _active_client = client async def send_chat_message( channel_name: str, message: str, access_token: Optional[str] = None, ) -> bool: """ Send a message to Twitch chat through the active IRC client when available. The fallback creates a short-lived IRC connection for admin/test paths. """ if _active_client and _active_client.channel_name == _channel_name(channel_name): sent = await _active_client.send_message(message) if sent: return True token = access_token or settings.TWITCH_ACCESS_TOKEN bot_username = settings.TWITCH_BOT_USERNAME if not token or not bot_username: logger.info("Twitch chat send skipped because credentials are incomplete") return False client = TwitchIRCClient( channel_name=channel_name, bot_username=bot_username, access_token=token, ) try: await client.connect() return await client.send_message(message) except Exception as e: logger.warning("Failed to send Twitch chat message: %s", e) return False finally: await client.disconnect() class ChatMessageBuffer: """ Buffer for outgoing chat messages with rate limiting. Implements Twitch's chat rate limits: - Regular users: 20 messages per 30 seconds - Verified bots: 50 messages per 30 seconds - Moderators: 100 messages per 30 seconds TODO: Implement queue with configurable rate limits TODO: Add priority levels for urgent messages TODO: Implement metrics tracking """ def __init__(self, channel_name: str, max_messages_per_interval: int = 20): """Initialize message buffer.""" self.channel_name = channel_name self.max_messages_per_interval = max_messages_per_interval self.message_queue: list[str] = [] async def add_message(self, message: str) -> None: """Add a message to the buffer.""" self.message_queue.append(message) logger.debug(f"Message queued for {self.channel_name}") async def flush(self) -> None: """Send all buffered messages.""" for message in self.message_queue: await send_chat_message(self.channel_name, message) self.message_queue.clear()