337 lines
11 KiB
Python
337 lines
11 KiB
Python
"""FastAPI main application."""
|
|
|
|
import asyncio
|
|
import secrets
|
|
from contextlib import suppress
|
|
from pydantic import BaseModel, Field
|
|
from fastapi import Depends, FastAPI, Form, Header, HTTPException
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
from app.config import settings
|
|
from app.agent.orchestrator import AgentOrchestrator
|
|
from app.memory.database import init_db
|
|
from app.memory.database import get_session as get_db_session
|
|
from app.memory.repository import Repository
|
|
from app.exports.markdown import MarkdownExporter
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The ASGI application object. The route and lifecycle decorators further
# down in this module register their handlers against it.
app = FastAPI(
    title=settings.APP_NAME,
    description="AI stream assistant for monitoring and guiding Twitch chat",
    version="0.1.0",
)
|
|
|
|
|
|
class DashboardRequest(BaseModel):
    """Request body for saving a stream dashboard.

    ``session_id`` and ``raw_markdown`` are required. Every other field is
    optional and defaults to ``None``, except ``session_goals``, which
    defaults to a new empty list per instance.
    """

    # Required: the session the dashboard belongs to and its markdown source.
    session_id: str
    raw_markdown: str

    # Optional structured fields accompanying the markdown.
    stream_title: str | None = None
    game: str | None = None
    mood: str | None = None
    go_live_notification: str | None = None
    social_post: str | None = None
    # default_factory avoids sharing one list object between instances.
    session_goals: list[str] = Field(default_factory=list)
    content_angle: str | None = None
|
|
|
|
# Global orchestrator instance. Remains None until startup_event() assigns it;
# the endpoints that need it answer 503 while it is unset.
orchestrator: AgentOrchestrator | None = None
# Handle to the background agent_loop() task: created in startup_event() and
# cancelled in shutdown_event().
agent_loop_task: asyncio.Task | None = None
|
|
|
|
|
|
async def require_admin(
    admin_token: str | None = Header(default=None, alias="X-Admin-Token"),
) -> None:
    """Require the configured admin token for mutable/control endpoints.

    Args:
        admin_token: Value of the ``X-Admin-Token`` request header, or
            ``None`` when the header is absent.

    Raises:
        HTTPException: 503 when ``settings.ADMIN_API_KEY`` is empty/unset;
            401 when the header is missing, empty, or does not match.
    """
    expected_key = settings.ADMIN_API_KEY
    if not expected_key:
        raise HTTPException(status_code=503, detail="Admin API key is not configured")

    # compare_digest() only accepts str operands when they are pure ASCII and
    # raises TypeError otherwise. Comparing the UTF-8 bytes keeps the check
    # constant-time while turning a non-ASCII token into a normal 401 rather
    # than an unhandled exception.
    if not admin_token or not secrets.compare_digest(
        admin_token.encode("utf-8"),
        expected_key.encode("utf-8"),
    ):
        raise HTTPException(status_code=401, detail="Invalid admin token")
|
|
|
|
|
|
async def agent_loop() -> None:
    """Run periodic time-based agent behavior for active sessions.

    Returns immediately when no orchestrator has been created. Otherwise it
    loops forever: awaits ``orchestrator.tick()``, logs any truthy result,
    then sleeps for ``orchestrator.loop_interval_seconds``. The interval is
    re-read on every iteration, so a changed value applies from the next
    sleep onward.

    Raises:
        asyncio.CancelledError: Propagated so the task can be cancelled.
    """
    if not orchestrator:
        return

    while True:
        try:
            results = await orchestrator.tick()
            if results:
                logger.info("Agent loop actions: %s", results)
        except asyncio.CancelledError:
            # Never swallow cancellation; shutdown relies on it.
            raise
        except Exception as e:
            # A failed tick must not kill the loop. logger.exception keeps
            # the traceback, which a plain error message would discard.
            logger.exception("Agent loop tick failed: %s", e)

        await asyncio.sleep(orchestrator.loop_interval_seconds)
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Prepare the database, the orchestrator and the background loop.

    Assigns the module-level ``orchestrator`` and ``agent_loop_task``. Any
    exception raised along the way is logged and then re-raised.
    """
    global orchestrator, agent_loop_task
    try:
        await init_db()

        interval = settings.AGENT_LOOP_INTERVAL_SECONDS
        orchestrator = AgentOrchestrator(loop_interval_seconds=interval)
        await orchestrator.restore_active_sessions()

        # Start the periodic loop only after sessions have been restored.
        agent_loop_task = asyncio.create_task(agent_loop())
        logger.info("Application started successfully")
    except Exception as startup_error:
        logger.error(f"Failed to start application: {startup_error}")
        raise
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Cancel the background agent loop task, if any, and log the shutdown."""
    # NOTE(review): indentation was lost in the extracted source; the final
    # log call is assumed to run regardless of whether a task exists - confirm.
    background_task = agent_loop_task
    if background_task:
        background_task.cancel()
        try:
            # Wait for the task to finish unwinding after cancellation.
            await background_task
        except asyncio.CancelledError:
            pass
    logger.info("Application shutting down")
|
|
|
|
|
|
@app.get("/health")
async def health_check() -> dict:
    """Report a static "healthy" status with app name, environment and time."""
    report: dict = {"status": "healthy"}
    report["app"] = settings.APP_NAME
    report["environment"] = settings.APP_ENV
    report["timestamp"] = datetime.utcnow().isoformat()
    return report
|
|
|
|
|
|
@app.post("/admin/session/start", dependencies=[Depends(require_admin)])
async def start_session(channel_name: str = Form(...)) -> dict:
    """Ask the orchestrator to start a session for ``channel_name``.

    Raises:
        HTTPException: 503 when the orchestrator has not been created.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    new_session_id = await orchestrator.start_session(channel_name)
    started_at = datetime.utcnow().isoformat()
    return {
        "status": "session_started",
        "session_id": new_session_id,
        "channel": channel_name,
        "timestamp": started_at,
    }
|
|
|
|
|
|
@app.post("/admin/session/end", dependencies=[Depends(require_admin)])
async def end_session(session_id: str = Form(...)) -> dict:
    """Ask the orchestrator to end the session identified by ``session_id``.

    Raises:
        HTTPException: 503 when the orchestrator has not been created.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    await orchestrator.end_session(session_id)

    ended_at = datetime.utcnow().isoformat()
    return {
        "status": "session_ended",
        "session_id": session_id,
        "timestamp": ended_at,
    }
|
|
|
|
|
|
@app.post("/admin/test-message", dependencies=[Depends(require_admin)])
async def test_message(
    session_id: str = Form(...),
    message: str = Form(...),
    username: str = Form("test_user"),
) -> dict:
    """Feed one chat message to the orchestrator and return its reply.

    Raises:
        HTTPException: 503 when the orchestrator has not been created.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    outcome = await orchestrator.handle_chat_message(
        session_id=session_id,
        username=username,
        message=message,
    )

    reply = outcome.get("agent_response")
    # Missing "actions_taken" is reported as an empty list.
    actions = outcome.get("actions_taken", [])
    return {
        "status": "message_processed",
        "agent_response": reply,
        "actions_taken": actions,
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
@app.post("/admin/test-agent-response", dependencies=[Depends(require_admin)])
async def test_agent_response(
    session_id: str = Form(...),
    message: str = Form(...),
    mode: str = Form("admin"),
) -> dict:
    """Emit a test agent response via ``orchestrator.emit_agent_response``.

    Raises:
        HTTPException: 503 when the orchestrator has not been created; 404
            when the delivery result has no truthy ``"sent"`` value, with
            the result's ``"reason"`` (default ``"send_failed"``) as detail.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    delivery_result = await orchestrator.emit_agent_response(
        session_id=session_id,
        message=message,
        mode=mode,
    )

    was_sent = delivery_result.get("sent")
    if not was_sent:
        failure_reason = delivery_result.get("reason", "send_failed")
        raise HTTPException(status_code=404, detail=failure_reason)

    return {
        "status": "agent_response_emitted",
        "delivery": delivery_result,
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
@app.post("/admin/test-loop-inactivity", dependencies=[Depends(require_admin)])
async def test_loop_inactivity(
    session_id: str = Form(...),
    inactive_minutes: int = Form(16),
) -> dict:
    """Run the orchestrator's Hearthkeeper loop test and report pass/fail.

    Raises:
        HTTPException: 503 when the orchestrator has not been created; 404
            when the test result's ``"reason"`` is ``"session_not_found"``.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    outcome = await orchestrator.run_hearthkeeper_loop_test(
        session_id=session_id,
        inactive_minutes=inactive_minutes,
    )

    reason = outcome.get("reason")
    if reason == "session_not_found":
        raise HTTPException(status_code=404, detail="Active session not found")

    if outcome["passed"]:
        verdict = "passed"
    else:
        verdict = "failed"

    return {
        "status": verdict,
        "result": outcome,
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
def serialize_dashboard(dashboard) -> dict:
    """Convert a dashboard model to a dict via ``Repository.serialize_dashboard``."""
    serialized = Repository.serialize_dashboard(dashboard)
    return serialized
|
|
|
|
|
|
@app.post("/admin/session/dashboard", dependencies=[Depends(require_admin)])
async def save_session_dashboard(request: DashboardRequest) -> dict:
    """Upsert the dashboard for a stream session and mirror it in memory.

    Raises:
        HTTPException: 404 when the repository has no session with the
            requested id.
    """
    # NOTE(review): indentation was lost in the extracted source; everything
    # below, including the return, is assumed to be inside the loop - confirm.
    target_id = request.session_id
    async for db_session in get_db_session():
        repo = Repository(db_session)

        existing_session = await repo.get_session(target_id)
        if not existing_session:
            raise HTTPException(status_code=404, detail="Session not found")

        saved = await repo.upsert_dashboard(
            session_id=target_id,
            raw_markdown=request.raw_markdown,
            stream_title=request.stream_title,
            game=request.game,
            mood=request.mood,
            go_live_notification=request.go_live_notification,
            social_post=request.social_post,
            session_goals=request.session_goals,
            content_angle=request.content_angle,
        )

        # Keep the orchestrator's in-memory copy in step with the stored one,
        # but only for sessions it currently tracks as active.
        if orchestrator and target_id in orchestrator.active_sessions:
            live_state = orchestrator.active_sessions[target_id]
            live_state["dashboard"] = serialize_dashboard(saved)
            # The content angle wins; the stream title is the fallback theme.
            live_state["theme"] = request.content_angle or request.stream_title

        return {
            "status": "dashboard_saved",
            "dashboard": serialize_dashboard(saved),
            "timestamp": datetime.utcnow().isoformat(),
        }
|
|
|
|
|
|
@app.get("/admin/session/dashboard", dependencies=[Depends(require_admin)])
async def get_session_dashboard(session_id: str) -> dict:
    """Return the stored dashboard for ``session_id``.

    Raises:
        HTTPException: 404 when the repository has no dashboard for it.
    """
    # NOTE(review): indentation was lost in the extracted source; the return
    # is assumed to be inside the loop - confirm.
    async for db_session in get_db_session():
        repo = Repository(db_session)
        stored = await repo.get_dashboard(session_id)
        if not stored:
            raise HTTPException(status_code=404, detail="Dashboard not found")

        body = {"dashboard": serialize_dashboard(stored)}
        body["timestamp"] = datetime.utcnow().isoformat()
        return body
|
|
|
|
|
|
@app.get("/admin/ledger", dependencies=[Depends(require_admin)])
async def get_ledger(session_id: str) -> dict:
    """Export a session through ``MarkdownExporter`` and return the result.

    Raises:
        HTTPException: 503 when the orchestrator has not been created.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    ledger_text = await MarkdownExporter().export_session(session_id)

    exported_at = datetime.utcnow().isoformat()
    return {
        "session_id": session_id,
        "ledger": ledger_text,
        "timestamp": exported_at,
    }
|
|
|
|
|
|
@app.get("/admin/session/status", dependencies=[Depends(require_admin)])
async def get_session_status(session_id: str) -> dict:
    """Return the orchestrator's status for a session plus a timestamp.

    Raises:
        HTTPException: 503 when the orchestrator has not been created; 404
            when the orchestrator returns a falsy status for the session.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    session_status = await orchestrator.get_session_status(session_id)
    if not session_status:
        raise HTTPException(status_code=404, detail="Active session not found")

    # Copy the status mapping, then stamp it; the timestamp overrides any
    # "timestamp" key already present in the status.
    body = {**session_status}
    body["timestamp"] = datetime.utcnow().isoformat()
    return body
|
|
|
|
|
|
@app.get("/admin/loop/status", dependencies=[Depends(require_admin)])
async def get_loop_status() -> dict:
    """Report whether the background loop task is alive, plus loop settings.

    Raises:
        HTTPException: 503 when the orchestrator has not been created.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    if agent_loop_task and not agent_loop_task.done():
        loop_state = "running"
    else:
        loop_state = "stopped"

    # Keys returned by get_loop_status() are merged after "status", so a
    # "status" key in that mapping would replace the value computed here.
    body = {"status": loop_state, **orchestrator.get_loop_status()}
    body["timestamp"] = datetime.utcnow().isoformat()
    return body
|
|
|
|
|
|
@app.post("/admin/loop/frequency", dependencies=[Depends(require_admin)])
async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict:
    """Change the background loop interval through the orchestrator.

    Raises:
        HTTPException: 503 when the orchestrator has not been created; 400
            carrying the message of a ``ValueError`` raised by
            ``orchestrator.set_loop_interval``.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    try:
        orchestrator.set_loop_interval(interval_seconds)
    except ValueError as rejected:
        raise HTTPException(status_code=400, detail=str(rejected)) from rejected

    # Report the value the orchestrator now holds, not the raw input.
    applied_interval = orchestrator.loop_interval_seconds
    return {
        "status": "loop_frequency_updated",
        "interval_seconds": applied_interval,
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running this file directly.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
    }
    uvicorn.run(app, **server_options)
|