Files
ws-sanctum-chronicler/app/main.py

127 lines
3.5 KiB
Python

"""FastAPI main application."""
import logging
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse

from app.config import settings
from app.agent.orchestrator import AgentOrchestrator
from app.memory.database import init_db
from app.exports.markdown import MarkdownExporter
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The FastAPI application object; its title is read from the project settings.
app = FastAPI(
    version="0.1.0",
    title=settings.APP_NAME,
    description="AI stream assistant for monitoring and guiding Twitch chat",
)

# Global orchestrator instance. It starts as None; the endpoints below answer
# with HTTP 503 while it is still unset.
orchestrator: AgentOrchestrator | None = None
@app.on_event("startup")
async def startup_event():
    """Initialize the database and create the global orchestrator.

    Awaits ``init_db()`` and then stores a new ``AgentOrchestrator`` in the
    module-level ``orchestrator`` variable.

    Raises:
        Exception: Whatever ``init_db()`` or ``AgentOrchestrator()`` raised.
            The failure is logged together with its traceback and then
            re-raised unchanged.
    """
    global orchestrator
    try:
        await init_db()
        orchestrator = AgentOrchestrator()
    except Exception as exc:
        # logger.exception attaches the traceback, so a startup failure can be
        # diagnosed from the log alone; the message text itself is unchanged.
        logger.exception("Failed to start application: %s", exc)
        raise
    else:
        logger.info("Application started successfully")
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the application is shutting down.

    This hook only emits an info-level log message; it does not release any
    resources itself.
    """
    logger.info("Application shutting down")
@app.get("/health")
async def health_check() -> dict:
    """Report basic liveness information.

    Returns:
        A dict with a fixed ``"healthy"`` status, the application name and
        environment taken from ``settings``, and the current UTC time as an
        ISO 8601 string with an explicit ``+00:00`` offset.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow(), whose
    # naive result carries no offset and can be misread as local time.
    now_utc = datetime.now(timezone.utc)
    return {
        "status": "healthy",
        "app": settings.APP_NAME,
        "environment": settings.APP_ENV,
        "timestamp": now_utc.isoformat(),
    }
@app.post("/admin/session/start")
async def start_session(channel_name: str) -> dict:
    """Start a new stream session for a channel.

    Args:
        channel_name: Channel name, passed unchanged to
            ``orchestrator.start_session``.

    Returns:
        A dict with the status ``"session_started"``, the session id returned
        by the orchestrator, the channel name, and the current UTC time as an
        ISO 8601 string with an explicit ``+00:00`` offset.

    Raises:
        HTTPException: 503 when the global orchestrator has not been set.
    """
    # Explicit None check: "not yet initialized" is the only state that should
    # produce a 503, independent of the orchestrator object's truthiness.
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    session_id = await orchestrator.start_session(channel_name)
    return {
        "status": "session_started",
        "session_id": session_id,
        "channel": channel_name,
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
@app.post("/admin/session/end")
async def end_session(session_id: str) -> dict:
    """End a stream session.

    Args:
        session_id: Identifier passed unchanged to
            ``orchestrator.end_session``.

    Returns:
        A dict with the status ``"session_ended"``, the given session id, and
        the current UTC time as an ISO 8601 string with an explicit ``+00:00``
        offset.

    Raises:
        HTTPException: 503 when the global orchestrator has not been set.
    """
    # Explicit None check: "not yet initialized" is the only state that should
    # produce a 503, independent of the orchestrator object's truthiness.
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    await orchestrator.end_session(session_id)
    return {
        "status": "session_ended",
        "session_id": session_id,
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
@app.post("/admin/test-message")
async def test_message(session_id: str, message: str, username: str = "test_user") -> dict:
    """Send a test chat message through the orchestrator.

    Args:
        session_id: Session the message is attributed to.
        message: Text of the chat message.
        username: Sender name; defaults to ``"test_user"``.

    Returns:
        A dict with the status ``"message_processed"``, the ``agent_response``
        and ``actions_taken`` values read from the orchestrator's response
        (``actions_taken`` falls back to an empty list), and the current UTC
        time as an ISO 8601 string with an explicit ``+00:00`` offset.

    Raises:
        HTTPException: 503 when the global orchestrator has not been set.
    """
    # Explicit None check: "not yet initialized" is the only state that should
    # produce a 503, independent of the orchestrator object's truthiness.
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    response = await orchestrator.handle_chat_message(
        session_id=session_id,
        username=username,
        message=message,
    )
    return {
        "status": "message_processed",
        "agent_response": response.get("agent_response"),
        "actions_taken": response.get("actions_taken", []),
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
@app.get("/admin/ledger")
async def get_ledger(session_id: str) -> dict:
    """Return the markdown ledger for a session.

    Args:
        session_id: Identifier passed unchanged to
            ``MarkdownExporter.export_session``.

    Returns:
        A dict with the session id, the exported ledger, and the current UTC
        time as an ISO 8601 string with an explicit ``+00:00`` offset.

    Raises:
        HTTPException: 503 when the global orchestrator has not been set.
    """
    # The orchestrator is not used below; the guard only rejects requests
    # until the startup hook has assigned it.
    if orchestrator is None:
        raise HTTPException(status_code=503, detail="Orchestrator not initialized")

    exporter = MarkdownExporter()
    ledger = await exporter.export_session(session_id)
    return {
        "session_id": session_id,
        "ledger": ledger,
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here, so it is only needed in this mode.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")