Implement runtime agent loop and container hygiene

This commit is contained in:
2026-05-12 07:56:37 -05:00
parent 412d7caec3
commit a09197e85a
13 changed files with 524 additions and 70 deletions

View File

@@ -1,9 +1,7 @@
"""Agent Orchestrator - Routes messages and manages agent modes."""
import logging
import uuid
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta
from app.agent.policies import (
ChatActivityPolicy,
@@ -16,7 +14,7 @@ from app.agent.modes.warden import WardenMode
from app.agent.modes.librarian import LibrarianMode
from app.agent.modes.scribe import ScribeMode
from app.llm.client import LLMClient
from app.memory.database import async_session_factory
from app.memory.database import get_session
from app.memory.models import AgentActionType
from app.memory.repository import Repository
@@ -32,9 +30,10 @@ class AgentOrchestrator:
and how to flag suspicious content.
"""
def __init__(self):
def __init__(self, loop_interval_seconds: float = 60.0):
"""Initialize the orchestrator and all modes."""
self.llm_client = LLMClient()
self.loop_interval_seconds = loop_interval_seconds
# Initialize modes
self.hearthkeeper = HearthkeeperMode(self.llm_client)
@@ -63,18 +62,22 @@ class AgentOrchestrator:
Returns:
Session ID
"""
session_id = str(uuid.uuid4())
async with async_session_factory() as db_session:
session_id: str | None = None
async for db_session in get_session():
repo = Repository(db_session)
await repo.create_session(channel_name)
session_id = await repo.create_session(channel_name)
if session_id is None:
raise RuntimeError("Failed to create stream session")
self.active_sessions[session_id] = {
"channel_name": channel_name,
"started_at": datetime.utcnow(),
"message_count": 0,
"theme": None,
"last_hearthkeeper_prompt_at": None,
}
self.chat_activity.record_activity(session_id)
logger.info(f"Started session {session_id} for {channel_name}")
return session_id
@@ -90,7 +93,7 @@ class AgentOrchestrator:
logger.warning(f"Session {session_id} not found")
return
async with async_session_factory() as db_session:
async for db_session in get_session():
repo = Repository(db_session)
await repo.end_session(session_id)
@@ -122,7 +125,7 @@ class AgentOrchestrator:
actions = []
agent_response = None
async with async_session_factory() as db_session:
async for db_session in get_session():
repo = Repository(db_session)
# Store the message
@@ -136,12 +139,13 @@ class AgentOrchestrator:
# Record activity
self.chat_activity.record_activity(session_id)
session_info["message_count"] += 1
session_info["last_hearthkeeper_prompt_at"] = None
# 1. Warden always analyzes (passive mode)
warden_result = await self.warden.analyze_message(message)
if warden_result["is_suspicious"]:
actions.append(f"WARDEN_FLAG: {warden_result['severity']}")
async with async_session_factory() as db_session:
async for db_session in get_session():
repo = Repository(db_session)
await repo.record_action(
session_id=session_id,
@@ -153,9 +157,12 @@ class AgentOrchestrator:
# 2. Check if we should suppress responses due to active chat
recent_messages = []
async with async_session_factory() as db_session:
async for db_session in get_session():
repo = Repository(db_session)
recent_messages = await repo.get_recent_messages(session_id, limit=10)
recent_messages = await repo.get_messages_since(
session_id=session_id,
since=datetime.utcnow() - timedelta(minutes=1),
)
if self.response_suppression.should_suppress_response(len(recent_messages)):
logger.debug("Response suppressed due to active chat")
@@ -164,25 +171,7 @@ class AgentOrchestrator:
"actions_taken": actions,
}
# 3. Hearthkeeper: Generate prompt if chat inactive
if self.chat_activity.should_hearthkeeper_prompt(session_id):
try:
agent_response = await self.hearthkeeper.generate_prompt(
theme=session_info.get("theme")
)
actions.append("HEARTHKEEPER_PROMPT")
async with async_session_factory() as db_session:
repo = Repository(db_session)
await repo.record_action(
session_id=session_id,
action_type=AgentActionType.RESPONSE,
mode="hearthkeeper",
description=agent_response,
)
except Exception as e:
logger.error(f"Error in Hearthkeeper: {e}")
# 4. Librarian: Archive important messages (passive)
# 3. Librarian: Archive important messages (passive)
if len(message) > 50: # Archive longer messages
await self.librarian.archive_message(message_id, message, username)
@@ -195,6 +184,88 @@ class AgentOrchestrator:
"actions_taken": actions,
}
def set_loop_interval(self, interval_seconds: float) -> None:
"""Update how frequently the background agent loop runs."""
if interval_seconds < 1:
raise ValueError("Loop interval must be at least 1 second")
self.loop_interval_seconds = interval_seconds
def get_loop_status(self) -> dict:
"""Get background loop configuration and current session count."""
return {
"interval_seconds": self.loop_interval_seconds,
"active_session_count": len(self.active_sessions),
}
async def tick(self) -> list[dict]:
"""Evaluate active sessions for time-based agent behavior."""
results = []
for session_id in list(self.active_sessions.keys()):
result = await self._tick_session(session_id)
if result:
results.append(result)
return results
async def _tick_session(self, session_id: str) -> dict | None:
"""Evaluate a single active session during the background loop."""
session_info = self.active_sessions.get(session_id)
if not session_info:
return None
recent_messages = []
async for db_session in get_session():
repo = Repository(db_session)
recent_messages = await repo.get_messages_since(
session_id=session_id,
since=datetime.utcnow() - timedelta(minutes=1),
)
if self.response_suppression.should_suppress_response(len(recent_messages)):
return {
"session_id": session_id,
"actions_taken": [],
"agent_response": None,
"reason": "active_chat",
}
if not self.chat_activity.should_hearthkeeper_prompt(session_id):
return None
last_activity_at = self.chat_activity.last_activity_at(session_id)
last_prompt_at = session_info.get("last_hearthkeeper_prompt_at")
if last_prompt_at and last_activity_at and last_prompt_at >= last_activity_at:
return None
try:
agent_response = await self.hearthkeeper.generate_prompt(
theme=session_info.get("theme")
)
session_info["last_hearthkeeper_prompt_at"] = datetime.utcnow()
async for db_session in get_session():
repo = Repository(db_session)
await repo.record_action(
session_id=session_id,
action_type=AgentActionType.RESPONSE,
mode="hearthkeeper",
description=agent_response,
)
return {
"session_id": session_id,
"actions_taken": ["HEARTHKEEPER_PROMPT"],
"agent_response": agent_response,
"reason": "inactive_chat",
}
except Exception as e:
logger.error(f"Error in Hearthkeeper loop: {e}")
return {
"session_id": session_id,
"actions_taken": [],
"agent_response": None,
"reason": "hearthkeeper_error",
}
async def get_session_status(self, session_id: str) -> dict:
"""Get status of a session."""
if session_id not in self.active_sessions:

View File

@@ -19,9 +19,13 @@ class ChatActivityPolicy:
self.inactivity_threshold = timedelta(minutes=inactivity_threshold_minutes)
self.last_message_time: dict[str, datetime] = {}
def record_activity(self, session_id: str) -> None:
def record_activity(self, session_id: str, occurred_at: datetime | None = None) -> None:
"""Record that chat activity occurred."""
self.last_message_time[session_id] = datetime.utcnow()
self.last_message_time[session_id] = occurred_at or datetime.utcnow()
def last_activity_at(self, session_id: str) -> datetime | None:
"""Get the most recent chat activity time for a session."""
return self.last_message_time.get(session_id)
def minutes_since_activity(self, session_id: str) -> int:
"""Get minutes since last chat message."""

View File

@@ -1,5 +1,6 @@
"""Configuration management using pydantic-settings."""
from pydantic import field_validator
from pydantic_settings import BaseSettings
from typing import Optional
@@ -12,6 +13,30 @@ class Settings(BaseSettings):
APP_ENV: str = "development"
DEBUG: bool = False
@field_validator("DEBUG", mode="before")
@classmethod
def parse_debug(cls, value: object) -> bool:
"""Parse permissive runtime DEBUG values from shell environments."""
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "t", "yes", "y", "on", "debug"}:
return True
if normalized in {
"0",
"false",
"f",
"no",
"n",
"off",
"release",
"prod",
"production",
}:
return False
return False
# Database
DATABASE_URL: str = "postgresql+asyncpg://sanctum:password@localhost:5432/sanctum"
@@ -27,6 +52,9 @@ class Settings(BaseSettings):
LLM_API_KEY: Optional[str] = None
LLM_MODEL: str = "gpt-3.5-turbo"
# Agent loop
AGENT_LOOP_INTERVAL_SECONDS: float = 60.0
# Export
EXPORT_PATH: str = "exports"

1
app/exports/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Exports module."""

126
app/exports/markdown.py Normal file
View File

@@ -0,0 +1,126 @@
"""Markdown export functionality for stream ledgers."""
import logging
from datetime import datetime
from pathlib import Path
from app.config import settings
from app.memory.database import get_session
from app.memory.repository import Repository
logger = logging.getLogger(__name__)
class MarkdownExporter:
"""Exports stream session data as markdown ledgers."""
def __init__(self, export_path: str | None = None):
"""
Initialize exporter.
Args:
export_path: Directory to export ledgers to (defaults to settings.EXPORT_PATH)
"""
self.export_path = Path(export_path or settings.EXPORT_PATH)
self.export_path.mkdir(parents=True, exist_ok=True)
async def export_session(self, session_id: str) -> str:
"""
Export a session as a markdown ledger.
Args:
session_id: Session ID to export
Returns:
Markdown content
"""
async for db_session in get_session():
repo = Repository(db_session)
session = await repo.get_session(session_id)
if not session:
logger.warning(f"Session {session_id} not found")
return ""
# Gather data
messages = await repo.get_recent_messages(session_id, limit=1000)
actions = await repo.get_session_actions(session_id)
clips = await repo.get_clip_candidates(session_id)
seeds = await repo.get_blog_seeds(session_id)
# Build markdown
date = session.started_at.strftime("%Y-%m-%d")
ledger = f"# Sanctum Ledger — {date}\n\n"
ledger += f"**Channel:** {session.channel_name}\n"
ledger += f"**Started:** {session.started_at.isoformat()}\n"
if session.ended_at:
ledger += f"**Ended:** {session.ended_at.isoformat()}\n"
ledger += "\n"
# Stream Theme
ledger += "## Stream Theme\n"
if session.theme:
ledger += f"{session.theme}\n"
else:
ledger += "*No theme recorded*\n"
ledger += "\n"
# Notable Discussion
ledger += "## Notable Discussion\n"
if messages:
for msg in messages[:20]: # Latest 20 messages
ledger += f"- **{msg.username}:** {msg.content[:100]}\n"
else:
ledger += "*No messages recorded*\n"
ledger += "\n"
# Agent Actions
ledger += "## Agent Actions\n"
if actions:
for action in actions:
ledger += f"- **{action.mode}** ({action.action_type}): {action.description}\n"
else:
ledger += "*No agent actions recorded*\n"
ledger += "\n"
# Clip Candidates
ledger += "## Clip Candidates\n"
if clips:
for clip in clips:
ledger += f"- {clip.reason}\n"
else:
ledger += "*No clip candidates identified*\n"
ledger += "\n"
# Blog Seeds
ledger += "## Blog Seeds\n"
if seeds:
for seed in seeds:
ledger += f"- **{seed.topic}:** {seed.description}\n"
else:
ledger += "*No blog seeds proposed*\n"
ledger += "\n"
logger.info(f"Generated ledger for session {session_id}")
return ledger
async def save_session_ledger(self, session_id: str) -> Path:
"""
Export session and save to file.
Args:
session_id: Session ID
Returns:
Path to saved file
"""
ledger = await self.export_session(session_id)
date = datetime.utcnow().strftime("%Y-%m-%d")
filename = f"ledger_{date}_{session_id[:8]}.md"
filepath = self.export_path / filename
filepath.write_text(ledger, encoding="utf-8")
logger.info(f"Saved ledger to {filepath}")
return filepath

View File

@@ -1,7 +1,8 @@
"""FastAPI main application."""
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import asyncio
from contextlib import suppress
from fastapi import FastAPI, HTTPException, Form
from datetime import datetime
import logging
@@ -20,15 +21,37 @@ app = FastAPI(
# Global orchestrator instance
orchestrator: AgentOrchestrator | None = None
agent_loop_task: asyncio.Task | None = None
async def agent_loop() -> None:
"""Run periodic time-based agent behavior for active sessions."""
if not orchestrator:
return
while True:
try:
results = await orchestrator.tick()
if results:
logger.info(f"Agent loop actions: {results}")
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Agent loop tick failed: {e}")
await asyncio.sleep(orchestrator.loop_interval_seconds)
@app.on_event("startup")
async def startup_event():
"""Initialize database and services on startup."""
global orchestrator
global orchestrator, agent_loop_task
try:
await init_db()
orchestrator = AgentOrchestrator()
orchestrator = AgentOrchestrator(
loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS
)
agent_loop_task = asyncio.create_task(agent_loop())
logger.info("Application started successfully")
except Exception as e:
logger.error(f"Failed to start application: {e}")
@@ -38,6 +61,10 @@ async def startup_event():
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up resources on shutdown."""
if agent_loop_task:
agent_loop_task.cancel()
with suppress(asyncio.CancelledError):
await agent_loop_task
logger.info("Application shutting down")
@@ -53,7 +80,7 @@ async def health_check() -> dict:
@app.post("/admin/session/start")
async def start_session(channel_name: str) -> dict:
async def start_session(channel_name: str = Form(...)) -> dict:
"""Start a new stream session."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
@@ -68,7 +95,7 @@ async def start_session(channel_name: str) -> dict:
@app.post("/admin/session/end")
async def end_session(session_id: str) -> dict:
async def end_session(session_id: str = Form(...)) -> dict:
"""End the current stream session."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
@@ -82,7 +109,7 @@ async def end_session(session_id: str) -> dict:
@app.post("/admin/test-message")
async def test_message(session_id: str, message: str, username: str = "test_user") -> dict:
async def test_message(session_id: str = Form(...), message: str = Form(...), username: str = Form("test_user")) -> dict:
"""Send a test message to the orchestrator."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
@@ -116,6 +143,37 @@ async def get_ledger(session_id: str) -> dict:
}
@app.get("/admin/loop/status")
async def get_loop_status() -> dict:
"""Get the background agent loop runtime configuration."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
return {
"status": "running" if agent_loop_task and not agent_loop_task.done() else "stopped",
**orchestrator.get_loop_status(),
"timestamp": datetime.utcnow().isoformat(),
}
@app.post("/admin/loop/frequency")
async def set_loop_frequency(interval_seconds: float = Form(...)) -> dict:
"""Set how frequently the background agent loop runs."""
if not orchestrator:
raise HTTPException(status_code=503, detail="Orchestrator not initialized")
try:
orchestrator.set_loop_interval(interval_seconds)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {
"status": "loop_frequency_updated",
"interval_seconds": orchestrator.loop_interval_seconds,
"timestamp": datetime.utcnow().isoformat(),
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(

View File

@@ -95,6 +95,21 @@ class Repository:
result = await self.session.execute(stmt)
return list(result.scalars().all())
async def get_messages_since(
self, session_id: str, since: datetime
) -> list[ChatMessage]:
"""Get messages recorded since a specific timestamp."""
stmt = (
select(ChatMessage)
.where(
ChatMessage.session_id == session_id,
ChatMessage.timestamp >= since,
)
.order_by(ChatMessage.timestamp.desc())
)
result = await self.session.execute(stmt)
return list(result.scalars().all())
# Agent Action operations
async def record_action(