Files
ws-sanctum-chronicler/app/main.py

264 lines
8.3 KiB
Python

"""FastAPI main application."""
import asyncio
import secrets
from contextlib import suppress
from fastapi import Depends, FastAPI, Form, Header, HTTPException
from datetime import datetime
import logging
from app.config import settings
from app.agent.orchestrator import AgentOrchestrator
from app.memory.database import init_db
from app.exports.markdown import MarkdownExporter
logger = logging.getLogger(__name__)
# ASGI application object; every route below is registered on it.
app = FastAPI(
    version="0.1.0",
    title=settings.APP_NAME,
    description="AI stream assistant for monitoring and guiding Twitch chat",
)

# Module-level runtime state. Both names stay None until startup_event()
# assigns them.
orchestrator: AgentOrchestrator | None = None
agent_loop_task: asyncio.Task | None = None
async def require_admin(
    admin_token: str | None = Header(default=None, alias="X-Admin-Token"),
) -> None:
    """Require the configured admin token for mutable/control endpoints.

    Intended to be used as a FastAPI dependency. The candidate token is read
    from the ``X-Admin-Token`` request header and compared against
    ``settings.ADMIN_API_KEY``.

    Raises:
        HTTPException: 503 when ``settings.ADMIN_API_KEY`` is empty or unset;
            401 when the header is missing, empty, or does not match the key.
    """
    expected_key = settings.ADMIN_API_KEY
    if not expected_key:
        raise HTTPException(status_code=503, detail="Admin API key is not configured")

    # compare_digest() accepts str arguments only when they are pure ASCII and
    # raises TypeError otherwise, which would turn a header containing
    # non-ASCII characters into a 500. Comparing the UTF-8 encoded bytes keeps
    # the timing-attack-resistant comparison and yields a normal 401 instead.
    # NOTE(review): assumes settings.ADMIN_API_KEY is a str -- confirm in app.config.
    if not admin_token or not secrets.compare_digest(
        admin_token.encode("utf-8"),
        expected_key.encode("utf-8"),
    ):
        raise HTTPException(status_code=401, detail="Invalid admin token")
async def agent_loop() -> None:
    """Run periodic time-based agent behavior for active sessions.

    Awaits ``orchestrator.tick()`` in an endless loop, sleeping for
    ``orchestrator.loop_interval_seconds`` between iterations. Returns
    immediately when no orchestrator has been created.

    Cancellation is re-raised so the task can be stopped from outside; any
    other exception raised by a tick is logged with its traceback and the loop
    continues with the next iteration.
    """
    if not orchestrator:
        return
    while True:
        try:
            results = await orchestrator.tick()
            if results:
                logger.info("Agent loop actions: %s", results)
        except asyncio.CancelledError:
            # Never swallow cancellation, otherwise the task could not be stopped.
            raise
        except Exception as e:
            # logger.exception records the traceback as well as the message, so
            # a failing tick can actually be diagnosed from the logs.
            logger.exception("Agent loop tick failed: %s", e)
        # The interval is re-read on every pass, so a value changed on the
        # orchestrator applies from the next sleep onwards.
        await asyncio.sleep(orchestrator.loop_interval_seconds)
@app.on_event("startup")
async def startup_event():
    """Bring up the database, the orchestrator and the background agent loop.

    Any failure during these steps is logged and re-raised to the caller.
    """
    global orchestrator, agent_loop_task

    try:
        await init_db()

        interval = settings.AGENT_LOOP_INTERVAL_SECONDS
        orchestrator = AgentOrchestrator(loop_interval_seconds=interval)
        await orchestrator.restore_active_sessions()

        # Start the loop only after the orchestrator exists: agent_loop()
        # returns immediately when the global is still unset.
        agent_loop_task = asyncio.create_task(agent_loop())
        logger.info("Application started successfully")
    except Exception as exc:
        logger.error(f"Failed to start application: {exc}")
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the background agent loop task, if any, and log the shutdown."""
    loop_task = agent_loop_task
    if loop_task:
        loop_task.cancel()
        # Wait for the task to finish; the CancelledError triggered by the
        # cancel() above is the expected outcome and is ignored.
        try:
            await loop_task
        except asyncio.CancelledError:
            pass
    logger.info("Application shutting down")
@app.get("/health")
async def health_check() -> dict:
    """Report a static health status together with app name and environment."""
    return dict(
        status="healthy",
        app=settings.APP_NAME,
        environment=settings.APP_ENV,
        timestamp=datetime.utcnow().isoformat(),
    )
@app.post("/admin/session/start", dependencies=[Depends(require_admin)])
async def start_session(channel_name: str = Form(...)) -> dict:
    """Start a new stream session for the given channel.

    Delegates to ``orchestrator.start_session`` and echoes the session id it
    returns. The route is guarded by the ``require_admin`` dependency.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    new_session_id = await orchestrator.start_session(channel_name)
    return dict(
        status="session_started",
        session_id=new_session_id,
        channel=channel_name,
        timestamp=datetime.utcnow().isoformat(),
    )
@app.post("/admin/session/end", dependencies=[Depends(require_admin)])
async def end_session(session_id: str = Form(...)) -> dict:
    """End the stream session identified by ``session_id``.

    Delegates to ``orchestrator.end_session``. The route is guarded by the
    ``require_admin`` dependency.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    await orchestrator.end_session(session_id)

    payload = {"status": "session_ended", "session_id": session_id}
    payload["timestamp"] = datetime.utcnow().isoformat()
    return payload
@app.post("/admin/test-message", dependencies=[Depends(require_admin)])
async def test_message(
    session_id: str = Form(...),
    message: str = Form(...),
    username: str = Form("test_user"),
) -> dict:
    """Feed a test chat message into the orchestrator.

    Passes the form values to ``orchestrator.handle_chat_message`` and returns
    the ``agent_response`` and ``actions_taken`` entries of its result
    (``actions_taken`` defaults to an empty list when absent).

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    outcome = await orchestrator.handle_chat_message(
        session_id=session_id, username=username, message=message
    )
    agent_reply = outcome.get("agent_response")
    actions = outcome.get("actions_taken", [])
    return {
        "status": "message_processed",
        "agent_response": agent_reply,
        "actions_taken": actions,
        "timestamp": datetime.utcnow().isoformat(),
    }
@app.post("/admin/test-agent-response", dependencies=[Depends(require_admin)])
async def test_agent_response(
    session_id: str = Form(...),
    message: str = Form(...),
    mode: str = Form("admin"),
) -> dict:
    """Emit a test agent response via ``orchestrator.emit_agent_response``.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized;
            404 when the delivery result has no truthy ``sent`` entry, using
            its ``reason`` (or ``"send_failed"``) as the detail.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    delivery = await orchestrator.emit_agent_response(
        session_id=session_id, message=message, mode=mode
    )

    was_sent = delivery.get("sent")
    if not was_sent:
        failure_reason = delivery.get("reason", "send_failed")
        raise HTTPException(status_code=404, detail=failure_reason)

    return dict(
        status="agent_response_emitted",
        delivery=delivery,
        timestamp=datetime.utcnow().isoformat(),
    )
@app.post("/admin/test-loop-inactivity", dependencies=[Depends(require_admin)])
async def test_loop_inactivity(
    session_id: str = Form(...),
    inactive_minutes: int = Form(16),
) -> dict:
    """Run the orchestrator's Hearthkeeper quiet-chat loop test for a session.

    Delegates to ``orchestrator.run_hearthkeeper_loop_test`` and reports
    ``"passed"`` or ``"failed"`` according to the ``passed`` entry of its
    result, alongside the full result.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized;
            404 when the result's ``reason`` is ``"session_not_found"``.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    result = await orchestrator.run_hearthkeeper_loop_test(
        session_id=session_id, inactive_minutes=inactive_minutes
    )

    session_missing = result.get("reason") == "session_not_found"
    if session_missing:
        raise HTTPException(status_code=404, detail="Active session not found")

    if result["passed"]:
        verdict = "passed"
    else:
        verdict = "failed"
    return {
        "status": verdict,
        "result": result,
        "timestamp": datetime.utcnow().isoformat(),
    }
@app.get("/admin/ledger", dependencies=[Depends(require_admin)])
async def get_ledger(session_id: str) -> dict:
    """Return the markdown ledger for a session.

    The ledger is whatever ``MarkdownExporter().export_session`` returns for
    ``session_id``.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    ledger_markdown = await MarkdownExporter().export_session(session_id)
    return dict(
        session_id=session_id,
        ledger=ledger_markdown,
        timestamp=datetime.utcnow().isoformat(),
    )
@app.get("/admin/session/status", dependencies=[Depends(require_admin)])
async def get_session_status(session_id: str) -> dict:
    """Return the orchestrator's status for a session, plus a timestamp.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized;
            404 when ``orchestrator.get_session_status`` returns a falsy value.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    session_status = await orchestrator.get_session_status(session_id)
    if not session_status:
        raise HTTPException(status_code=404, detail="Active session not found")

    # Copy before stamping so the orchestrator's own mapping is not mutated.
    payload = {**session_status}
    payload["timestamp"] = datetime.utcnow().isoformat()
    return payload
@app.get("/admin/loop/status", dependencies=[Depends(require_admin)])
async def get_loop_status() -> dict:
    """Report whether the background agent loop task is running.

    The response starts with a ``status`` of ``"running"`` (task exists and is
    not done) or ``"stopped"``, is then merged with
    ``orchestrator.get_loop_status()``, and finally stamped with a timestamp.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    if agent_loop_task and not agent_loop_task.done():
        task_state = "running"
    else:
        task_state = "stopped"

    # Merge order matters: keys from the orchestrator override "status", and
    # the timestamp set last overrides any "timestamp" key it supplied.
    payload = {"status": task_state}
    payload.update(orchestrator.get_loop_status())
    payload["timestamp"] = datetime.utcnow().isoformat()
    return payload
@app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)])
async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict:
    """Change how often the background agent loop runs.

    Passes ``interval_seconds`` to ``orchestrator.set_loop_interval`` and
    returns the interval the orchestrator reports afterwards.

    Raises:
        HTTPException: 503 when the orchestrator has not been initialized;
            400 when ``set_loop_interval`` raises ``ValueError``, with the
            error text as the detail.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    try:
        orchestrator.set_loop_interval(interval_seconds)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        return dict(
            status="loop_frequency_updated",
            interval_seconds=orchestrator.loop_interval_seconds,
            timestamp=datetime.utcnow().isoformat(),
        )
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")