From 76c4f456076dfe7a69fa89d57800b930d69bec1c Mon Sep 17 00:00:00 2001 From: Ken Schaefer Date: Tue, 12 May 2026 12:20:17 -0500 Subject: [PATCH] Add current stream operator status --- app/agent/orchestrator.py | 66 ++++++++++++++++++++++++- app/main.py | 100 ++++++++++++++++++++++++++++++++++++++ app/memory/repository.py | 27 ++++++++++ 3 files changed, 192 insertions(+), 1 deletion(-) diff --git a/app/agent/orchestrator.py b/app/agent/orchestrator.py index e65a9e3..1ed65a6 100644 --- a/app/agent/orchestrator.py +++ b/app/agent/orchestrator.py @@ -144,6 +144,11 @@ class AgentOrchestrator: ) message_count = await repo.count_messages(session.id) dashboard = await repo.get_dashboard(session.id) + last_hearthkeeper_action = await repo.get_latest_action( + session_id=session.id, + action_type=AgentActionType.RESPONSE, + mode="hearthkeeper", + ) last_activity_at = ( recent_messages[0].timestamp if recent_messages else session.started_at ) @@ -154,7 +159,11 @@ class AgentOrchestrator: "message_count": message_count, "theme": session.theme, "dashboard": Repository.serialize_dashboard(dashboard), - "last_hearthkeeper_prompt_at": None, + "last_hearthkeeper_prompt_at": ( + last_hearthkeeper_action.timestamp + if last_hearthkeeper_action + else None + ), } self.chat_activity.record_activity(session.id, occurred_at=last_activity_at) restored_count += 1 @@ -392,6 +401,61 @@ class AgentOrchestrator: "active_session_count": len(self.active_sessions), } + def get_hearthkeeper_runtime_status(self, session_id: str) -> dict: + """Get Hearthkeeper timing status for an active session.""" + session_info = self.active_sessions.get(session_id) + if not session_info: + return {} + + now = datetime.utcnow() + last_activity_at = self.chat_activity.last_activity_at(session_id) + last_prompt_at = session_info.get("last_hearthkeeper_prompt_at") + + next_from_activity = None + if last_activity_at: + next_from_activity = last_activity_at + self.chat_activity.inactivity_threshold + + next_from_prompt = None + if last_prompt_at: + next_from_prompt = last_prompt_at + self.hearthkeeper_prompt_interval + + next_eligible_at = max( + [candidate for candidate in (next_from_activity, next_from_prompt) if candidate], + default=None, + ) + seconds_until_next_prompt = None + if next_eligible_at: + seconds_until_next_prompt = max( + 0, + int((next_eligible_at - now).total_seconds()), + ) + quiet_enough = bool( + last_activity_at + and now - last_activity_at >= self.chat_activity.inactivity_threshold + ) + prompt_interval_elapsed = bool( + not last_prompt_at + or now - last_prompt_at >= self.hearthkeeper_prompt_interval + ) + + return { + "last_activity_at": last_activity_at.isoformat() if last_activity_at else None, + "last_hearthkeeper_prompt_at": ( + last_prompt_at.isoformat() if last_prompt_at else None + ), + "next_eligible_prompt_at": ( + next_eligible_at.isoformat() if next_eligible_at else None + ), + "seconds_until_next_prompt": seconds_until_next_prompt, + "inactivity_threshold_minutes": int( + self.chat_activity.inactivity_threshold.total_seconds() / 60 + ), + "prompt_interval_minutes": int( + self.hearthkeeper_prompt_interval.total_seconds() / 60 + ), + "can_prompt_now": quiet_enough and prompt_interval_elapsed, + } + async def tick(self) -> list[dict]: """Evaluate active sessions for time-based agent behavior.""" results = [] diff --git a/app/main.py b/app/main.py index 142c244..dc2deb9 100644 --- a/app/main.py +++ b/app/main.py @@ -12,6 +12,7 @@ from app.config import settings from app.agent.orchestrator import AgentOrchestrator from app.memory.database import init_db from app.memory.database import get_session as get_db_session +from app.memory.models import AgentActionType from app.memory.repository import Repository from app.exports.markdown import MarkdownExporter from app.twitch.chat import TwitchChatMessage, TwitchIRCClient, set_active_client @@ -286,6 +287,30 @@ def serialize_dashboard(dashboard) -> dict: return Repository.serialize_dashboard(dashboard) +def dashboard_status(dashboard: dict | None) -> dict: + """Return a compact dashboard status for operator checks.""" + if not dashboard: + return { + "present": False, + "stream_title": None, + "game": None, + "mood": None, + "content_angle": None, + "session_goal_count": 0, + "updated_at": None, + } + + return { + "present": True, + "stream_title": dashboard.get("stream_title"), + "game": dashboard.get("game"), + "mood": dashboard.get("mood"), + "content_angle": dashboard.get("content_angle"), + "session_goal_count": len(dashboard.get("session_goals") or []), + "updated_at": dashboard.get("updated_at"), + } + + @app.post("/admin/session/dashboard", dependencies=[Depends(require_admin)]) async def save_session_dashboard(request: DashboardRequest) -> dict: """Create or update the approved dashboard for a stream session.""" @@ -396,6 +421,81 @@ async def get_twitch_status() -> dict: } +@app.get("/admin/stream/status", dependencies=[Depends(require_admin)]) +async def get_current_stream_status() -> dict: + """Get one operator view of the current Twitch stream runtime state.""" + if not orchestrator: + raise HTTPException(status_code=503, detail="Orchestrator not initialized") + + session_id = twitch_session_id + if not session_id and len(orchestrator.active_sessions) == 1: + session_id = next(iter(orchestrator.active_sessions)) + if not session_id or session_id not in orchestrator.active_sessions: + raise HTTPException(status_code=404, detail="No active stream session found") + + session_info = orchestrator.active_sessions[session_id] + latest_human_message = None + latest_hearthkeeper_action = None + dashboard = None + async for db_session in get_db_session(): + repo = Repository(db_session) + dashboard = await repo.get_dashboard(session_id) + latest_human_message = await repo.get_latest_human_message(session_id) + latest_hearthkeeper_action = await repo.get_latest_action( + session_id=session_id, + action_type=AgentActionType.RESPONSE, + mode="hearthkeeper", + ) + + dashboard_data = serialize_dashboard(dashboard) + runtime = orchestrator.get_hearthkeeper_runtime_status(session_id) + twitch_status = twitch_chat_client.status() if twitch_chat_client else {} + + return { + "session": { + "id": session_id, + "channel": session_info["channel_name"], + "started_at": session_info["started_at"].isoformat(), + "uptime_seconds": ( + datetime.utcnow() - session_info["started_at"] + ).total_seconds(), + "message_count": session_info["message_count"], + "theme": session_info.get("theme"), + }, + "twitch": { + "configured": twitch_configured(), + "running": bool(twitch_chat_task and not twitch_chat_task.done()), + "session_id": twitch_session_id, + **twitch_status, + }, + "dashboard": dashboard_status(dashboard_data), + "loop": { + "running": bool(agent_loop_task and not agent_loop_task.done()), + "interval_seconds": orchestrator.loop_interval_seconds, + "active_session_count": len(orchestrator.active_sessions), + }, + "hearthkeeper": { + **runtime, + "last_human_chat_at": ( + latest_human_message.timestamp.isoformat() + if latest_human_message + else None + ), + "last_posted_at": ( + latest_hearthkeeper_action.timestamp.isoformat() + if latest_hearthkeeper_action + else None + ), + "last_posted_message": ( + latest_hearthkeeper_action.description + if latest_hearthkeeper_action + else None + ), + }, + "timestamp": datetime.utcnow().isoformat(), + } + + @app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)]) async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict: """Set how frequently the background agent loop runs.""" diff --git a/app/memory/repository.py b/app/memory/repository.py index e7752c2..d180103 100644 --- a/app/memory/repository.py +++ b/app/memory/repository.py @@ -205,6 +205,11 @@ class Repository: result = await self.session.execute(stmt) return list(result.scalars().all()) + async def get_latest_human_message(self, session_id: str) -> ChatMessage | None: + """Get the most recent non-bot chat message from a session.""" + messages = await self.get_recent_human_messages(session_id=session_id, limit=1) + return messages[0] if messages else None + async def count_messages(self, session_id: str) -> int: """Count chat messages stored for a session.""" stmt = select(func.count()).select_from(ChatMessage).where( @@ -280,6 +285,28 @@ class Repository: result = await self.session.execute(stmt) return list(result.scalars().all()) + async def get_latest_action( + self, + session_id: str, + action_type: AgentActionType | None = None, + mode: str | None = None, + ) -> AgentAction | None: + """Get the most recent action for a session, optionally filtered.""" + conditions = [AgentAction.session_id == session_id] + if action_type: + conditions.append(AgentAction.action_type == action_type) + if mode: + conditions.append(AgentAction.mode == mode) + + stmt = ( + select(AgentAction) + .where(*conditions) + .order_by(AgentAction.timestamp.desc()) + .limit(1) + ) + result = await self.session.execute(stmt) + return result.scalars().first() + # Clip Candidate operations async def add_clip_candidate(