diff --git a/app/agent/orchestrator.py b/app/agent/orchestrator.py index 1ed65a6..f884f59 100644 --- a/app/agent/orchestrator.py +++ b/app/agent/orchestrator.py @@ -1,6 +1,7 @@ """Agent Orchestrator - Routes messages and manages agent modes.""" import logging +from collections.abc import Awaitable, Callable from datetime import datetime, timedelta from app.agent.policies import ( @@ -54,9 +55,27 @@ class AgentOrchestrator: # Track active sessions self.active_sessions: dict[str, dict] = {} + self.chat_interaction_gate: Callable[[str], Awaitable[bool]] | None = None logger.info("AgentOrchestrator initialized with all modes and policies") + def set_chat_interaction_gate( + self, + gate: Callable[[str], Awaitable[bool]] | None, + ) -> None: + """Set an async gate that must pass before the agent can post to chat.""" + self.chat_interaction_gate = gate + + async def can_interact_with_chat(self, channel_name: str) -> bool: + """Return whether outbound chat interaction is currently allowed.""" + if not self.chat_interaction_gate: + return True + try: + return await self.chat_interaction_gate(channel_name) + except Exception as e: + logger.warning("Chat interaction gate failed closed: %s", e) + return False + async def start_session(self, channel_name: str) -> str: """ Start a new stream session. @@ -287,6 +306,13 @@ class AgentOrchestrator: return {"sent": False, "reason": "session_not_found"} channel_name = session_info["channel_name"] + if not await self.can_interact_with_chat(channel_name): + logger.info( + "Agent response suppressed because stream is not live. Session: %s", + session_id, + ) + return {"sent": False, "reason": "stream_not_live"} + sent = await send_chat_message(channel_name=channel_name, message=message) bot_username = settings.TWITCH_BOT_USERNAME or "sanctum_chronicler" @@ -495,6 +521,9 @@ class AgentOrchestrator: if not self.chat_activity.should_hearthkeeper_prompt(session_id): return None + if not await self.can_interact_with_chat(session_info["channel_name"]): + return None + last_activity_at = self.chat_activity.last_activity_at(session_id) last_prompt_at = session_info.get("last_hearthkeeper_prompt_at") if last_activity_at and last_prompt_at and last_activity_at > last_prompt_at: diff --git a/app/config.py b/app/config.py index 25f2b57..6fd92fd 100644 --- a/app/config.py +++ b/app/config.py @@ -48,6 +48,8 @@ class Settings(BaseSettings): TWITCH_CHANNEL_NAME: Optional[str] = None TWITCH_ACCESS_TOKEN: Optional[str] = None TWITCH_CHAT_ENABLED: bool = True + TWITCH_REQUIRE_LIVE_STREAM: bool = True + TWITCH_LIVE_STATUS_CACHE_SECONDS: int = 60 # LLM LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None diff --git a/app/main.py b/app/main.py index 4d8ad46..9c58de1 100644 --- a/app/main.py +++ b/app/main.py @@ -16,6 +16,7 @@ from app.memory.models import AgentActionType from app.memory.repository import Repository from app.exports.markdown import MarkdownExporter from app.twitch.chat import TwitchChatMessage, TwitchIRCClient, set_active_client +from app.twitch.live import TwitchLiveStatusService logger = logging.getLogger(__name__) @@ -45,6 +46,7 @@ agent_loop_task: asyncio.Task | None = None twitch_chat_client: TwitchIRCClient | None = None twitch_chat_task: asyncio.Task | None = None twitch_session_id: str | None = None +twitch_live_status: TwitchLiveStatusService | None = None async def require_admin( @@ -88,6 +90,16 @@ def twitch_configured() -> bool: ) +async def channel_is_live_for_chat(channel_name: str) -> bool: + """Return whether the agent may interact with chat for a channel.""" + if not settings.TWITCH_REQUIRE_LIVE_STREAM: + return True + if not twitch_live_status: + return False + status = await twitch_live_status.get_status(channel_name) + return status.is_live + + async def get_or_create_twitch_session(channel_name: str) -> str: """Use an active session for the Twitch channel, or create one.""" if not orchestrator: @@ -140,12 +152,16 @@ async def start_twitch_chat() -> None: @app.on_event("startup") async def startup_event(): """Initialize database and services on startup.""" - global orchestrator, agent_loop_task + global orchestrator, agent_loop_task, twitch_live_status try: await init_db() + twitch_live_status = TwitchLiveStatusService( + cache_seconds=settings.TWITCH_LIVE_STATUS_CACHE_SECONDS + ) orchestrator = AgentOrchestrator( loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS ) + orchestrator.set_chat_interaction_gate(channel_is_live_for_chat) await orchestrator.restore_active_sessions() await start_twitch_chat() agent_loop_task = asyncio.create_task(agent_loop()) @@ -158,7 +174,7 @@ async def startup_event(): @app.on_event("shutdown") async def shutdown_event(): """Clean up resources on shutdown.""" - global twitch_chat_client + global twitch_chat_client, twitch_live_status if twitch_chat_task: twitch_chat_task.cancel() @@ -168,6 +184,7 @@ async def shutdown_event(): await twitch_chat_client.disconnect() set_active_client(None) twitch_chat_client = None + twitch_live_status = None if agent_loop_task: agent_loop_task.cancel() with suppress(asyncio.CancelledError): @@ -470,9 +487,16 @@ async def get_twitch_status() -> dict: """Get Twitch chat connection status.""" configured = twitch_configured() client_status = twitch_chat_client.status() if twitch_chat_client else {} + live_status = None + if twitch_live_status and settings.TWITCH_CHANNEL_NAME: + live_status = ( + await twitch_live_status.get_status(settings.TWITCH_CHANNEL_NAME) + ).to_dict() return { "configured": configured, "running": bool(twitch_chat_task and not twitch_chat_task.done()), + "require_live_stream": settings.TWITCH_REQUIRE_LIVE_STREAM, + "live_status": live_status, "session_id": twitch_session_id, **client_status, "timestamp": datetime.utcnow().isoformat(), @@ -508,6 +532,15 @@ async def get_current_stream_status() -> dict: dashboard_data = serialize_dashboard(dashboard) runtime = orchestrator.get_hearthkeeper_runtime_status(session_id) twitch_status = twitch_chat_client.status() if twitch_chat_client else {} + live_status = None + if twitch_live_status: + live_status = ( + await twitch_live_status.get_status(session_info["channel_name"]) + ).to_dict() + chat_interaction_allowed = ( + not settings.TWITCH_REQUIRE_LIVE_STREAM + or bool(live_status and live_status["is_live"]) + ) return { "session": { @@ -523,6 +556,8 @@ async def get_current_stream_status() -> dict: "twitch": { "configured": twitch_configured(), "running": bool(twitch_chat_task and not twitch_chat_task.done()), + "require_live_stream": settings.TWITCH_REQUIRE_LIVE_STREAM, + "live_status": live_status, "session_id": twitch_session_id, **twitch_status, }, @@ -534,6 +569,7 @@ async def get_current_stream_status() -> dict: }, "hearthkeeper": { **runtime, + "chat_interaction_allowed": chat_interaction_allowed, "last_human_chat_at": ( latest_human_message.timestamp.isoformat() if latest_human_message diff --git a/app/twitch/live.py b/app/twitch/live.py new file mode 100644 index 0000000..3bd85f7 --- /dev/null +++ b/app/twitch/live.py @@ -0,0 +1,184 @@ +"""Twitch live-stream status checks.""" + +import logging +from dataclasses import dataclass +from datetime import datetime, timedelta + +import httpx + +from app.config import settings + +logger = logging.getLogger(__name__) + +TWITCH_STREAMS_URL = "https://api.twitch.tv/helix/streams" +TWITCH_TOKEN_URL = "https://id.twitch.tv/oauth2/token" + + +@dataclass +class TwitchLiveStatus: + """Cached Twitch live status for a channel.""" + + channel_name: str + is_live: bool + checked_at: datetime + title: str | None = None + game_name: str | None = None + started_at: str | None = None + viewer_count: int | None = None + reason: str | None = None + + def to_dict(self) -> dict: + """Return a response-safe representation.""" + return { + "channel": self.channel_name, + "is_live": self.is_live, + "checked_at": self.checked_at.isoformat(), + "title": self.title, + "game_name": self.game_name, + "started_at": self.started_at, + "viewer_count": self.viewer_count, + "reason": self.reason, + } + + +def _bearer_token(access_token: str) -> str: + """Normalize a Twitch token for Helix Authorization.""" + token = access_token.strip() + if token.startswith("oauth:"): + return token.removeprefix("oauth:") + return token + + +class TwitchLiveStatusService: + """Small cached client for Twitch Helix stream status.""" + + def __init__(self, cache_seconds: int = 60): + self.cache_duration = timedelta(seconds=max(1, cache_seconds)) + self._cache: dict[str, TwitchLiveStatus] = {} + self._app_access_token: str | None = None + self._app_access_token_expires_at: datetime | None = None + + async def get_status( + self, + channel_name: str, + force_refresh: bool = False, + ) -> TwitchLiveStatus: + """Return current live status for a channel, using a short cache.""" + normalized_channel = channel_name.strip().lower().lstrip("#") + now = datetime.utcnow() + cached = self._cache.get(normalized_channel) + if ( + cached + and not force_refresh + and now - cached.checked_at < self.cache_duration + ): + return cached + + status = await self._fetch_status(normalized_channel, now) + self._cache[normalized_channel] = status + return status + + async def _fetch_status( + self, + channel_name: str, + checked_at: datetime, + ) -> TwitchLiveStatus: + """Fetch live status from Twitch Helix.""" + if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_ACCESS_TOKEN: + return TwitchLiveStatus( + channel_name=channel_name, + is_live=False, + checked_at=checked_at, + reason="twitch_api_not_configured", + ) + + headers = { + "Client-ID": settings.TWITCH_CLIENT_ID, + "Authorization": f"Bearer {_bearer_token(settings.TWITCH_ACCESS_TOKEN)}", + } + params = {"user_login": channel_name} + + try: + async with httpx.AsyncClient(timeout=10) as client: + response = await client.get( + TWITCH_STREAMS_URL, + headers=headers, + params=params, + ) + if response.status_code == 401: + app_access_token = await self._get_app_access_token(client) + if app_access_token: + headers["Authorization"] = f"Bearer {app_access_token}" + response = await client.get( + TWITCH_STREAMS_URL, + headers=headers, + params=params, + ) + response.raise_for_status() + except Exception as e: + logger.warning("Failed to check Twitch live status: %s", e) + return TwitchLiveStatus( + channel_name=channel_name, + is_live=False, + checked_at=checked_at, + reason="twitch_api_error", + ) + + streams = response.json().get("data", []) + if not streams: + return TwitchLiveStatus( + channel_name=channel_name, + is_live=False, + checked_at=checked_at, + reason="stream_offline", + ) + + stream = streams[0] + return TwitchLiveStatus( + channel_name=channel_name, + is_live=True, + checked_at=checked_at, + title=stream.get("title"), + game_name=stream.get("game_name"), + started_at=stream.get("started_at"), + viewer_count=stream.get("viewer_count"), + ) + + async def _get_app_access_token(self, client: httpx.AsyncClient) -> str | None: + """Get a Twitch app access token for Helix reads.""" + now = datetime.utcnow() + if ( + self._app_access_token + and self._app_access_token_expires_at + and now < self._app_access_token_expires_at + ): + return self._app_access_token + + if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_CLIENT_SECRET: + return None + + try: + response = await client.post( + TWITCH_TOKEN_URL, + data={ + "client_id": settings.TWITCH_CLIENT_ID, + "client_secret": settings.TWITCH_CLIENT_SECRET, + "grant_type": "client_credentials", + }, + ) + response.raise_for_status() + except Exception as e: + logger.warning("Failed to get Twitch app access token: %s", e) + return None + + payload = response.json() + access_token = payload.get("access_token") + expires_in = int(payload.get("expires_in", 0)) + if not access_token: + return None + + self._app_access_token = access_token + self._app_access_token_expires_at = now + timedelta( + seconds=max(60, expires_in - 60) + ) + return access_token diff --git a/docker-compose.yml b/docker-compose.yml index b360377..ce5f55f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,6 +40,8 @@ services: TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-} TWITCH_ACCESS_TOKEN: ${TWITCH_ACCESS_TOKEN:-} TWITCH_CHAT_ENABLED: ${TWITCH_CHAT_ENABLED:-true} + TWITCH_REQUIRE_LIVE_STREAM: ${TWITCH_REQUIRE_LIVE_STREAM:-true} + TWITCH_LIVE_STATUS_CACHE_SECONDS: ${TWITCH_LIVE_STATUS_CACHE_SECONDS:-60} LLM_PROVIDER: ${LLM_PROVIDER:-} LLM_BASE_URL: ${LLM_BASE_URL:-} LLM_API_KEY: ${LLM_API_KEY:-}