diff --git a/app/config.py b/app/config.py index 08b2c38..25f2b57 100644 --- a/app/config.py +++ b/app/config.py @@ -46,6 +46,8 @@ class Settings(BaseSettings): TWITCH_CLIENT_SECRET: Optional[str] = None TWITCH_BOT_USERNAME: Optional[str] = None TWITCH_CHANNEL_NAME: Optional[str] = None + TWITCH_ACCESS_TOKEN: Optional[str] = None + TWITCH_CHAT_ENABLED: bool = True # LLM LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None diff --git a/app/main.py b/app/main.py index 34981ad..555d3d4 100644 --- a/app/main.py +++ b/app/main.py @@ -14,6 +14,7 @@ from app.memory.database import init_db from app.memory.database import get_session as get_db_session from app.memory.repository import Repository from app.exports.markdown import MarkdownExporter +from app.twitch.chat import TwitchChatMessage, TwitchIRCClient, set_active_client logger = logging.getLogger(__name__) @@ -40,6 +41,9 @@ class DashboardRequest(BaseModel): # Global orchestrator instance orchestrator: AgentOrchestrator | None = None agent_loop_task: asyncio.Task | None = None +twitch_chat_client: TwitchIRCClient | None = None +twitch_chat_task: asyncio.Task | None = None +twitch_session_id: str | None = None async def require_admin( @@ -73,6 +77,78 @@ async def agent_loop() -> None: await asyncio.sleep(orchestrator.loop_interval_seconds) +def twitch_configured() -> bool: + """Return whether Twitch chat has enough runtime configuration to start.""" + return bool( + settings.TWITCH_CHAT_ENABLED + and settings.TWITCH_CHANNEL_NAME + and settings.TWITCH_BOT_USERNAME + and settings.TWITCH_ACCESS_TOKEN + ) + + +async def get_or_create_twitch_session(channel_name: str) -> str: + """Use an active session for the Twitch channel, or create one.""" + if not orchestrator: + raise RuntimeError("Orchestrator not initialized") + + normalized_channel = channel_name.lower() + matching_sessions = [ + (session_id, session) + for session_id, session in orchestrator.active_sessions.items() + if session.get("channel_name", "").lower() == normalized_channel + ] + if matching_sessions: + session_id, _ = max( + matching_sessions, + key=lambda item: item[1].get("started_at", datetime.min), + ) + return session_id + + return await orchestrator.start_session(channel_name) + + +async def handle_twitch_chat_message(message: TwitchChatMessage) -> None: + """Route a Twitch chat message into the orchestrator.""" + if not orchestrator or not twitch_session_id: + return + + if settings.TWITCH_BOT_USERNAME and ( + message.username.lower() == settings.TWITCH_BOT_USERNAME.lower() + ): + return + + await orchestrator.handle_chat_message( + session_id=twitch_session_id, + username=message.display_name or message.username, + message=message.content, + ) + + +async def start_twitch_chat() -> None: + """Start Twitch chat monitoring when configured.""" + global twitch_chat_client, twitch_chat_task, twitch_session_id + + if not orchestrator or not twitch_configured(): + logger.info("Twitch chat listener not started; configuration is incomplete") + return + + twitch_session_id = await get_or_create_twitch_session(settings.TWITCH_CHANNEL_NAME) + twitch_chat_client = TwitchIRCClient( + channel_name=settings.TWITCH_CHANNEL_NAME, + bot_username=settings.TWITCH_BOT_USERNAME, + access_token=settings.TWITCH_ACCESS_TOKEN, + on_message=handle_twitch_chat_message, + ) + set_active_client(twitch_chat_client) + twitch_chat_task = asyncio.create_task(twitch_chat_client.run()) + logger.info( + "Twitch chat listener starting for %s on session %s", + settings.TWITCH_CHANNEL_NAME, + twitch_session_id, + ) + + @app.on_event("startup") async def startup_event(): """Initialize database and services on startup.""" @@ -84,6 +160,7 @@ async def startup_event(): ) await orchestrator.restore_active_sessions() agent_loop_task = asyncio.create_task(agent_loop()) + await start_twitch_chat() logger.info("Application started successfully") except Exception as e: logger.error(f"Failed to start application: {e}") @@ -93,6 +170,16 @@ async def startup_event(): @app.on_event("shutdown") async def shutdown_event(): """Clean up resources on shutdown.""" + global twitch_chat_client + + if twitch_chat_task: + twitch_chat_task.cancel() + with suppress(asyncio.CancelledError): + await twitch_chat_task + if twitch_chat_client: + await twitch_chat_client.disconnect() + set_active_client(None) + twitch_chat_client = None if agent_loop_task: agent_loop_task.cancel() with suppress(asyncio.CancelledError): @@ -308,6 +395,20 @@ async def get_loop_status() -> dict: } +@app.get("/admin/twitch/status", dependencies=[Depends(require_admin)]) +async def get_twitch_status() -> dict: + """Get Twitch chat connection status.""" + configured = twitch_configured() + client_status = twitch_chat_client.status() if twitch_chat_client else {} + return { + "configured": configured, + "running": bool(twitch_chat_task and not twitch_chat_task.done()), + "session_id": twitch_session_id, + **client_status, + "timestamp": datetime.utcnow().isoformat(), + } + + @app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)]) async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict: """Set how frequently the background agent loop runs.""" diff --git a/app/twitch/chat.py b/app/twitch/chat.py index 9c08431..204e658 100644 --- a/app/twitch/chat.py +++ b/app/twitch/chat.py @@ -1,10 +1,242 @@ -"""Twitch chat client for sending and receiving messages.""" +"""Twitch chat client for sending and receiving messages over IRC.""" +import asyncio +import contextlib import logging +import ssl +from collections.abc import Awaitable, Callable +from dataclasses import dataclass from typing import Optional +from app.config import settings + logger = logging.getLogger(__name__) +TWITCH_IRC_HOST = "irc.chat.twitch.tv" +TWITCH_IRC_PORT = 6697 + + +@dataclass +class TwitchChatMessage: + """Parsed Twitch chat message.""" + + username: str + display_name: str + content: str + is_moderator: bool = False + + +def _normalize_token(access_token: str) -> str: + """Return a Twitch IRC PASS token with the expected oauth: prefix.""" + token = access_token.strip() + if token.startswith("oauth:"): + return token + return f"oauth:{token}" + + +def _channel_name(channel_name: str) -> str: + """Normalize a channel name for IRC commands.""" + return channel_name.strip().lower().lstrip("#") + + +def _parse_tags(raw_tags: str) -> dict[str, str]: + """Parse IRCv3 tags from a Twitch message.""" + tags = {} + for raw_tag in raw_tags.split(";"): + key, _, value = raw_tag.partition("=") + tags[key] = ( + value.replace(r"\s", " ") + .replace(r"\:", ";") + .replace(r"\\", "\\") + .replace(r"\r", "\r") + .replace(r"\n", "\n") + ) + return tags + + +def parse_privmsg(line: str) -> TwitchChatMessage | None: + """Parse a Twitch IRC PRIVMSG line into a chat message.""" + tags: dict[str, str] = {} + if line.startswith("@"): + raw_tags, _, line = line.partition(" ") + tags = _parse_tags(raw_tags[1:]) + + if not line.startswith(":"): + return None + + prefix, _, rest = line[1:].partition(" ") + command, _, params = rest.partition(" ") + if command != "PRIVMSG": + return None + + _, _, content = params.partition(" :") + username = prefix.split("!", 1)[0] + display_name = tags.get("display-name") or username + badges = tags.get("badges", "") + is_moderator = tags.get("mod") == "1" or "moderator/" in badges + + if not username or not content: + return None + + return TwitchChatMessage( + username=username, + display_name=display_name, + content=content, + is_moderator=is_moderator, + ) + + +class TwitchIRCClient: + """Small Twitch IRC client for one channel.""" + + def __init__( + self, + channel_name: str, + bot_username: str, + access_token: str, + on_message: Callable[[TwitchChatMessage], Awaitable[None]] | None = None, + ): + self.channel_name = _channel_name(channel_name) + self.bot_username = bot_username.strip().lower() + self.access_token = _normalize_token(access_token) + self.on_message = on_message + self.connected = False + self._reader: asyncio.StreamReader | None = None + self._writer: asyncio.StreamWriter | None = None + self._stop_event = asyncio.Event() + self._last_error: str | None = None + + @property + def last_error(self) -> str | None: + """Return the most recent connection/listener error.""" + return self._last_error + + async def connect(self) -> None: + """Open an IRC connection and join the configured channel.""" + ssl_context = ssl.create_default_context() + self._reader, self._writer = await asyncio.open_connection( + TWITCH_IRC_HOST, + TWITCH_IRC_PORT, + ssl=ssl_context, + ) + await self._send_raw(f"PASS {self.access_token}") + await self._send_raw(f"NICK {self.bot_username}") + await self._send_raw("CAP REQ :twitch.tv/tags twitch.tv/commands") + await self._send_raw(f"JOIN #{self.channel_name}") + await self._wait_for_login() + self.connected = True + self._last_error = None + logger.info("Connected to Twitch chat for #%s", self.channel_name) + + async def disconnect(self) -> None: + """Close the active IRC connection.""" + self._stop_event.set() + self.connected = False + if self._writer: + self._writer.close() + with contextlib.suppress(Exception): + await self._writer.wait_closed() + self._reader = None + self._writer = None + + async def run(self) -> None: + """Run the chat listener with retry on connection loss.""" + backoff_seconds = 5 + while not self._stop_event.is_set(): + try: + await self.connect() + await self._listen() + except asyncio.CancelledError: + raise + except Exception as e: + self.connected = False + self._last_error = str(e) + logger.warning("Twitch chat listener disconnected: %s", e) + finally: + await self._close_writer() + + if not self._stop_event.is_set(): + await asyncio.sleep(backoff_seconds) + + async def send_message(self, message: str) -> bool: + """Send a chat message to the configured channel.""" + if not self.connected or not self._writer: + return False + await self._send_raw(f"PRIVMSG #{self.channel_name} :{message}") + logger.info("Sent Twitch chat message to #%s", self.channel_name) + return True + + def status(self) -> dict: + """Return connection status safe for API responses.""" + return { + "configured": True, + "connected": self.connected, + "channel": self.channel_name, + "bot_username": self.bot_username, + "last_error": self._last_error, + } + + async def _listen(self) -> None: + if not self._reader: + raise RuntimeError("Twitch IRC reader is not connected") + + while not self._stop_event.is_set(): + line = await self._read_line() + if await self._handle_control_line(line): + continue + message = parse_privmsg(line) + if message and self.on_message: + await self.on_message(message) + + async def _wait_for_login(self) -> None: + """Wait briefly for Twitch to accept or reject IRC authentication.""" + while True: + line = await asyncio.wait_for(self._read_line(), timeout=10) + if await self._handle_control_line(line): + continue + if "Login authentication failed" in line: + raise PermissionError("Twitch IRC authentication failed") + if " 001 " in line or "GLOBALUSERSTATE" in line or " JOIN #" in line: + return + + async def _read_line(self) -> str: + if not self._reader: + raise RuntimeError("Twitch IRC reader is not connected") + raw_line = await self._reader.readline() + if not raw_line: + raise ConnectionError("Twitch IRC connection closed") + return raw_line.decode("utf-8", errors="replace").strip() + + async def _handle_control_line(self, line: str) -> bool: + if line.startswith("PING"): + await self._send_raw("PONG :tmi.twitch.tv") + return True + return False + + async def _send_raw(self, command: str) -> None: + if not self._writer: + raise RuntimeError("Twitch IRC writer is not connected") + self._writer.write(f"{command}\r\n".encode("utf-8")) + await self._writer.drain() + + async def _close_writer(self) -> None: + self.connected = False + if self._writer: + self._writer.close() + with contextlib.suppress(Exception): + await self._writer.wait_closed() + self._reader = None + self._writer = None + + +_active_client: TwitchIRCClient | None = None + + +def set_active_client(client: TwitchIRCClient | None) -> None: + """Register the process-wide Twitch chat client used for outbound messages.""" + global _active_client + _active_client = client + async def send_chat_message( channel_name: str, @@ -12,26 +244,34 @@ async def send_chat_message( access_token: Optional[str] = None, ) -> bool: """ - Send a message to Twitch chat. - - Args: - channel_name: Twitch channel to send message to - message: Message content - access_token: OAuth token with chat:edit scope - - Returns: - True if message sent successfully - - TODO: Implement Twitch Send Chat Message API - Reference: https://dev.twitch.tv/docs/api/reference#send-chat-message - - TODO: Handle rate limiting (20 messages per 30 seconds for verified bots) - TODO: Implement message queue for reliable delivery - TODO: Add retry logic with exponential backoff + Send a message to Twitch chat through the active IRC client when available. + + The fallback creates a short-lived IRC connection for admin/test paths. """ - logger.info(f"Sending message to {channel_name}: {message[:50]}...") - # Stub implementation - return True + if _active_client and _active_client.channel_name == _channel_name(channel_name): + sent = await _active_client.send_message(message) + if sent: + return True + + token = access_token or settings.TWITCH_ACCESS_TOKEN + bot_username = settings.TWITCH_BOT_USERNAME + if not token or not bot_username: + logger.info("Twitch chat send skipped because credentials are incomplete") + return False + + client = TwitchIRCClient( + channel_name=channel_name, + bot_username=bot_username, + access_token=token, + ) + try: + await client.connect() + return await client.send_message(message) + except Exception as e: + logger.warning("Failed to send Twitch chat message: %s", e) + return False + finally: + await client.disconnect() class ChatMessageBuffer: diff --git a/docker-compose.yml b/docker-compose.yml index 184ff8d..b360377 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -38,6 +38,8 @@ services: TWITCH_CLIENT_SECRET: ${TWITCH_CLIENT_SECRET:-} TWITCH_BOT_USERNAME: ${TWITCH_BOT_USERNAME:-} TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-} + TWITCH_ACCESS_TOKEN: ${TWITCH_ACCESS_TOKEN:-} + TWITCH_CHAT_ENABLED: ${TWITCH_CHAT_ENABLED:-true} LLM_PROVIDER: ${LLM_PROVIDER:-} LLM_BASE_URL: ${LLM_BASE_URL:-} LLM_API_KEY: ${LLM_API_KEY:-}