Add stream dashboard ingestion

This commit is contained in:
2026-05-12 10:18:20 -05:00
parent c65b51c61c
commit c1d6032eb2
3 changed files with 160 additions and 0 deletions

View File

@@ -1,8 +1,10 @@
"""FastAPI main application.""" """FastAPI main application."""
import asyncio import asyncio
import json
import secrets import secrets
from contextlib import suppress from contextlib import suppress
from pydantic import BaseModel, Field
from fastapi import Depends, FastAPI, Form, Header, HTTPException from fastapi import Depends, FastAPI, Form, Header, HTTPException
from datetime import datetime from datetime import datetime
import logging import logging
@@ -10,6 +12,8 @@ import logging
from app.config import settings from app.config import settings
from app.agent.orchestrator import AgentOrchestrator from app.agent.orchestrator import AgentOrchestrator
from app.memory.database import init_db from app.memory.database import init_db
from app.memory.database import get_session as get_db_session
from app.memory.repository import Repository
from app.exports.markdown import MarkdownExporter from app.exports.markdown import MarkdownExporter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -20,6 +24,20 @@ app = FastAPI(
version="0.1.0", version="0.1.0",
) )
class DashboardRequest(BaseModel):
"""Request body for saving a stream dashboard."""
session_id: str
raw_markdown: str
stream_title: str | None = None
game: str | None = None
mood: str | None = None
go_live_notification: str | None = None
social_post: str | None = None
session_goals: list[str] = Field(default_factory=list)
content_angle: str | None = None
# Global orchestrator instance # Global orchestrator instance
orchestrator: AgentOrchestrator | None = None orchestrator: AgentOrchestrator | None = None
agent_loop_task: asyncio.Task | None = None agent_loop_task: asyncio.Task | None = None
@@ -190,6 +208,73 @@ async def test_loop_inactivity(
} }
def serialize_dashboard(dashboard) -> dict:
"""Serialize a dashboard database model into an API response."""
session_goals = []
if dashboard.session_goals:
try:
session_goals = json.loads(dashboard.session_goals)
except json.JSONDecodeError:
session_goals = []
return {
"session_id": dashboard.session_id,
"raw_markdown": dashboard.raw_markdown,
"stream_title": dashboard.stream_title,
"game": dashboard.game,
"mood": dashboard.mood,
"go_live_notification": dashboard.go_live_notification,
"social_post": dashboard.social_post,
"session_goals": session_goals,
"content_angle": dashboard.content_angle,
"created_at": dashboard.created_at.isoformat(),
"updated_at": dashboard.updated_at.isoformat(),
}
@app.post("/admin/session/dashboard", dependencies=[Depends(require_admin)])
async def save_session_dashboard(request: DashboardRequest) -> dict:
"""Create or update the approved dashboard for a stream session."""
async for db_session in get_db_session():
repo = Repository(db_session)
stream_session = await repo.get_session(request.session_id)
if not stream_session:
raise HTTPException(status_code=404, detail="Session not found")
dashboard = await repo.upsert_dashboard(
session_id=request.session_id,
raw_markdown=request.raw_markdown,
stream_title=request.stream_title,
game=request.game,
mood=request.mood,
go_live_notification=request.go_live_notification,
social_post=request.social_post,
session_goals=request.session_goals,
content_angle=request.content_angle,
)
return {
"status": "dashboard_saved",
"dashboard": serialize_dashboard(dashboard),
"timestamp": datetime.utcnow().isoformat(),
}
@app.get("/admin/session/dashboard", dependencies=[Depends(require_admin)])
async def get_session_dashboard(session_id: str) -> dict:
"""Get the approved dashboard for a stream session."""
async for db_session in get_db_session():
repo = Repository(db_session)
dashboard = await repo.get_dashboard(session_id)
if not dashboard:
raise HTTPException(status_code=404, detail="Dashboard not found")
return {
"dashboard": serialize_dashboard(dashboard),
"timestamp": datetime.utcnow().isoformat(),
}
@app.get("/admin/ledger", dependencies=[Depends(require_admin)]) @app.get("/admin/ledger", dependencies=[Depends(require_admin)])
async def get_ledger(session_id: str) -> dict: async def get_ledger(session_id: str) -> dict:
"""Get the markdown ledger for a session.""" """Get the markdown ledger for a session."""

View File

@@ -35,6 +35,24 @@ class ChatMessage(Base):
is_moderator = Column(Boolean, default=False) is_moderator = Column(Boolean, default=False)
class StreamDashboard(Base):
"""Stores the approved stream dashboard for a session."""
__tablename__ = "stream_dashboards"
session_id = Column(String, primary_key=True)
raw_markdown = Column(Text, nullable=False)
stream_title = Column(String, nullable=True)
game = Column(String, nullable=True)
mood = Column(String, nullable=True)
go_live_notification = Column(Text, nullable=True)
social_post = Column(Text, nullable=True)
session_goals = Column(Text, nullable=True) # JSON array of strings
content_angle = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, nullable=False)
class AgentActionType(str, Enum): class AgentActionType(str, Enum):
"""Types of actions the agent can take.""" """Types of actions the agent can take."""

View File

@@ -1,5 +1,6 @@
"""Data access layer for database operations.""" """Data access layer for database operations."""
import json
import logging import logging
import uuid import uuid
from datetime import datetime from datetime import datetime
@@ -8,6 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.memory.models import ( from app.memory.models import (
StreamSession, StreamSession,
StreamDashboard,
ChatMessage, ChatMessage,
AgentAction, AgentAction,
ClipCandidate, ClipCandidate,
@@ -66,6 +68,61 @@ class Repository:
result = await self.session.execute(stmt) result = await self.session.execute(stmt)
return list(result.scalars().all()) return list(result.scalars().all())
# Stream Dashboard operations
async def upsert_dashboard(
self,
session_id: str,
raw_markdown: str,
stream_title: str | None = None,
game: str | None = None,
mood: str | None = None,
go_live_notification: str | None = None,
social_post: str | None = None,
session_goals: list[str] | None = None,
content_angle: str | None = None,
) -> StreamDashboard:
"""Create or update a stream dashboard for a session."""
dashboard = await self.get_dashboard(session_id)
now = datetime.utcnow()
goals_json = json.dumps(session_goals or [])
if dashboard is None:
dashboard = StreamDashboard(
session_id=session_id,
raw_markdown=raw_markdown,
stream_title=stream_title,
game=game,
mood=mood,
go_live_notification=go_live_notification,
social_post=social_post,
session_goals=goals_json,
content_angle=content_angle,
created_at=now,
updated_at=now,
)
self.session.add(dashboard)
else:
dashboard.raw_markdown = raw_markdown
dashboard.stream_title = stream_title
dashboard.game = game
dashboard.mood = mood
dashboard.go_live_notification = go_live_notification
dashboard.social_post = social_post
dashboard.session_goals = goals_json
dashboard.content_angle = content_angle
dashboard.updated_at = now
await self.session.commit()
logger.info(f"Saved dashboard for session {session_id}")
return dashboard
async def get_dashboard(self, session_id: str) -> StreamDashboard | None:
"""Retrieve the dashboard for a session."""
stmt = select(StreamDashboard).where(StreamDashboard.session_id == session_id)
result = await self.session.execute(stmt)
return result.scalars().first()
# Chat Message operations # Chat Message operations
async def add_chat_message( async def add_chat_message(