From c1d6032eb2d77cca281427199d37324077e0cfb7 Mon Sep 17 00:00:00 2001 From: Ken Schaefer Date: Tue, 12 May 2026 10:18:20 -0500 Subject: [PATCH] Add stream dashboard ingestion --- app/main.py | 85 ++++++++++++++++++++++++++++++++++++++++ app/memory/models.py | 18 +++++++++ app/memory/repository.py | 57 +++++++++++++++++++++++++++ 3 files changed, 160 insertions(+) diff --git a/app/main.py b/app/main.py index 4d950d9..6cfdaf9 100644 --- a/app/main.py +++ b/app/main.py @@ -1,8 +1,10 @@ """FastAPI main application.""" import asyncio +import json import secrets from contextlib import suppress +from pydantic import BaseModel, Field from fastapi import Depends, FastAPI, Form, Header, HTTPException from datetime import datetime import logging @@ -10,6 +12,8 @@ import logging from app.config import settings from app.agent.orchestrator import AgentOrchestrator from app.memory.database import init_db +from app.memory.database import get_session as get_db_session +from app.memory.repository import Repository from app.exports.markdown import MarkdownExporter logger = logging.getLogger(__name__) @@ -20,6 +24,20 @@ app = FastAPI( version="0.1.0", ) + +class DashboardRequest(BaseModel): + """Request body for saving a stream dashboard.""" + + session_id: str + raw_markdown: str + stream_title: str | None = None + game: str | None = None + mood: str | None = None + go_live_notification: str | None = None + social_post: str | None = None + session_goals: list[str] = Field(default_factory=list) + content_angle: str | None = None + # Global orchestrator instance orchestrator: AgentOrchestrator | None = None agent_loop_task: asyncio.Task | None = None @@ -190,6 +208,73 @@ async def test_loop_inactivity( } +def serialize_dashboard(dashboard) -> dict: + """Serialize a dashboard database model into an API response.""" + session_goals = [] + if dashboard.session_goals: + try: + session_goals = json.loads(dashboard.session_goals) + except json.JSONDecodeError: + session_goals = [] + + return { + "session_id": dashboard.session_id, + "raw_markdown": dashboard.raw_markdown, + "stream_title": dashboard.stream_title, + "game": dashboard.game, + "mood": dashboard.mood, + "go_live_notification": dashboard.go_live_notification, + "social_post": dashboard.social_post, + "session_goals": session_goals, + "content_angle": dashboard.content_angle, + "created_at": dashboard.created_at.isoformat(), + "updated_at": dashboard.updated_at.isoformat(), + } + + +@app.post("/admin/session/dashboard", dependencies=[Depends(require_admin)]) +async def save_session_dashboard(request: DashboardRequest) -> dict: + """Create or update the approved dashboard for a stream session.""" + async for db_session in get_db_session(): + repo = Repository(db_session) + stream_session = await repo.get_session(request.session_id) + if not stream_session: + raise HTTPException(status_code=404, detail="Session not found") + + dashboard = await repo.upsert_dashboard( + session_id=request.session_id, + raw_markdown=request.raw_markdown, + stream_title=request.stream_title, + game=request.game, + mood=request.mood, + go_live_notification=request.go_live_notification, + social_post=request.social_post, + session_goals=request.session_goals, + content_angle=request.content_angle, + ) + + return { + "status": "dashboard_saved", + "dashboard": serialize_dashboard(dashboard), + "timestamp": datetime.utcnow().isoformat(), + } + + +@app.get("/admin/session/dashboard", dependencies=[Depends(require_admin)]) +async def get_session_dashboard(session_id: str) -> dict: + """Get the approved dashboard for a stream session.""" + async for db_session in get_db_session(): + repo = Repository(db_session) + dashboard = await repo.get_dashboard(session_id) + if not dashboard: + raise HTTPException(status_code=404, detail="Dashboard not found") + + return { + "dashboard": serialize_dashboard(dashboard), + "timestamp": datetime.utcnow().isoformat(), + } + + @app.get("/admin/ledger", dependencies=[Depends(require_admin)]) async def get_ledger(session_id: str) -> dict: """Get the markdown ledger for a session.""" diff --git a/app/memory/models.py b/app/memory/models.py index e365cd4..fb71258 100644 --- a/app/memory/models.py +++ b/app/memory/models.py @@ -35,6 +35,24 @@ class ChatMessage(Base): is_moderator = Column(Boolean, default=False) +class StreamDashboard(Base): + """Stores the approved stream dashboard for a session.""" + + __tablename__ = "stream_dashboards" + + session_id = Column(String, primary_key=True) + raw_markdown = Column(Text, nullable=False) + stream_title = Column(String, nullable=True) + game = Column(String, nullable=True) + mood = Column(String, nullable=True) + go_live_notification = Column(Text, nullable=True) + social_post = Column(Text, nullable=True) + session_goals = Column(Text, nullable=True) # JSON array of strings + content_angle = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + class AgentActionType(str, Enum): """Types of actions the agent can take.""" diff --git a/app/memory/repository.py b/app/memory/repository.py index 9022e1e..37c3bf6 100644 --- a/app/memory/repository.py +++ b/app/memory/repository.py @@ -1,5 +1,6 @@ """Data access layer for database operations.""" +import json import logging import uuid from datetime import datetime @@ -8,6 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.memory.models import ( StreamSession, + StreamDashboard, ChatMessage, AgentAction, ClipCandidate, @@ -66,6 +68,61 @@ class Repository: result = await self.session.execute(stmt) return list(result.scalars().all()) + # Stream Dashboard operations + + async def upsert_dashboard( + self, + session_id: str, + raw_markdown: str, + stream_title: str | None = None, + game: str | None = None, + mood: str | None = None, + go_live_notification: str | None = None, + social_post: str | None = None, + session_goals: list[str] | None = None, + content_angle: str | None = None, + ) -> StreamDashboard: + """Create or update a stream dashboard for a session.""" + dashboard = await self.get_dashboard(session_id) + now = datetime.utcnow() + goals_json = json.dumps(session_goals or []) + + if dashboard is None: + dashboard = StreamDashboard( + session_id=session_id, + raw_markdown=raw_markdown, + stream_title=stream_title, + game=game, + mood=mood, + go_live_notification=go_live_notification, + social_post=social_post, + session_goals=goals_json, + content_angle=content_angle, + created_at=now, + updated_at=now, + ) + self.session.add(dashboard) + else: + dashboard.raw_markdown = raw_markdown + dashboard.stream_title = stream_title + dashboard.game = game + dashboard.mood = mood + dashboard.go_live_notification = go_live_notification + dashboard.social_post = social_post + dashboard.session_goals = goals_json + dashboard.content_angle = content_angle + dashboard.updated_at = now + + await self.session.commit() + logger.info(f"Saved dashboard for session {session_id}") + return dashboard + + async def get_dashboard(self, session_id: str) -> StreamDashboard | None: + """Retrieve the dashboard for a session.""" + stmt = select(StreamDashboard).where(StreamDashboard.session_id == session_id) + result = await self.session.execute(stmt) + return result.scalars().first() + # Chat Message operations async def add_chat_message(