Compare commits

...

7 Commits

8 changed files with 1003 additions and 34 deletions

View File

@@ -1,6 +1,7 @@
"""Agent Orchestrator - Routes messages and manages agent modes.""" """Agent Orchestrator - Routes messages and manages agent modes."""
import logging import logging
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta from datetime import datetime, timedelta
from app.agent.policies import ( from app.agent.policies import (
@@ -54,9 +55,27 @@ class AgentOrchestrator:
# Track active sessions # Track active sessions
self.active_sessions: dict[str, dict] = {} self.active_sessions: dict[str, dict] = {}
self.chat_interaction_gate: Callable[[str], Awaitable[bool]] | None = None
logger.info("AgentOrchestrator initialized with all modes and policies") logger.info("AgentOrchestrator initialized with all modes and policies")
def set_chat_interaction_gate(
self,
gate: Callable[[str], Awaitable[bool]] | None,
) -> None:
"""Set an async gate that must pass before the agent can post to chat."""
self.chat_interaction_gate = gate
async def can_interact_with_chat(self, channel_name: str) -> bool:
"""Return whether outbound chat interaction is currently allowed."""
if not self.chat_interaction_gate:
return True
try:
return await self.chat_interaction_gate(channel_name)
except Exception as e:
logger.warning("Chat interaction gate failed closed: %s", e)
return False
async def start_session(self, channel_name: str) -> str: async def start_session(self, channel_name: str) -> str:
""" """
Start a new stream session. Start a new stream session.
@@ -88,6 +107,47 @@ class AgentOrchestrator:
logger.info(f"Started session {session_id} for {channel_name}") logger.info(f"Started session {session_id} for {channel_name}")
return session_id return session_id
async def ensure_single_active_session_for_channel(self, channel_name: str) -> str:
"""Return one active session for a channel and end duplicate active sessions."""
normalized_channel = channel_name.lower()
matching_sessions = [
(session_id, session)
for session_id, session in self.active_sessions.items()
if session.get("channel_name", "").lower() == normalized_channel
]
if not matching_sessions:
return await self.start_session(channel_name)
keep_session_id, _ = max(
matching_sessions,
key=lambda item: item[1].get("started_at", datetime.min),
)
duplicate_session_ids = [
session_id
for session_id, _ in matching_sessions
if session_id != keep_session_id
]
if duplicate_session_ids:
async for db_session in get_session():
repo = Repository(db_session)
for session_id in duplicate_session_ids:
await repo.end_session(session_id)
for session_id in duplicate_session_ids:
self.active_sessions.pop(session_id, None)
self.chat_activity.clear_activity(session_id)
logger.warning(
"Ended %s duplicate active session(s) for channel %s; keeping %s",
len(duplicate_session_ids),
channel_name,
keep_session_id,
)
return keep_session_id
async def restore_active_sessions(self) -> int: async def restore_active_sessions(self) -> int:
"""Restore active sessions from the database after app startup.""" """Restore active sessions from the database after app startup."""
restored_count = 0 restored_count = 0
@@ -103,6 +163,11 @@ class AgentOrchestrator:
) )
message_count = await repo.count_messages(session.id) message_count = await repo.count_messages(session.id)
dashboard = await repo.get_dashboard(session.id) dashboard = await repo.get_dashboard(session.id)
last_hearthkeeper_action = await repo.get_latest_action(
session_id=session.id,
action_type=AgentActionType.RESPONSE,
mode="hearthkeeper",
)
last_activity_at = ( last_activity_at = (
recent_messages[0].timestamp if recent_messages else session.started_at recent_messages[0].timestamp if recent_messages else session.started_at
) )
@@ -113,7 +178,11 @@ class AgentOrchestrator:
"message_count": message_count, "message_count": message_count,
"theme": session.theme, "theme": session.theme,
"dashboard": Repository.serialize_dashboard(dashboard), "dashboard": Repository.serialize_dashboard(dashboard),
"last_hearthkeeper_prompt_at": None, "last_hearthkeeper_prompt_at": (
last_hearthkeeper_action.timestamp
if last_hearthkeeper_action
else None
),
} }
self.chat_activity.record_activity(session.id, occurred_at=last_activity_at) self.chat_activity.record_activity(session.id, occurred_at=last_activity_at)
restored_count += 1 restored_count += 1
@@ -237,6 +306,13 @@ class AgentOrchestrator:
return {"sent": False, "reason": "session_not_found"} return {"sent": False, "reason": "session_not_found"}
channel_name = session_info["channel_name"] channel_name = session_info["channel_name"]
if not await self.can_interact_with_chat(channel_name):
logger.info(
"Agent response suppressed because stream is not live. Session: %s",
session_id,
)
return {"sent": False, "reason": "stream_not_live"}
sent = await send_chat_message(channel_name=channel_name, message=message) sent = await send_chat_message(channel_name=channel_name, message=message)
bot_username = settings.TWITCH_BOT_USERNAME or "sanctum_chronicler" bot_username = settings.TWITCH_BOT_USERNAME or "sanctum_chronicler"
@@ -322,6 +398,38 @@ class AgentOrchestrator:
"third_tick_after_interval": third_tick, "third_tick_after_interval": third_tick,
} }
async def preview_hearthkeeper_prompt(self, session_id: str) -> dict:
"""Generate a Hearthkeeper prompt preview without sending or recording it."""
session_info = self.active_sessions.get(session_id)
if not session_info:
return {"generated": False, "reason": "session_not_found"}
recent_discussion_messages = []
async for db_session in get_session():
repo = Repository(db_session)
recent_discussion_messages = await repo.get_recent_human_messages(
session_id=session_id,
limit=5,
)
recent_discussion = [
message.content for message in recent_discussion_messages[:5]
]
agent_response = await self.hearthkeeper.generate_prompt(
theme=session_info.get("theme"),
dashboard=session_info.get("dashboard"),
recent_discussion=recent_discussion,
)
return {
"generated": True,
"session_id": session_id,
"agent_response": agent_response,
"theme": session_info.get("theme"),
"dashboard": session_info.get("dashboard"),
"recent_discussion": recent_discussion,
}
async def _count_response_actions(self, session_id: str, mode: str) -> int: async def _count_response_actions(self, session_id: str, mode: str) -> int:
"""Count response actions for a mode in a session.""" """Count response actions for a mode in a session."""
async for db_session in get_session(): async for db_session in get_session():
@@ -351,6 +459,61 @@ class AgentOrchestrator:
"active_session_count": len(self.active_sessions), "active_session_count": len(self.active_sessions),
} }
def get_hearthkeeper_runtime_status(self, session_id: str) -> dict:
"""Get Hearthkeeper timing status for an active session."""
session_info = self.active_sessions.get(session_id)
if not session_info:
return {}
now = datetime.utcnow()
last_activity_at = self.chat_activity.last_activity_at(session_id)
last_prompt_at = session_info.get("last_hearthkeeper_prompt_at")
next_from_activity = None
if last_activity_at:
next_from_activity = last_activity_at + self.chat_activity.inactivity_threshold
next_from_prompt = None
if last_prompt_at:
next_from_prompt = last_prompt_at + self.hearthkeeper_prompt_interval
next_eligible_at = max(
[candidate for candidate in (next_from_activity, next_from_prompt) if candidate],
default=None,
)
seconds_until_next_prompt = None
if next_eligible_at:
seconds_until_next_prompt = max(
0,
int((next_eligible_at - now).total_seconds()),
)
quiet_enough = bool(
last_activity_at
and now - last_activity_at >= self.chat_activity.inactivity_threshold
)
prompt_interval_elapsed = bool(
not last_prompt_at
or now - last_prompt_at >= self.hearthkeeper_prompt_interval
)
return {
"last_activity_at": last_activity_at.isoformat() if last_activity_at else None,
"last_hearthkeeper_prompt_at": (
last_prompt_at.isoformat() if last_prompt_at else None
),
"next_eligible_prompt_at": (
next_eligible_at.isoformat() if next_eligible_at else None
),
"seconds_until_next_prompt": seconds_until_next_prompt,
"inactivity_threshold_minutes": int(
self.chat_activity.inactivity_threshold.total_seconds() / 60
),
"prompt_interval_minutes": int(
self.hearthkeeper_prompt_interval.total_seconds() / 60
),
"can_prompt_now": quiet_enough and prompt_interval_elapsed,
}
async def tick(self) -> list[dict]: async def tick(self) -> list[dict]:
"""Evaluate active sessions for time-based agent behavior.""" """Evaluate active sessions for time-based agent behavior."""
results = [] results = []
@@ -390,6 +553,9 @@ class AgentOrchestrator:
if not self.chat_activity.should_hearthkeeper_prompt(session_id): if not self.chat_activity.should_hearthkeeper_prompt(session_id):
return None return None
if not await self.can_interact_with_chat(session_info["channel_name"]):
return None
last_activity_at = self.chat_activity.last_activity_at(session_id) last_activity_at = self.chat_activity.last_activity_at(session_id)
last_prompt_at = session_info.get("last_hearthkeeper_prompt_at") last_prompt_at = session_info.get("last_hearthkeeper_prompt_at")
if last_activity_at and last_prompt_at and last_activity_at > last_prompt_at: if last_activity_at and last_prompt_at and last_activity_at > last_prompt_at:

View File

@@ -27,6 +27,10 @@ class ChatActivityPolicy:
"""Get the most recent chat activity time for a session.""" """Get the most recent chat activity time for a session."""
return self.last_message_time.get(session_id) return self.last_message_time.get(session_id)
def clear_activity(self, session_id: str) -> None:
"""Stop tracking activity for an ended session."""
self.last_message_time.pop(session_id, None)
def minutes_since_activity(self, session_id: str) -> int: def minutes_since_activity(self, session_id: str) -> int:
"""Get minutes since last chat message.""" """Get minutes since last chat message."""
if session_id not in self.last_message_time: if session_id not in self.last_message_time:

View File

@@ -46,6 +46,11 @@ class Settings(BaseSettings):
TWITCH_CLIENT_SECRET: Optional[str] = None TWITCH_CLIENT_SECRET: Optional[str] = None
TWITCH_BOT_USERNAME: Optional[str] = None TWITCH_BOT_USERNAME: Optional[str] = None
TWITCH_CHANNEL_NAME: Optional[str] = None TWITCH_CHANNEL_NAME: Optional[str] = None
TWITCH_ACCESS_TOKEN: Optional[str] = None
TWITCH_CHAT_ENABLED: bool = True
TWITCH_REQUIRE_LIVE_STREAM: bool = True
TWITCH_LIVE_STATUS_CACHE_SECONDS: int = 60
TWITCH_OFFLINE_CHAT_TEST_MODE: bool = False
# LLM # LLM
LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None

View File

@@ -12,8 +12,11 @@ from app.config import settings
from app.agent.orchestrator import AgentOrchestrator from app.agent.orchestrator import AgentOrchestrator
from app.memory.database import init_db from app.memory.database import init_db
from app.memory.database import get_session as get_db_session from app.memory.database import get_session as get_db_session
from app.memory.models import AgentActionType
from app.memory.repository import Repository from app.memory.repository import Repository
from app.exports.markdown import MarkdownExporter from app.exports.markdown import MarkdownExporter
from app.twitch.chat import TwitchChatMessage, TwitchIRCClient, set_active_client
from app.twitch.live import TwitchLiveStatusService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,7 +30,7 @@ app = FastAPI(
class DashboardRequest(BaseModel): class DashboardRequest(BaseModel):
"""Request body for saving a stream dashboard.""" """Request body for saving a stream dashboard."""
session_id: str session_id: str | None = None
raw_markdown: str raw_markdown: str
stream_title: str | None = None stream_title: str | None = None
game: str | None = None game: str | None = None
@@ -40,6 +43,11 @@ class DashboardRequest(BaseModel):
# Global orchestrator instance # Global orchestrator instance
orchestrator: AgentOrchestrator | None = None orchestrator: AgentOrchestrator | None = None
agent_loop_task: asyncio.Task | None = None agent_loop_task: asyncio.Task | None = None
twitch_chat_client: TwitchIRCClient | None = None
twitch_chat_task: asyncio.Task | None = None
twitch_session_id: str | None = None
twitch_live_status: TwitchLiveStatusService | None = None
offline_chat_test_mode: bool = settings.TWITCH_OFFLINE_CHAT_TEST_MODE
async def require_admin( async def require_admin(
@@ -73,16 +81,92 @@ async def agent_loop() -> None:
await asyncio.sleep(orchestrator.loop_interval_seconds) await asyncio.sleep(orchestrator.loop_interval_seconds)
def twitch_configured() -> bool:
"""Return whether Twitch chat has enough runtime configuration to start."""
return bool(
settings.TWITCH_CHAT_ENABLED
and settings.TWITCH_CHANNEL_NAME
and settings.TWITCH_BOT_USERNAME
and settings.TWITCH_ACCESS_TOKEN
)
async def channel_is_live_for_chat(channel_name: str) -> bool:
"""Return whether the agent may interact with chat for a channel."""
if offline_chat_test_mode:
return True
if not settings.TWITCH_REQUIRE_LIVE_STREAM:
return True
if not twitch_live_status:
return False
status = await twitch_live_status.get_status(channel_name)
return status.is_live
async def get_or_create_twitch_session(channel_name: str) -> str:
"""Use an active session for the Twitch channel, or create one."""
if not orchestrator:
raise RuntimeError("Orchestrator not initialized")
return await orchestrator.ensure_single_active_session_for_channel(channel_name)
async def handle_twitch_chat_message(message: TwitchChatMessage) -> None:
"""Route a Twitch chat message into the orchestrator."""
if not orchestrator or not twitch_session_id:
return
if settings.TWITCH_BOT_USERNAME and (
message.username.lower() == settings.TWITCH_BOT_USERNAME.lower()
):
return
await orchestrator.handle_chat_message(
session_id=twitch_session_id,
username=message.display_name or message.username,
message=message.content,
)
async def start_twitch_chat() -> None:
"""Start Twitch chat monitoring when configured."""
global twitch_chat_client, twitch_chat_task, twitch_session_id
if not orchestrator or not twitch_configured():
logger.info("Twitch chat listener not started; configuration is incomplete")
return
twitch_session_id = await get_or_create_twitch_session(settings.TWITCH_CHANNEL_NAME)
twitch_chat_client = TwitchIRCClient(
channel_name=settings.TWITCH_CHANNEL_NAME,
bot_username=settings.TWITCH_BOT_USERNAME,
access_token=settings.TWITCH_ACCESS_TOKEN,
on_message=handle_twitch_chat_message,
)
set_active_client(twitch_chat_client)
twitch_chat_task = asyncio.create_task(twitch_chat_client.run())
logger.info(
"Twitch chat listener starting for %s on session %s",
settings.TWITCH_CHANNEL_NAME,
twitch_session_id,
)
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
"""Initialize database and services on startup.""" """Initialize database and services on startup."""
global orchestrator, agent_loop_task global orchestrator, agent_loop_task, twitch_live_status
try: try:
await init_db() await init_db()
twitch_live_status = TwitchLiveStatusService(
cache_seconds=settings.TWITCH_LIVE_STATUS_CACHE_SECONDS
)
orchestrator = AgentOrchestrator( orchestrator = AgentOrchestrator(
loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS
) )
orchestrator.set_chat_interaction_gate(channel_is_live_for_chat)
await orchestrator.restore_active_sessions() await orchestrator.restore_active_sessions()
await start_twitch_chat()
agent_loop_task = asyncio.create_task(agent_loop()) agent_loop_task = asyncio.create_task(agent_loop())
logger.info("Application started successfully") logger.info("Application started successfully")
except Exception as e: except Exception as e:
@@ -93,6 +177,17 @@ async def startup_event():
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown_event(): async def shutdown_event():
"""Clean up resources on shutdown.""" """Clean up resources on shutdown."""
global twitch_chat_client, twitch_live_status
if twitch_chat_task:
twitch_chat_task.cancel()
with suppress(asyncio.CancelledError):
await twitch_chat_task
if twitch_chat_client:
await twitch_chat_client.disconnect()
set_active_client(None)
twitch_chat_client = None
twitch_live_status = None
if agent_loop_task: if agent_loop_task:
agent_loop_task.cancel() agent_loop_task.cancel()
with suppress(asyncio.CancelledError): with suppress(asyncio.CancelledError):
@@ -207,22 +302,91 @@ async def test_loop_inactivity(
} }
@app.post("/admin/hearthkeeper/preview", dependencies=[Depends(require_admin)])
async def preview_hearthkeeper_prompt(session_id: str | None = Form(None)) -> dict:
"""Generate a Hearthkeeper prompt preview without posting to Twitch."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
resolved_session_id = session_id or get_current_stream_session_id()
if not resolved_session_id:
raise HTTPException(status_code=404, detail="No active stream session found")
result = await orchestrator.preview_hearthkeeper_prompt(resolved_session_id)
if result.get("reason") == "session_not_found":
raise HTTPException(status_code=404, detail="Active session not found")
live_status = None
session_info = orchestrator.active_sessions.get(resolved_session_id)
if twitch_live_status and session_info:
live_status = (
await twitch_live_status.get_status(session_info["channel_name"])
).to_dict()
return {
"status": "preview_generated" if result.get("generated") else "failed",
"preview": result,
"would_post_now": await orchestrator.can_interact_with_chat(
session_info["channel_name"]
) if session_info else False,
"live_status": live_status,
"timestamp": datetime.utcnow().isoformat(),
}
def serialize_dashboard(dashboard) -> dict: def serialize_dashboard(dashboard) -> dict:
"""Serialize a dashboard database model into an API response.""" """Serialize a dashboard database model into an API response."""
return Repository.serialize_dashboard(dashboard) return Repository.serialize_dashboard(dashboard)
@app.post("/admin/session/dashboard", dependencies=[Depends(require_admin)]) def dashboard_status(dashboard: dict | None) -> dict:
async def save_session_dashboard(request: DashboardRequest) -> dict: """Return a compact dashboard status for operator checks."""
"""Create or update the approved dashboard for a stream session.""" if not dashboard:
return {
"present": False,
"stream_title": None,
"game": None,
"mood": None,
"content_angle": None,
"session_goal_count": 0,
"updated_at": None,
}
return {
"present": True,
"stream_title": dashboard.get("stream_title"),
"game": dashboard.get("game"),
"mood": dashboard.get("mood"),
"content_angle": dashboard.get("content_angle"),
"session_goal_count": len(dashboard.get("session_goals") or []),
"updated_at": dashboard.get("updated_at"),
}
def get_current_stream_session_id() -> str | None:
"""Resolve the session currently bound to Twitch or the only active session."""
if not orchestrator:
return None
if twitch_session_id and twitch_session_id in orchestrator.active_sessions:
return twitch_session_id
if len(orchestrator.active_sessions) == 1:
return next(iter(orchestrator.active_sessions))
return None
async def save_dashboard_for_session(
session_id: str,
request: DashboardRequest,
) -> dict:
"""Persist a dashboard and refresh live orchestrator context."""
async for db_session in get_db_session(): async for db_session in get_db_session():
repo = Repository(db_session) repo = Repository(db_session)
stream_session = await repo.get_session(request.session_id) stream_session = await repo.get_session(session_id)
if not stream_session: if not stream_session:
raise HTTPException(status_code=404, detail="Session not found") raise HTTPException(status_code=404, detail="Session not found")
dashboard = await repo.upsert_dashboard( dashboard = await repo.upsert_dashboard(
session_id=request.session_id, session_id=session_id,
raw_markdown=request.raw_markdown, raw_markdown=request.raw_markdown,
stream_title=request.stream_title, stream_title=request.stream_title,
game=request.game, game=request.game,
@@ -233,17 +397,42 @@ async def save_session_dashboard(request: DashboardRequest) -> dict:
content_angle=request.content_angle, content_angle=request.content_angle,
) )
if orchestrator and request.session_id in orchestrator.active_sessions: dashboard_data = serialize_dashboard(dashboard)
orchestrator.active_sessions[request.session_id]["dashboard"] = ( if orchestrator and session_id in orchestrator.active_sessions:
serialize_dashboard(dashboard) orchestrator.active_sessions[session_id]["dashboard"] = dashboard_data
) orchestrator.active_sessions[session_id]["theme"] = (
orchestrator.active_sessions[request.session_id]["theme"] = (
request.content_angle or request.stream_title request.content_angle or request.stream_title
) )
return dashboard_data
@app.post("/admin/session/dashboard", dependencies=[Depends(require_admin)])
async def save_session_dashboard(request: DashboardRequest) -> dict:
"""Create or update the approved dashboard for a stream session."""
if not request.session_id:
raise HTTPException(status_code=400, detail="session_id is required")
dashboard = await save_dashboard_for_session(request.session_id, request)
return { return {
"status": "dashboard_saved", "status": "dashboard_saved",
"dashboard": serialize_dashboard(dashboard), "dashboard": dashboard,
"timestamp": datetime.utcnow().isoformat(),
}
@app.post("/admin/stream/dashboard", dependencies=[Depends(require_admin)])
async def save_current_stream_dashboard(request: DashboardRequest) -> dict:
"""Create or update the approved dashboard for the current stream session."""
session_id = get_current_stream_session_id()
if not session_id:
raise HTTPException(status_code=404, detail="No active stream session found")
dashboard = await save_dashboard_for_session(session_id, request)
return {
"status": "dashboard_saved",
"session_id": session_id,
"dashboard": dashboard,
"timestamp": datetime.utcnow().isoformat(), "timestamp": datetime.utcnow().isoformat(),
} }
@@ -263,6 +452,26 @@ async def get_session_dashboard(session_id: str) -> dict:
} }
@app.get("/admin/stream/dashboard", dependencies=[Depends(require_admin)])
async def get_current_stream_dashboard() -> dict:
"""Get the approved dashboard for the current stream session."""
session_id = get_current_stream_session_id()
if not session_id:
raise HTTPException(status_code=404, detail="No active stream session found")
async for db_session in get_db_session():
repo = Repository(db_session)
dashboard = await repo.get_dashboard(session_id)
if not dashboard:
raise HTTPException(status_code=404, detail="Dashboard not found")
return {
"session_id": session_id,
"dashboard": serialize_dashboard(dashboard),
"timestamp": datetime.utcnow().isoformat(),
}
@app.get("/admin/ledger", dependencies=[Depends(require_admin)]) @app.get("/admin/ledger", dependencies=[Depends(require_admin)])
async def get_ledger(session_id: str) -> dict: async def get_ledger(session_id: str) -> dict:
"""Get the markdown ledger for a session.""" """Get the markdown ledger for a session."""
@@ -308,6 +517,135 @@ async def get_loop_status() -> dict:
} }
@app.get("/admin/twitch/status", dependencies=[Depends(require_admin)])
async def get_twitch_status() -> dict:
"""Get Twitch chat connection status."""
configured = twitch_configured()
client_status = twitch_chat_client.status() if twitch_chat_client else {}
live_status = None
if twitch_live_status and settings.TWITCH_CHANNEL_NAME:
live_status = (
await twitch_live_status.get_status(settings.TWITCH_CHANNEL_NAME)
).to_dict()
return {
"configured": configured,
"running": bool(twitch_chat_task and not twitch_chat_task.done()),
"require_live_stream": settings.TWITCH_REQUIRE_LIVE_STREAM,
"offline_chat_test_mode": offline_chat_test_mode,
"live_status": live_status,
"session_id": twitch_session_id,
**client_status,
"timestamp": datetime.utcnow().isoformat(),
}
@app.post("/admin/twitch/offline-test-mode", dependencies=[Depends(require_admin)])
async def set_offline_chat_test_mode(enabled: bool = Form(...)) -> dict:
"""Allow or block Twitch chat posting while the stream is offline."""
global offline_chat_test_mode
offline_chat_test_mode = enabled
return {
"status": "offline_chat_test_mode_updated",
"offline_chat_test_mode": offline_chat_test_mode,
"warning": (
"Agent may post to Twitch chat while the stream is offline"
if offline_chat_test_mode
else None
),
"timestamp": datetime.utcnow().isoformat(),
}
@app.get("/admin/stream/status", dependencies=[Depends(require_admin)])
async def get_current_stream_status() -> dict:
"""Get one operator view of the current Twitch stream runtime state."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
session_id = twitch_session_id
if not session_id and len(orchestrator.active_sessions) == 1:
session_id = next(iter(orchestrator.active_sessions))
if not session_id or session_id not in orchestrator.active_sessions:
raise HTTPException(status_code=404, detail="No active stream session found")
session_info = orchestrator.active_sessions[session_id]
latest_human_message = None
latest_hearthkeeper_action = None
dashboard = None
async for db_session in get_db_session():
repo = Repository(db_session)
dashboard = await repo.get_dashboard(session_id)
latest_human_message = await repo.get_latest_human_message(session_id)
latest_hearthkeeper_action = await repo.get_latest_action(
session_id=session_id,
action_type=AgentActionType.RESPONSE,
mode="hearthkeeper",
)
dashboard_data = serialize_dashboard(dashboard)
runtime = orchestrator.get_hearthkeeper_runtime_status(session_id)
twitch_status = twitch_chat_client.status() if twitch_chat_client else {}
live_status = None
if twitch_live_status:
live_status = (
await twitch_live_status.get_status(session_info["channel_name"])
).to_dict()
chat_interaction_allowed = (
offline_chat_test_mode
or not settings.TWITCH_REQUIRE_LIVE_STREAM
or bool(live_status and live_status["is_live"])
)
return {
"session": {
"id": session_id,
"channel": session_info["channel_name"],
"started_at": session_info["started_at"].isoformat(),
"uptime_seconds": (
datetime.utcnow() - session_info["started_at"]
).total_seconds(),
"message_count": session_info["message_count"],
"theme": session_info.get("theme"),
},
"twitch": {
"configured": twitch_configured(),
"running": bool(twitch_chat_task and not twitch_chat_task.done()),
"require_live_stream": settings.TWITCH_REQUIRE_LIVE_STREAM,
"offline_chat_test_mode": offline_chat_test_mode,
"live_status": live_status,
"session_id": twitch_session_id,
**twitch_status,
},
"dashboard": dashboard_status(dashboard_data),
"loop": {
"running": bool(agent_loop_task and not agent_loop_task.done()),
"interval_seconds": orchestrator.loop_interval_seconds,
"active_session_count": len(orchestrator.active_sessions),
},
"hearthkeeper": {
**runtime,
"chat_interaction_allowed": chat_interaction_allowed,
"last_human_chat_at": (
latest_human_message.timestamp.isoformat()
if latest_human_message
else None
),
"last_posted_at": (
latest_hearthkeeper_action.timestamp.isoformat()
if latest_hearthkeeper_action
else None
),
"last_posted_message": (
latest_hearthkeeper_action.description
if latest_hearthkeeper_action
else None
),
},
"timestamp": datetime.utcnow().isoformat(),
}
@app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)]) @app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)])
async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict: async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict:
"""Set how frequently the background agent loop runs.""" """Set how frequently the background agent loop runs."""

View File

@@ -205,6 +205,11 @@ class Repository:
result = await self.session.execute(stmt) result = await self.session.execute(stmt)
return list(result.scalars().all()) return list(result.scalars().all())
async def get_latest_human_message(self, session_id: str) -> ChatMessage | None:
"""Get the most recent non-bot chat message from a session."""
messages = await self.get_recent_human_messages(session_id=session_id, limit=1)
return messages[0] if messages else None
async def count_messages(self, session_id: str) -> int: async def count_messages(self, session_id: str) -> int:
"""Count chat messages stored for a session.""" """Count chat messages stored for a session."""
stmt = select(func.count()).select_from(ChatMessage).where( stmt = select(func.count()).select_from(ChatMessage).where(
@@ -280,6 +285,28 @@ class Repository:
result = await self.session.execute(stmt) result = await self.session.execute(stmt)
return list(result.scalars().all()) return list(result.scalars().all())
async def get_latest_action(
self,
session_id: str,
action_type: AgentActionType | None = None,
mode: str | None = None,
) -> AgentAction | None:
"""Get the most recent action for a session, optionally filtered."""
conditions = [AgentAction.session_id == session_id]
if action_type:
conditions.append(AgentAction.action_type == action_type)
if mode:
conditions.append(AgentAction.mode == mode)
stmt = (
select(AgentAction)
.where(*conditions)
.order_by(AgentAction.timestamp.desc())
.limit(1)
)
result = await self.session.execute(stmt)
return result.scalars().first()
# Clip Candidate operations # Clip Candidate operations
async def add_clip_candidate( async def add_clip_candidate(

View File

@@ -1,10 +1,242 @@
"""Twitch chat client for sending and receiving messages.""" """Twitch chat client for sending and receiving messages over IRC."""
import asyncio
import contextlib
import logging import logging
import ssl
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Optional from typing import Optional
from app.config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TWITCH_IRC_HOST = "irc.chat.twitch.tv"
TWITCH_IRC_PORT = 6697
@dataclass
class TwitchChatMessage:
"""Parsed Twitch chat message."""
username: str
display_name: str
content: str
is_moderator: bool = False
def _normalize_token(access_token: str) -> str:
"""Return a Twitch IRC PASS token with the expected oauth: prefix."""
token = access_token.strip()
if token.startswith("oauth:"):
return token
return f"oauth:{token}"
def _channel_name(channel_name: str) -> str:
"""Normalize a channel name for IRC commands."""
return channel_name.strip().lower().lstrip("#")
def _parse_tags(raw_tags: str) -> dict[str, str]:
"""Parse IRCv3 tags from a Twitch message."""
tags = {}
for raw_tag in raw_tags.split(";"):
key, _, value = raw_tag.partition("=")
tags[key] = (
value.replace(r"\s", " ")
.replace(r"\:", ";")
.replace(r"\\", "\\")
.replace(r"\r", "\r")
.replace(r"\n", "\n")
)
return tags
def parse_privmsg(line: str) -> TwitchChatMessage | None:
"""Parse a Twitch IRC PRIVMSG line into a chat message."""
tags: dict[str, str] = {}
if line.startswith("@"):
raw_tags, _, line = line.partition(" ")
tags = _parse_tags(raw_tags[1:])
if not line.startswith(":"):
return None
prefix, _, rest = line[1:].partition(" ")
command, _, params = rest.partition(" ")
if command != "PRIVMSG":
return None
_, _, content = params.partition(" :")
username = prefix.split("!", 1)[0]
display_name = tags.get("display-name") or username
badges = tags.get("badges", "")
is_moderator = tags.get("mod") == "1" or "moderator/" in badges
if not username or not content:
return None
return TwitchChatMessage(
username=username,
display_name=display_name,
content=content,
is_moderator=is_moderator,
)
class TwitchIRCClient:
"""Small Twitch IRC client for one channel."""
def __init__(
self,
channel_name: str,
bot_username: str,
access_token: str,
on_message: Callable[[TwitchChatMessage], Awaitable[None]] | None = None,
):
self.channel_name = _channel_name(channel_name)
self.bot_username = bot_username.strip().lower()
self.access_token = _normalize_token(access_token)
self.on_message = on_message
self.connected = False
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._stop_event = asyncio.Event()
self._last_error: str | None = None
@property
def last_error(self) -> str | None:
"""Return the most recent connection/listener error."""
return self._last_error
async def connect(self) -> None:
"""Open an IRC connection and join the configured channel."""
ssl_context = ssl.create_default_context()
self._reader, self._writer = await asyncio.open_connection(
TWITCH_IRC_HOST,
TWITCH_IRC_PORT,
ssl=ssl_context,
)
await self._send_raw(f"PASS {self.access_token}")
await self._send_raw(f"NICK {self.bot_username}")
await self._send_raw("CAP REQ :twitch.tv/tags twitch.tv/commands")
await self._send_raw(f"JOIN #{self.channel_name}")
await self._wait_for_login()
self.connected = True
self._last_error = None
logger.info("Connected to Twitch chat for #%s", self.channel_name)
async def disconnect(self) -> None:
"""Close the active IRC connection."""
self._stop_event.set()
self.connected = False
if self._writer:
self._writer.close()
with contextlib.suppress(Exception):
await self._writer.wait_closed()
self._reader = None
self._writer = None
async def run(self) -> None:
"""Run the chat listener with retry on connection loss."""
backoff_seconds = 5
while not self._stop_event.is_set():
try:
await self.connect()
await self._listen()
except asyncio.CancelledError:
raise
except Exception as e:
self.connected = False
self._last_error = str(e)
logger.warning("Twitch chat listener disconnected: %s", e)
finally:
await self._close_writer()
if not self._stop_event.is_set():
await asyncio.sleep(backoff_seconds)
async def send_message(self, message: str) -> bool:
"""Send a chat message to the configured channel."""
if not self.connected or not self._writer:
return False
await self._send_raw(f"PRIVMSG #{self.channel_name} :{message}")
logger.info("Sent Twitch chat message to #%s", self.channel_name)
return True
def status(self) -> dict:
"""Return connection status safe for API responses."""
return {
"configured": True,
"connected": self.connected,
"channel": self.channel_name,
"bot_username": self.bot_username,
"last_error": self._last_error,
}
async def _listen(self) -> None:
if not self._reader:
raise RuntimeError("Twitch IRC reader is not connected")
while not self._stop_event.is_set():
line = await self._read_line()
if await self._handle_control_line(line):
continue
message = parse_privmsg(line)
if message and self.on_message:
await self.on_message(message)
async def _wait_for_login(self) -> None:
"""Wait briefly for Twitch to accept or reject IRC authentication."""
while True:
line = await asyncio.wait_for(self._read_line(), timeout=10)
if await self._handle_control_line(line):
continue
if "Login authentication failed" in line:
raise PermissionError("Twitch IRC authentication failed")
if " 001 " in line or "GLOBALUSERSTATE" in line or " JOIN #" in line:
return
async def _read_line(self) -> str:
if not self._reader:
raise RuntimeError("Twitch IRC reader is not connected")
raw_line = await self._reader.readline()
if not raw_line:
raise ConnectionError("Twitch IRC connection closed")
return raw_line.decode("utf-8", errors="replace").strip()
async def _handle_control_line(self, line: str) -> bool:
if line.startswith("PING"):
await self._send_raw("PONG :tmi.twitch.tv")
return True
return False
async def _send_raw(self, command: str) -> None:
if not self._writer:
raise RuntimeError("Twitch IRC writer is not connected")
self._writer.write(f"{command}\r\n".encode("utf-8"))
await self._writer.drain()
async def _close_writer(self) -> None:
self.connected = False
if self._writer:
self._writer.close()
with contextlib.suppress(Exception):
await self._writer.wait_closed()
self._reader = None
self._writer = None
_active_client: TwitchIRCClient | None = None
def set_active_client(client: TwitchIRCClient | None) -> None:
"""Register the process-wide Twitch chat client used for outbound messages."""
global _active_client
_active_client = client
async def send_chat_message( async def send_chat_message(
channel_name: str, channel_name: str,
@@ -12,26 +244,34 @@ async def send_chat_message(
access_token: Optional[str] = None, access_token: Optional[str] = None,
) -> bool: ) -> bool:
""" """
Send a message to Twitch chat. Send a message to Twitch chat through the active IRC client when available.
Args: The fallback creates a short-lived IRC connection for admin/test paths.
channel_name: Twitch channel to send message to
message: Message content
access_token: OAuth token with chat:edit scope
Returns:
True if message sent successfully
TODO: Implement Twitch Send Chat Message API
Reference: https://dev.twitch.tv/docs/api/reference#send-chat-message
TODO: Handle rate limiting (20 messages per 30 seconds for verified bots)
TODO: Implement message queue for reliable delivery
TODO: Add retry logic with exponential backoff
""" """
logger.info(f"Sending message to {channel_name}: {message[:50]}...") if _active_client and _active_client.channel_name == _channel_name(channel_name):
# Stub implementation sent = await _active_client.send_message(message)
return True if sent:
return True
token = access_token or settings.TWITCH_ACCESS_TOKEN
bot_username = settings.TWITCH_BOT_USERNAME
if not token or not bot_username:
logger.info("Twitch chat send skipped because credentials are incomplete")
return False
client = TwitchIRCClient(
channel_name=channel_name,
bot_username=bot_username,
access_token=token,
)
try:
await client.connect()
return await client.send_message(message)
except Exception as e:
logger.warning("Failed to send Twitch chat message: %s", e)
return False
finally:
await client.disconnect()
class ChatMessageBuffer: class ChatMessageBuffer:

184
app/twitch/live.py Normal file
View File

@@ -0,0 +1,184 @@
"""Twitch live-stream status checks."""
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
import httpx
from app.config import settings
logger = logging.getLogger(__name__)
TWITCH_STREAMS_URL = "https://api.twitch.tv/helix/streams"
TWITCH_TOKEN_URL = "https://id.twitch.tv/oauth2/token"
@dataclass
class TwitchLiveStatus:
"""Cached Twitch live status for a channel."""
channel_name: str
is_live: bool
checked_at: datetime
title: str | None = None
game_name: str | None = None
started_at: str | None = None
viewer_count: int | None = None
reason: str | None = None
def to_dict(self) -> dict:
"""Return a response-safe representation."""
return {
"channel": self.channel_name,
"is_live": self.is_live,
"checked_at": self.checked_at.isoformat(),
"title": self.title,
"game_name": self.game_name,
"started_at": self.started_at,
"viewer_count": self.viewer_count,
"reason": self.reason,
}
def _bearer_token(access_token: str) -> str:
"""Normalize a Twitch token for Helix Authorization."""
token = access_token.strip()
if token.startswith("oauth:"):
return token.removeprefix("oauth:")
return token
class TwitchLiveStatusService:
"""Small cached client for Twitch Helix stream status."""
def __init__(self, cache_seconds: int = 60):
self.cache_duration = timedelta(seconds=max(1, cache_seconds))
self._cache: dict[str, TwitchLiveStatus] = {}
self._app_access_token: str | None = None
self._app_access_token_expires_at: datetime | None = None
async def get_status(
self,
channel_name: str,
force_refresh: bool = False,
) -> TwitchLiveStatus:
"""Return current live status for a channel, using a short cache."""
normalized_channel = channel_name.strip().lower().lstrip("#")
now = datetime.utcnow()
cached = self._cache.get(normalized_channel)
if (
cached
and not force_refresh
and now - cached.checked_at < self.cache_duration
):
return cached
status = await self._fetch_status(normalized_channel, now)
self._cache[normalized_channel] = status
return status
async def _fetch_status(
self,
channel_name: str,
checked_at: datetime,
) -> TwitchLiveStatus:
"""Fetch live status from Twitch Helix."""
if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_ACCESS_TOKEN:
return TwitchLiveStatus(
channel_name=channel_name,
is_live=False,
checked_at=checked_at,
reason="twitch_api_not_configured",
)
headers = {
"Client-ID": settings.TWITCH_CLIENT_ID,
"Authorization": f"Bearer {_bearer_token(settings.TWITCH_ACCESS_TOKEN)}",
}
params = {"user_login": channel_name}
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(
TWITCH_STREAMS_URL,
headers=headers,
params=params,
)
if response.status_code == 401:
app_access_token = await self._get_app_access_token(client)
if app_access_token:
headers["Authorization"] = f"Bearer {app_access_token}"
response = await client.get(
TWITCH_STREAMS_URL,
headers=headers,
params=params,
)
response.raise_for_status()
except Exception as e:
logger.warning("Failed to check Twitch live status: %s", e)
return TwitchLiveStatus(
channel_name=channel_name,
is_live=False,
checked_at=checked_at,
reason="twitch_api_error",
)
streams = response.json().get("data", [])
if not streams:
return TwitchLiveStatus(
channel_name=channel_name,
is_live=False,
checked_at=checked_at,
reason="stream_offline",
)
stream = streams[0]
return TwitchLiveStatus(
channel_name=channel_name,
is_live=True,
checked_at=checked_at,
title=stream.get("title"),
game_name=stream.get("game_name"),
started_at=stream.get("started_at"),
viewer_count=stream.get("viewer_count"),
)
async def _get_app_access_token(self, client: httpx.AsyncClient) -> str | None:
"""Get a Twitch app access token for Helix reads."""
now = datetime.utcnow()
if (
self._app_access_token
and self._app_access_token_expires_at
and now < self._app_access_token_expires_at
):
return self._app_access_token
if not settings.TWITCH_CLIENT_ID or not settings.TWITCH_CLIENT_SECRET:
return None
try:
response = await client.post(
TWITCH_TOKEN_URL,
data={
"client_id": settings.TWITCH_CLIENT_ID,
"client_secret": settings.TWITCH_CLIENT_SECRET,
"grant_type": "client_credentials",
},
)
response.raise_for_status()
except Exception as e:
logger.warning("Failed to get Twitch app access token: %s", e)
return None
payload = response.json()
access_token = payload.get("access_token")
expires_in = int(payload.get("expires_in", 0))
if not access_token:
return None
self._app_access_token = access_token
self._app_access_token_expires_at = now + timedelta(
seconds=max(60, expires_in - 60)
)
return access_token

View File

@@ -38,6 +38,11 @@ services:
TWITCH_CLIENT_SECRET: ${TWITCH_CLIENT_SECRET:-} TWITCH_CLIENT_SECRET: ${TWITCH_CLIENT_SECRET:-}
TWITCH_BOT_USERNAME: ${TWITCH_BOT_USERNAME:-} TWITCH_BOT_USERNAME: ${TWITCH_BOT_USERNAME:-}
TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-} TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-}
TWITCH_ACCESS_TOKEN: ${TWITCH_ACCESS_TOKEN:-}
TWITCH_CHAT_ENABLED: ${TWITCH_CHAT_ENABLED:-true}
TWITCH_REQUIRE_LIVE_STREAM: ${TWITCH_REQUIRE_LIVE_STREAM:-true}
TWITCH_LIVE_STATUS_CACHE_SECONDS: ${TWITCH_LIVE_STATUS_CACHE_SECONDS:-60}
TWITCH_OFFLINE_CHAT_TEST_MODE: ${TWITCH_OFFLINE_CHAT_TEST_MODE:-false}
LLM_PROVIDER: ${LLM_PROVIDER:-} LLM_PROVIDER: ${LLM_PROVIDER:-}
LLM_BASE_URL: ${LLM_BASE_URL:-} LLM_BASE_URL: ${LLM_BASE_URL:-}
LLM_API_KEY: ${LLM_API_KEY:-} LLM_API_KEY: ${LLM_API_KEY:-}