"""Agent Orchestrator - Routes messages and manages agent modes.""" import logging from datetime import datetime, timedelta from app.agent.policies import ( ChatActivityPolicy, ResponseSuppression, SuspiciousContentPolicy, ) from app.agent.modes.hearthkeeper import HearthkeeperMode from app.agent.modes.steward import StewardMode from app.agent.modes.warden import WardenMode from app.agent.modes.librarian import LibrarianMode from app.agent.modes.scribe import ScribeMode from app.config import settings from app.llm.client import LLMClient from app.memory.database import get_session from app.memory.models import AgentActionType from app.memory.repository import Repository from app.twitch.chat import send_chat_message logger = logging.getLogger(__name__) class AgentOrchestrator: """ Main orchestrator for agent behavior. Routes chat messages to appropriate modes and manages responses. Implements policies for when to speak, when to stay silent, and how to flag suspicious content. """ def __init__(self, loop_interval_seconds: float = 60.0): """Initialize the orchestrator and all modes.""" self.llm_client = LLMClient() self.loop_interval_seconds = loop_interval_seconds # Initialize modes self.hearthkeeper = HearthkeeperMode(self.llm_client) self.steward = StewardMode(self.llm_client) self.warden = WardenMode(self.llm_client) self.librarian = LibrarianMode(self.llm_client) self.scribe = ScribeMode(self.llm_client) # Initialize policies self.chat_activity = ChatActivityPolicy(inactivity_threshold_minutes=15) self.response_suppression = ResponseSuppression() self.suspicious_content = SuspiciousContentPolicy() # Track active sessions self.active_sessions: dict[str, dict] = {} logger.info("AgentOrchestrator initialized with all modes and policies") async def start_session(self, channel_name: str) -> str: """ Start a new stream session. Args: channel_name: Twitch channel name Returns: Session ID """ session_id: str | None = None async for db_session in get_session(): repo = Repository(db_session) session_id = await repo.create_session(channel_name) if session_id is None: raise RuntimeError("Failed to create stream session") self.active_sessions[session_id] = { "channel_name": channel_name, "started_at": datetime.utcnow(), "message_count": 0, "theme": None, "last_hearthkeeper_prompt_at": None, } self.chat_activity.record_activity(session_id) logger.info(f"Started session {session_id} for {channel_name}") return session_id async def restore_active_sessions(self) -> int: """Restore active sessions from the database after app startup.""" restored_count = 0 async for db_session in get_session(): repo = Repository(db_session) sessions = await repo.get_active_sessions() for session in sessions: recent_messages = await repo.get_recent_messages(session.id, limit=1) message_count = await repo.count_messages(session.id) last_activity_at = ( recent_messages[0].timestamp if recent_messages else session.started_at ) self.active_sessions[session.id] = { "channel_name": session.channel_name, "started_at": session.started_at, "message_count": message_count, "theme": session.theme, "last_hearthkeeper_prompt_at": None, } self.chat_activity.record_activity(session.id, occurred_at=last_activity_at) restored_count += 1 logger.info(f"Restored {restored_count} active sessions") return restored_count async def end_session(self, session_id: str) -> None: """ End a stream session and trigger ledger generation. Args: session_id: Session ID """ if session_id not in self.active_sessions: logger.warning(f"Session {session_id} not found") return async for db_session in get_session(): repo = Repository(db_session) await repo.end_session(session_id) del self.active_sessions[session_id] logger.info(f"Ended session {session_id}") async def handle_chat_message( self, session_id: str, username: str, message: str, ) -> dict: """ Process a chat message and determine agent response. Args: session_id: Session ID username: Username of message sender message: Message content Returns: Response dict with agent_response, actions_taken, etc. """ if session_id not in self.active_sessions: logger.warning(f"Session {session_id} not found") return {"agent_response": None, "actions_taken": []} session_info = self.active_sessions[session_id] actions = [] agent_response = None async for db_session in get_session(): repo = Repository(db_session) # Store the message message_id = await repo.add_chat_message( session_id=session_id, username=username, content=message, is_bot=False, ) # Record activity self.chat_activity.record_activity(session_id) session_info["message_count"] += 1 session_info["last_hearthkeeper_prompt_at"] = None # 1. Warden always analyzes (passive mode) warden_result = await self.warden.analyze_message(message) if warden_result["is_suspicious"]: actions.append(f"WARDEN_FLAG: {warden_result['severity']}") async for db_session in get_session(): repo = Repository(db_session) await repo.record_action( session_id=session_id, action_type=AgentActionType.FLAG_SUSPICIOUS, mode="warden", description=f"Detected: {warden_result['patterns_detected']}", triggered_by_message_id=message_id, ) # 2. Check if we should suppress responses due to active chat recent_messages = [] async for db_session in get_session(): repo = Repository(db_session) recent_messages = await repo.get_messages_since( session_id=session_id, since=datetime.utcnow() - timedelta(minutes=1), ) if self.response_suppression.should_suppress_response(len(recent_messages)): logger.debug("Response suppressed due to active chat") return { "agent_response": agent_response, "actions_taken": actions, } # 3. Librarian: Archive important messages (passive) if len(message) > 50: # Archive longer messages await self.librarian.archive_message(message_id, message, username) logger.info( f"Message processed. Session: {session_id}, Actions: {actions}" ) return { "agent_response": agent_response, "actions_taken": actions, } async def emit_agent_response( self, session_id: str, message: str, mode: str, triggered_by_message_id: str | None = None, ) -> dict: """Send an agent response through the outbound chat boundary.""" session_info = self.active_sessions.get(session_id) if not session_info: logger.warning(f"Session {session_id} not found") return {"sent": False, "reason": "session_not_found"} channel_name = session_info["channel_name"] sent = await send_chat_message(channel_name=channel_name, message=message) bot_username = settings.TWITCH_BOT_USERNAME or "sanctum_chronicler" async for db_session in get_session(): repo = Repository(db_session) bot_message_id = await repo.add_chat_message( session_id=session_id, username=bot_username, content=message, is_bot=True, ) action_id = await repo.record_action( session_id=session_id, action_type=AgentActionType.RESPONSE, mode=mode, description=message, triggered_by_message_id=triggered_by_message_id, ) logger.info( f"Agent response emitted. Session: {session_id}, Mode: {mode}, Sent: {sent}" ) return { "sent": sent, "channel": channel_name, "message_id": bot_message_id, "action_id": action_id, } def set_loop_interval(self, interval_seconds: float) -> None: """Update how frequently the background agent loop runs.""" if interval_seconds < 1: raise ValueError("Loop interval must be at least 1 second") self.loop_interval_seconds = interval_seconds def get_loop_status(self) -> dict: """Get background loop configuration and current session count.""" return { "interval_seconds": self.loop_interval_seconds, "active_session_count": len(self.active_sessions), } async def tick(self) -> list[dict]: """Evaluate active sessions for time-based agent behavior.""" results = [] for session_id in list(self.active_sessions.keys()): result = await self._tick_session(session_id) if result: results.append(result) return results async def _tick_session(self, session_id: str) -> dict | None: """Evaluate a single active session during the background loop.""" session_info = self.active_sessions.get(session_id) if not session_info: return None recent_messages = [] async for db_session in get_session(): repo = Repository(db_session) recent_messages = await repo.get_messages_since( session_id=session_id, since=datetime.utcnow() - timedelta(minutes=1), ) if self.response_suppression.should_suppress_response(len(recent_messages)): return { "session_id": session_id, "actions_taken": [], "agent_response": None, "reason": "active_chat", } if not self.chat_activity.should_hearthkeeper_prompt(session_id): return None last_activity_at = self.chat_activity.last_activity_at(session_id) last_prompt_at = session_info.get("last_hearthkeeper_prompt_at") if last_prompt_at and last_activity_at and last_prompt_at >= last_activity_at: return None try: agent_response = await self.hearthkeeper.generate_prompt( theme=session_info.get("theme") ) session_info["last_hearthkeeper_prompt_at"] = datetime.utcnow() delivery = await self.emit_agent_response( session_id=session_id, message=agent_response, mode="hearthkeeper", ) return { "session_id": session_id, "actions_taken": ["HEARTHKEEPER_PROMPT"], "agent_response": agent_response, "delivery": delivery, "reason": "inactive_chat", } except Exception as e: logger.error(f"Error in Hearthkeeper loop: {e}") return { "session_id": session_id, "actions_taken": [], "agent_response": None, "reason": "hearthkeeper_error", } async def get_session_status(self, session_id: str) -> dict: """Get status of a session.""" if session_id not in self.active_sessions: return {} session = self.active_sessions[session_id] return { "session_id": session_id, "channel_name": session["channel_name"], "message_count": session["message_count"], "uptime_seconds": (datetime.utcnow() - session["started_at"]).total_seconds(), "theme": session.get("theme"), }