Connect orchestrator to Twitch chat

This commit is contained in:
2026-05-12 10:44:58 -05:00
parent 9bc6a7a24e
commit c2d1c176df
4 changed files with 365 additions and 20 deletions

View File

@@ -46,6 +46,8 @@ class Settings(BaseSettings):
TWITCH_CLIENT_SECRET: Optional[str] = None TWITCH_CLIENT_SECRET: Optional[str] = None
TWITCH_BOT_USERNAME: Optional[str] = None TWITCH_BOT_USERNAME: Optional[str] = None
TWITCH_CHANNEL_NAME: Optional[str] = None TWITCH_CHANNEL_NAME: Optional[str] = None
TWITCH_ACCESS_TOKEN: Optional[str] = None
TWITCH_CHAT_ENABLED: bool = True
# LLM # LLM
LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None

View File

@@ -14,6 +14,7 @@ from app.memory.database import init_db
from app.memory.database import get_session as get_db_session from app.memory.database import get_session as get_db_session
from app.memory.repository import Repository from app.memory.repository import Repository
from app.exports.markdown import MarkdownExporter from app.exports.markdown import MarkdownExporter
from app.twitch.chat import TwitchChatMessage, TwitchIRCClient, set_active_client
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -40,6 +41,9 @@ class DashboardRequest(BaseModel):
# Global orchestrator instance # Global orchestrator instance
orchestrator: AgentOrchestrator | None = None orchestrator: AgentOrchestrator | None = None
agent_loop_task: asyncio.Task | None = None agent_loop_task: asyncio.Task | None = None
twitch_chat_client: TwitchIRCClient | None = None
twitch_chat_task: asyncio.Task | None = None
twitch_session_id: str | None = None
async def require_admin( async def require_admin(
@@ -73,6 +77,78 @@ async def agent_loop() -> None:
await asyncio.sleep(orchestrator.loop_interval_seconds) await asyncio.sleep(orchestrator.loop_interval_seconds)
def twitch_configured() -> bool:
"""Return whether Twitch chat has enough runtime configuration to start."""
return bool(
settings.TWITCH_CHAT_ENABLED
and settings.TWITCH_CHANNEL_NAME
and settings.TWITCH_BOT_USERNAME
and settings.TWITCH_ACCESS_TOKEN
)
async def get_or_create_twitch_session(channel_name: str) -> str:
"""Use an active session for the Twitch channel, or create one."""
if not orchestrator:
raise RuntimeError("Orchestrator not initialized")
normalized_channel = channel_name.lower()
matching_sessions = [
(session_id, session)
for session_id, session in orchestrator.active_sessions.items()
if session.get("channel_name", "").lower() == normalized_channel
]
if matching_sessions:
session_id, _ = max(
matching_sessions,
key=lambda item: item[1].get("started_at", datetime.min),
)
return session_id
return await orchestrator.start_session(channel_name)
async def handle_twitch_chat_message(message: TwitchChatMessage) -> None:
"""Route a Twitch chat message into the orchestrator."""
if not orchestrator or not twitch_session_id:
return
if settings.TWITCH_BOT_USERNAME and (
message.username.lower() == settings.TWITCH_BOT_USERNAME.lower()
):
return
await orchestrator.handle_chat_message(
session_id=twitch_session_id,
username=message.display_name or message.username,
message=message.content,
)
async def start_twitch_chat() -> None:
"""Start Twitch chat monitoring when configured."""
global twitch_chat_client, twitch_chat_task, twitch_session_id
if not orchestrator or not twitch_configured():
logger.info("Twitch chat listener not started; configuration is incomplete")
return
twitch_session_id = await get_or_create_twitch_session(settings.TWITCH_CHANNEL_NAME)
twitch_chat_client = TwitchIRCClient(
channel_name=settings.TWITCH_CHANNEL_NAME,
bot_username=settings.TWITCH_BOT_USERNAME,
access_token=settings.TWITCH_ACCESS_TOKEN,
on_message=handle_twitch_chat_message,
)
set_active_client(twitch_chat_client)
twitch_chat_task = asyncio.create_task(twitch_chat_client.run())
logger.info(
"Twitch chat listener starting for %s on session %s",
settings.TWITCH_CHANNEL_NAME,
twitch_session_id,
)
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
"""Initialize database and services on startup.""" """Initialize database and services on startup."""
@@ -84,6 +160,7 @@ async def startup_event():
) )
await orchestrator.restore_active_sessions() await orchestrator.restore_active_sessions()
agent_loop_task = asyncio.create_task(agent_loop()) agent_loop_task = asyncio.create_task(agent_loop())
await start_twitch_chat()
logger.info("Application started successfully") logger.info("Application started successfully")
except Exception as e: except Exception as e:
logger.error(f"Failed to start application: {e}") logger.error(f"Failed to start application: {e}")
@@ -93,6 +170,16 @@ async def startup_event():
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown_event(): async def shutdown_event():
"""Clean up resources on shutdown.""" """Clean up resources on shutdown."""
global twitch_chat_client
if twitch_chat_task:
twitch_chat_task.cancel()
with suppress(asyncio.CancelledError):
await twitch_chat_task
if twitch_chat_client:
await twitch_chat_client.disconnect()
set_active_client(None)
twitch_chat_client = None
if agent_loop_task: if agent_loop_task:
agent_loop_task.cancel() agent_loop_task.cancel()
with suppress(asyncio.CancelledError): with suppress(asyncio.CancelledError):
@@ -308,6 +395,20 @@ async def get_loop_status() -> dict:
} }
@app.get("/admin/twitch/status", dependencies=[Depends(require_admin)])
async def get_twitch_status() -> dict:
"""Get Twitch chat connection status."""
configured = twitch_configured()
client_status = twitch_chat_client.status() if twitch_chat_client else {}
return {
"configured": configured,
"running": bool(twitch_chat_task and not twitch_chat_task.done()),
"session_id": twitch_session_id,
**client_status,
"timestamp": datetime.utcnow().isoformat(),
}
@app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)]) @app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)])
async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict: async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict:
"""Set how frequently the background agent loop runs.""" """Set how frequently the background agent loop runs."""

View File

@@ -1,10 +1,242 @@
"""Twitch chat client for sending and receiving messages.""" """Twitch chat client for sending and receiving messages over IRC."""
import asyncio
import contextlib
import logging import logging
import ssl
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Optional from typing import Optional
from app.config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TWITCH_IRC_HOST = "irc.chat.twitch.tv"
TWITCH_IRC_PORT = 6697
@dataclass
class TwitchChatMessage:
"""Parsed Twitch chat message."""
username: str
display_name: str
content: str
is_moderator: bool = False
def _normalize_token(access_token: str) -> str:
"""Return a Twitch IRC PASS token with the expected oauth: prefix."""
token = access_token.strip()
if token.startswith("oauth:"):
return token
return f"oauth:{token}"
def _channel_name(channel_name: str) -> str:
"""Normalize a channel name for IRC commands."""
return channel_name.strip().lower().lstrip("#")
def _parse_tags(raw_tags: str) -> dict[str, str]:
"""Parse IRCv3 tags from a Twitch message."""
tags = {}
for raw_tag in raw_tags.split(";"):
key, _, value = raw_tag.partition("=")
tags[key] = (
value.replace(r"\s", " ")
.replace(r"\:", ";")
.replace(r"\\", "\\")
.replace(r"\r", "\r")
.replace(r"\n", "\n")
)
return tags
def parse_privmsg(line: str) -> TwitchChatMessage | None:
"""Parse a Twitch IRC PRIVMSG line into a chat message."""
tags: dict[str, str] = {}
if line.startswith("@"):
raw_tags, _, line = line.partition(" ")
tags = _parse_tags(raw_tags[1:])
if not line.startswith(":"):
return None
prefix, _, rest = line[1:].partition(" ")
command, _, params = rest.partition(" ")
if command != "PRIVMSG":
return None
_, _, content = params.partition(" :")
username = prefix.split("!", 1)[0]
display_name = tags.get("display-name") or username
badges = tags.get("badges", "")
is_moderator = tags.get("mod") == "1" or "moderator/" in badges
if not username or not content:
return None
return TwitchChatMessage(
username=username,
display_name=display_name,
content=content,
is_moderator=is_moderator,
)
class TwitchIRCClient:
"""Small Twitch IRC client for one channel."""
def __init__(
self,
channel_name: str,
bot_username: str,
access_token: str,
on_message: Callable[[TwitchChatMessage], Awaitable[None]] | None = None,
):
self.channel_name = _channel_name(channel_name)
self.bot_username = bot_username.strip().lower()
self.access_token = _normalize_token(access_token)
self.on_message = on_message
self.connected = False
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._stop_event = asyncio.Event()
self._last_error: str | None = None
@property
def last_error(self) -> str | None:
"""Return the most recent connection/listener error."""
return self._last_error
async def connect(self) -> None:
"""Open an IRC connection and join the configured channel."""
ssl_context = ssl.create_default_context()
self._reader, self._writer = await asyncio.open_connection(
TWITCH_IRC_HOST,
TWITCH_IRC_PORT,
ssl=ssl_context,
)
await self._send_raw(f"PASS {self.access_token}")
await self._send_raw(f"NICK {self.bot_username}")
await self._send_raw("CAP REQ :twitch.tv/tags twitch.tv/commands")
await self._send_raw(f"JOIN #{self.channel_name}")
await self._wait_for_login()
self.connected = True
self._last_error = None
logger.info("Connected to Twitch chat for #%s", self.channel_name)
async def disconnect(self) -> None:
"""Close the active IRC connection."""
self._stop_event.set()
self.connected = False
if self._writer:
self._writer.close()
with contextlib.suppress(Exception):
await self._writer.wait_closed()
self._reader = None
self._writer = None
async def run(self) -> None:
"""Run the chat listener with retry on connection loss."""
backoff_seconds = 5
while not self._stop_event.is_set():
try:
await self.connect()
await self._listen()
except asyncio.CancelledError:
raise
except Exception as e:
self.connected = False
self._last_error = str(e)
logger.warning("Twitch chat listener disconnected: %s", e)
finally:
await self._close_writer()
if not self._stop_event.is_set():
await asyncio.sleep(backoff_seconds)
async def send_message(self, message: str) -> bool:
"""Send a chat message to the configured channel."""
if not self.connected or not self._writer:
return False
await self._send_raw(f"PRIVMSG #{self.channel_name} :{message}")
logger.info("Sent Twitch chat message to #%s", self.channel_name)
return True
def status(self) -> dict:
"""Return connection status safe for API responses."""
return {
"configured": True,
"connected": self.connected,
"channel": self.channel_name,
"bot_username": self.bot_username,
"last_error": self._last_error,
}
async def _listen(self) -> None:
if not self._reader:
raise RuntimeError("Twitch IRC reader is not connected")
while not self._stop_event.is_set():
line = await self._read_line()
if await self._handle_control_line(line):
continue
message = parse_privmsg(line)
if message and self.on_message:
await self.on_message(message)
async def _wait_for_login(self) -> None:
"""Wait briefly for Twitch to accept or reject IRC authentication."""
while True:
line = await asyncio.wait_for(self._read_line(), timeout=10)
if await self._handle_control_line(line):
continue
if "Login authentication failed" in line:
raise PermissionError("Twitch IRC authentication failed")
if " 001 " in line or "GLOBALUSERSTATE" in line or " JOIN #" in line:
return
async def _read_line(self) -> str:
if not self._reader:
raise RuntimeError("Twitch IRC reader is not connected")
raw_line = await self._reader.readline()
if not raw_line:
raise ConnectionError("Twitch IRC connection closed")
return raw_line.decode("utf-8", errors="replace").strip()
async def _handle_control_line(self, line: str) -> bool:
if line.startswith("PING"):
await self._send_raw("PONG :tmi.twitch.tv")
return True
return False
async def _send_raw(self, command: str) -> None:
if not self._writer:
raise RuntimeError("Twitch IRC writer is not connected")
self._writer.write(f"{command}\r\n".encode("utf-8"))
await self._writer.drain()
async def _close_writer(self) -> None:
self.connected = False
if self._writer:
self._writer.close()
with contextlib.suppress(Exception):
await self._writer.wait_closed()
self._reader = None
self._writer = None
_active_client: TwitchIRCClient | None = None
def set_active_client(client: TwitchIRCClient | None) -> None:
"""Register the process-wide Twitch chat client used for outbound messages."""
global _active_client
_active_client = client
async def send_chat_message( async def send_chat_message(
channel_name: str, channel_name: str,
@@ -12,27 +244,35 @@ async def send_chat_message(
access_token: Optional[str] = None, access_token: Optional[str] = None,
) -> bool: ) -> bool:
""" """
Send a message to Twitch chat. Send a message to Twitch chat through the active IRC client when available.
Args: The fallback creates a short-lived IRC connection for admin/test paths.
channel_name: Twitch channel to send message to
message: Message content
access_token: OAuth token with chat:edit scope
Returns:
True if message sent successfully
TODO: Implement Twitch Send Chat Message API
Reference: https://dev.twitch.tv/docs/api/reference#send-chat-message
TODO: Handle rate limiting (20 messages per 30 seconds for verified bots)
TODO: Implement message queue for reliable delivery
TODO: Add retry logic with exponential backoff
""" """
logger.info(f"Sending message to {channel_name}: {message[:50]}...") if _active_client and _active_client.channel_name == _channel_name(channel_name):
# Stub implementation sent = await _active_client.send_message(message)
if sent:
return True return True
token = access_token or settings.TWITCH_ACCESS_TOKEN
bot_username = settings.TWITCH_BOT_USERNAME
if not token or not bot_username:
logger.info("Twitch chat send skipped because credentials are incomplete")
return False
client = TwitchIRCClient(
channel_name=channel_name,
bot_username=bot_username,
access_token=token,
)
try:
await client.connect()
return await client.send_message(message)
except Exception as e:
logger.warning("Failed to send Twitch chat message: %s", e)
return False
finally:
await client.disconnect()
class ChatMessageBuffer: class ChatMessageBuffer:
""" """

View File

@@ -38,6 +38,8 @@ services:
TWITCH_CLIENT_SECRET: ${TWITCH_CLIENT_SECRET:-} TWITCH_CLIENT_SECRET: ${TWITCH_CLIENT_SECRET:-}
TWITCH_BOT_USERNAME: ${TWITCH_BOT_USERNAME:-} TWITCH_BOT_USERNAME: ${TWITCH_BOT_USERNAME:-}
TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-} TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-}
TWITCH_ACCESS_TOKEN: ${TWITCH_ACCESS_TOKEN:-}
TWITCH_CHAT_ENABLED: ${TWITCH_CHAT_ENABLED:-true}
LLM_PROVIDER: ${LLM_PROVIDER:-} LLM_PROVIDER: ${LLM_PROVIDER:-}
LLM_BASE_URL: ${LLM_BASE_URL:-} LLM_BASE_URL: ${LLM_BASE_URL:-}
LLM_API_KEY: ${LLM_API_KEY:-} LLM_API_KEY: ${LLM_API_KEY:-}