diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..04654e7 --- /dev/null +++ b/.env.example @@ -0,0 +1,24 @@ +# Application Settings +APP_NAME=Sanctum Chronicler +APP_ENV=development +DEBUG=false + +# Database +DATABASE_URL=postgresql+asyncpg://sanctum:password@localhost:5432/sanctum +DB_PASSWORD=password + +# Twitch Configuration +TWITCH_CLIENT_ID= +TWITCH_CLIENT_SECRET= +TWITCH_BOT_USERNAME= +TWITCH_CHANNEL_NAME= + +# LLM Configuration +# Supported providers: openai, ollama, lm_studio (or leave empty for mock) +LLM_PROVIDER= +LLM_BASE_URL= +LLM_API_KEY= +LLM_MODEL=gpt-3.5-turbo + +# Export Configuration +EXPORT_PATH=./exports \ No newline at end of file diff --git a/.gitignore b/.gitignore index 36b13f1..46286b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,176 +1,81 @@ -# ---> Python -# Byte-compiled / optimized / DLL files +# ========================================= +# Python +# ========================================= __pycache__/ *.py[cod] *$py.class -# C extensions -*.so +# Virtual environments +.venv/ +venv/ +env/ + +# Python tooling +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +coverage/ +htmlcov/ # Distribution / packaging -.Python build/ -develop-eggs/ dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ *.egg-info/ -.installed.cfg -*.egg -MANIFEST -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control -.pdm.toml -.pdm-python -.pdm-build/ - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments +# ========================================= +# Environment / Secrets +# ========================================= .env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ +.env.* +!.env.example -# Spyder project settings -.spyderproject -.spyproject +# ========================================= +# VSCode +# ========================================= +.vscode/ -# Rope project settings -.ropeproject +# ========================================= +# Docker +# ========================================= +docker-data/ +postgres/ +pgdata/ -# mkdocs documentation -/site +# ========================================= +# Runtime / Exports +# ========================================= +exports/ +data/ +logs/ -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json +# Markdown exports +*.ledger.md -# Pyre type checker -.pyre/ +# ========================================= +# Database files +# ========================================= +*.db +*.sqlite3 -# pytype static type analyzer -.pytype/ +# ========================================= +# Local AI / Model Artifacts +# ========================================= +models/ +cache/ +tmp/ -# Cython debug symbols -cython_debug/ +# ========================================= +# OS Junk +# ========================================= +.DS_Store +Thumbs.db -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc +# ========================================= +# JetBrains +# ========================================= +.idea/ +# ========================================= +# Misc +# ========================================= +*.log \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..cd09392 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,48 @@ +# Build stage +FROM python:3.12-slim as builder + +WORKDIR /tmp + +# Install poetry +RUN pip install --no-cache-dir poetry + +# Copy dependency file +COPY requirements.txt . + +# Generate wheels +RUN pip wheel --no-cache-dir --no-deps --wheel-dir /tmp/wheels -r requirements.txt + + +# Runtime stage +FROM python:3.12-slim + +WORKDIR /app + +# Install runtime dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + postgresql-client \ + && rm -rf /var/lib/apt/lists/* + +# Copy wheels from builder +COPY --from=builder /tmp/wheels /wheels +COPY --from=builder /tmp/requirements.txt . + +# Install Python packages +RUN pip install --no-cache /wheels/* + +# Copy application code +COPY ./app /app/app + +# Create non-root user +RUN useradd -m -u 1000 sanctum && chown -R sanctum:sanctum /app +USER sanctum + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ + CMD python -c "import httpx; httpx.get('http://localhost:8000/health')" + +# Run application +CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/README.md b/README.md index 96f8478..0632cfb 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,402 @@ -# ws-sanctum-chronicler +# The Sanctum Chronicler + +A Dockerized Python MVP for an AI stream assistant that monitors Twitch chat, gently guides conversation, stores stream events, and exports a post-stream markdown ledger. + +## Overview + +**The Sanctum Chronicler** is an intelligent assistant designed to enhance live streaming by: +- Monitoring real-time chat and stream activity +- Maintaining a warm, non-intrusive presence during streams +- Flagging suspicious content and spam patterns +- Archiving discussion highlights and clip candidates +- Generating post-stream ledgers and blog ideas + +The system uses a multi-mode agent architecture, where different "personas" handle different aspects of stream management: +- **Hearthkeeper** - Gently prompts chat when it's quiet +- **Steward** - Responds thoughtfully to engagement +- **Warden** - Detects suspicious content and spam +- **Librarian** - Archives important discussion +- **Scribe** - Compiles post-stream ledgers + +## Architecture + +### Project Structure + +``` +sanctum-agent/ +├── app/ +│ ├── main.py # FastAPI application +│ ├── config.py # Configuration (pydantic-settings) +│ ├── twitch/ +│ │ ├── eventsub.py # Twitch EventSub client (stub) +│ │ └── chat.py # Chat message handling (stub) +│ ├── agent/ +│ │ ├── orchestrator.py # Main agent orchestrator +│ │ ├── policies.py # Behavior policies +│ │ └── modes/ +│ │ ├── hearthkeeper.py +│ │ ├── steward.py +│ │ ├── warden.py +│ │ ├── librarian.py +│ │ └── scribe.py +│ ├── memory/ +│ │ ├── database.py # Async SQLAlchemy setup +│ │ ├── models.py # Database models +│ │ └── repository.py # Data access layer +│ ├── llm/ +│ │ ├── client.py # Pluggable LLM client +│ │ └── prompts.py # Prompt templates +│ └── exports/ +│ └── markdown.py # Markdown ledger generation +├── exports/ # Generated ledgers +├── data/ # Local data storage +├── Dockerfile +├── docker-compose.yml +├── requirements.txt +├── .env.example +└── README.md +``` + +### Tech Stack + +- **Python 3.12** - Core language +- **FastAPI** - REST API framework +- **SQLAlchemy + asyncpg** - Async database ORM +- **PostgreSQL** - Primary data store +- **Docker & Docker Compose** - Containerization +- **Pydantic** - Configuration and validation + +### Key Design Patterns + +**Agent Modes:** Each mode operates independently but shares access to: +- The LLM client for text generation +- The database repository for persistence +- Shared policies for behavior control + +**Policies:** Encapsulate decision logic: +- `ChatActivityPolicy` - Tracks inactivity periods +- `ResponseSuppression` - Avoids speaking during active chat +- `SuspiciousContentPolicy` - Pattern matching for spam/scams + +**Async Architecture:** All I/O operations are non-blocking: +- Database queries use `asyncpg` +- FastAPI endpoints handle concurrent requests +- LLM calls prepare for real API integration + +## Setup & Quick Start + +### Prerequisites + +- Docker & Docker Compose +- Python 3.12 (for local development) +- PostgreSQL 16 (or use Docker) + +### 1. Clone Repository + +```bash +cd ws-sanctum-chronicler +``` + +### 2. Configure Environment + +```bash +cp .env.example .env +# Edit .env with your settings (Twitch tokens, LLM provider, etc.) +``` + +### 3. Start Services + +```bash +docker-compose up --build +``` + +The API will be available at `http://localhost:8000` + +### 4. Test the API + +**Health Check:** +```bash +curl http://localhost:8000/health +``` + +**Start Session:** +```bash +curl -X POST http://localhost:8000/admin/session/start \ + -H "Content-Type: application/x-www-form-urlencoded" \ + -d "channel_name=example_channel" +``` + +**Send Test Message:** +```bash +curl -X POST http://localhost:8000/admin/test-message \ + -H "Content-Type: application/x-www-form-urlencoded" \ + -d "session_id=&username=test_user&message=Hello stream!" +``` + +**Get Ledger:** +```bash +curl http://localhost:8000/admin/ledger?session_id= +``` + +**End Session:** +```bash +curl -X POST http://localhost:8000/admin/session/end \ + -H "Content-Type: application/x-www-form-urlencoded" \ + -d "session_id=" +``` + +## API Endpoints + +### Health & Status + +- `GET /health` - Application health check + +### Session Management + +- `POST /admin/session/start?channel_name=` - Start stream session +- `POST /admin/session/end?session_id=` - End stream session + +### Testing & Admin + +- `POST /admin/test-message?session_id=&username=&message=` - Send test message +- `GET /admin/ledger?session_id=` - Retrieve markdown ledger + +## Configuration + +All settings are loaded from environment variables (see `.env.example`): + +### Application + +- `APP_NAME` - Application display name +- `APP_ENV` - Environment (development/production) +- `DEBUG` - Enable debug logging + +### Database + +- `DATABASE_URL` - PostgreSQL connection string +- `DB_PASSWORD` - Database password (for docker-compose) + +### Twitch (Optional - Stubs Present) + +- `TWITCH_CLIENT_ID` - Twitch OAuth client ID +- `TWITCH_CLIENT_SECRET` - Twitch OAuth secret +- `TWITCH_BOT_USERNAME` - Bot username +- `TWITCH_CHANNEL_NAME` - Channel to monitor + +### LLM + +- `LLM_PROVIDER` - Provider: `openai`, `ollama`, `lm_studio`, or empty for mock +- `LLM_BASE_URL` - API endpoint (for local providers) +- `LLM_API_KEY` - API key (if needed) +- `LLM_MODEL` - Model identifier (default: gpt-3.5-turbo) + +### Export + +- `EXPORT_PATH` - Directory for ledger exports + +## Agent Policies + +### Chat Activity Policy + +- **Inactivity Threshold:** 15 minutes +- **Hearthkeeper Activation:** Sends gentle prompt when no messages for 15+ minutes +- **Human Override:** Hearthkeeper stays silent if chat is active (5+ messages/minute) + +### Response Suppression + +- **Active Chat Threshold:** 5 messages per minute +- **Behavior:** Agent suppresses responses when humans are actively talking +- **Rationale:** Respects human conversation and avoids noise + +### Suspicious Content Detection + +**Patterns Detected:** +- "join our discord", "discord.gg" (growth spam) +- "grow your channel", "easy money" (scams) +- Multiple URLs (spam) +- Common scam keywords + +**Actions:** Warden flags suspicious messages (not auto-delete) + +## Database Schema + +### StreamSession +- `id` (UUID) - Primary key +- `channel_name` - Twitch channel +- `started_at` - Session start time +- `ended_at` - Session end time (null if active) +- `theme` - Stream theme +- `is_active` - Boolean flag + +### ChatMessage +- `id` (UUID) +- `session_id` - Reference to session +- `username` - Message author +- `content` - Message text +- `timestamp` - Message time +- `is_bot`, `is_moderator` - Flags + +### AgentAction +- `id` (UUID) +- `session_id` - Reference to session +- `action_type` - RESPONSE, FLAG_SUSPICIOUS, ARCHIVE_CLIP, etc. +- `mode` - Which agent mode took action +- `triggered_by_message_id` - Message that triggered action +- `description` - Action details + +### ClipCandidate +- `id` (UUID) +- `session_id`, `message_id` +- `reason` - Why it's clip-worthy + +### BlogSeed +- `id` (UUID) +- `session_id` +- `topic`, `description` +- `related_messages` - JSON array + +## LLM Integration + +The system includes a pluggable LLM client that currently: +- Generates mock responses when no provider is configured +- Prepares for OpenAI, Ollama, and LM Studio integration + +**Current Mock Behavior:** +- Returns deterministic responses based on keywords +- Useful for testing without API costs + +**Implementing Real Providers:** + +See `app/llm/client.py` for TODO comments marking where to integrate: +- `_generate_openai()` - OpenAI API calls +- `_generate_ollama()` - Ollama local API +- `_generate_lm_studio()` - LM Studio API + +## Current Limitations + +This is **scaffolding, not production code**: + +- Twitch EventSub connection is a stub (see TODO comments) +- Chat sending is not implemented +- LLM providers are not integrated yet (mock mode works) +- No real OAuth flow for Twitch +- Database migrations are automatic (no versioning) +- No rate limiting on endpoints +- No authentication/authorization +- Ledger export is basic markdown (no formatting options) + +## Next Implementation Steps + +### Phase 1: Core Features (Recommended) + +1. **Implement real Twitch integration:** + - Implement EventSub WebSocket connection in `app/twitch/eventsub.py` + - Implement send chat message API in `app/twitch/chat.py` + - Add OAuth token exchange flow + +2. **Integrate real LLM provider:** + - Choose provider (e.g., Ollama for self-hosted) + - Implement `_generate_ollama()` or `_generate_openai()` + - Test with actual model + +3. **Enhance agent modes:** + - Refine Hearthkeeper timing logic + - Implement Steward mention detection + - Expand Warden pattern library + - Complete Librarian topic extraction + +### Phase 2: User Experience + +1. **Add UI/Dashboard:** + - Stream monitoring view + - Ledger generation UI + - Settings panel + +2. **Improve exports:** + - Configurable markdown templates + - JSON export option + - Email distribution + +3. **Add persistence:** + - Session history + - Settings storage per channel + - Analytics dashboard + +### Phase 3: Production Readiness + +1. **Testing:** + - Unit tests for policies + - Integration tests for agent modes + - E2E tests for full flows + +2. **DevOps:** + - Database migrations (Alembic) + - Logging aggregation + - Monitoring/alerting + +3. **Performance:** + - Rate limiting + - Caching for repeated LLM calls + - Message deduplication + +## Development + +### Local Setup (Without Docker) + +```bash +# Create virtual environment +python3.12 -m venv venv +source venv/bin/activate + +# Install dependencies +pip install -r requirements.txt + +# Create .env file +cp .env.example .env + +# Run migrations (auto-created on app startup) +# Start app +python -m uvicorn app.main:app --reload +``` + +### Database Access + +```bash +# Connect to running PostgreSQL +docker-compose exec sanctum-db psql -U sanctum -d sanctum + +# View tables +\dt + +# Query sessions +SELECT id, channel_name, started_at FROM stream_sessions; +``` + +### Logging + +The application uses Python's standard logging. Configure in `app/main.py`: + +```python +logging.basicConfig(level=logging.DEBUG) +``` + +## Contributing + +When adding features: +- Maintain async/await patterns throughout +- Add type hints to all functions +- Include docstrings with purpose and TODO comments for future work +- Keep modes independent but shareable + +## License + +(Add your license here) + +## Support + +For issues or questions: +1. Check TODO comments in relevant files +2. Review the architecture overview +3. File an issue with reproduction steps + diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..f1c5c45 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,3 @@ +"""Sanctum Agent - AI stream assistant for The Sanctum Chronicler.""" + +__version__ = "0.1.0" diff --git a/app/agent/__init__.py b/app/agent/__init__.py new file mode 100644 index 0000000..69e05b1 --- /dev/null +++ b/app/agent/__init__.py @@ -0,0 +1 @@ +"""Agent module exports.""" diff --git a/app/agent/modes/__init__.py b/app/agent/modes/__init__.py new file mode 100644 index 0000000..b1123c8 --- /dev/null +++ b/app/agent/modes/__init__.py @@ -0,0 +1 @@ +"""Agent modes module.""" diff --git a/app/agent/modes/hearthkeeper.py b/app/agent/modes/hearthkeeper.py new file mode 100644 index 0000000..2b68da5 --- /dev/null +++ b/app/agent/modes/hearthkeeper.py @@ -0,0 +1,47 @@ +"""Hearthkeeper Mode - Nurtures stream warmth and gentle presence.""" + +import logging +from app.llm.client import LLMClient +from app.llm.prompts import PromptTemplates + +logger = logging.getLogger(__name__) + + +class HearthkeeperMode: + """ + Hearthkeeper - The gentle voice of the Sanctum. + + Purpose: + - Maintains the emotional warmth of the stream + - Generates gentle prompts when chat is quiet + - Encourages participation and connection + - Never forced or aggressive + + Policy: + - Activates when no human chat for 15+ minutes + - Generates 1-2 sentence conversation starters + - Respects stream theme if established + - Can be suppressed if chat becomes active + """ + + def __init__(self, llm_client: LLMClient): + """Initialize Hearthkeeper mode.""" + self.llm_client = llm_client + self.last_activity_minutes = 0 + self.activity_threshold = 15 + + async def should_activate(self, minutes_since_activity: int) -> bool: + """Determine if Hearthkeeper should generate a prompt.""" + return minutes_since_activity >= self.activity_threshold + + async def generate_prompt(self, theme: str | None = None) -> str: + """Generate a gentle prompt for the stream.""" + prompt = PromptTemplates.gentle_prompt(theme) + response = await self.llm_client.generate(prompt) + logger.info("Hearthkeeper generated gentle prompt") + return response + + async def on_chat_activity(self) -> None: + """React to new chat activity.""" + logger.debug("Hearthkeeper notes renewed activity") + self.last_activity_minutes = 0 diff --git a/app/agent/modes/librarian.py b/app/agent/modes/librarian.py new file mode 100644 index 0000000..d2cff91 --- /dev/null +++ b/app/agent/modes/librarian.py @@ -0,0 +1,64 @@ +"""Librarian Mode - Archives and categorizes important discussion.""" + +import logging +from app.llm.client import LLMClient +from app.llm.prompts import PromptTemplates + +logger = logging.getLogger(__name__) + + +class LibrarianMode: + """ + Librarian - The keeper of knowledge and archives. + + Purpose: + - Identifies and catalogs important discussion points + - Creates summaries of key topics + - Builds context for future reference + - Prepares data for blog and clip exports + + Policy: + - Runs passively, always monitoring + - Tags messages by topic/sentiment + - Creates discussion threads + - Identifies "clip-worthy" moments + - Feeds data to Scribe for final export + """ + + def __init__(self, llm_client: LLMClient): + """Initialize Librarian mode.""" + self.llm_client = llm_client + self.archived_messages: list[dict] = [] + self.topics: dict[str, list[str]] = {} + + async def archive_message(self, message_id: str, content: str, username: str) -> None: + """Archive an important message.""" + self.archived_messages.append( + { + "id": message_id, + "content": content, + "username": username, + } + ) + logger.debug(f"Librarian archived message from {username}") + + async def identify_topics(self, messages: list[str]) -> list[str]: + """Identify key topics from a set of messages.""" + # Placeholder: Would use LLM to extract topics + topics = ["general", "technical", "community"] + return topics + + async def create_summary(self, topic: str, messages: list[str]) -> str: + """Create a summary of messages under a topic.""" + prompt = PromptTemplates.librarian_summary(messages) + summary = await self.llm_client.generate(prompt, max_tokens=300) + logger.info(f"Librarian created summary for topic: {topic}") + return summary + + async def get_archives(self) -> dict: + """Get the archive status.""" + return { + "mode": "librarian", + "archived_messages": len(self.archived_messages), + "topics_tracked": len(self.topics), + } diff --git a/app/agent/modes/scribe.py b/app/agent/modes/scribe.py new file mode 100644 index 0000000..25ef203 --- /dev/null +++ b/app/agent/modes/scribe.py @@ -0,0 +1,73 @@ +"""Scribe Mode - Generates the post-stream markdown ledger.""" + +import logging +from datetime import datetime +from app.llm.client import LLMClient + +logger = logging.getLogger(__name__) + + +class ScribeMode: + """ + Scribe - The chronicler of the stream's story. + + Purpose: + - Compiles session data into a markdown ledger + - Generates blog post seeds + - Identifies clip candidates + - Exports final summary document + + Policy: + - Activates at end of stream + - Reads data from Librarian, Warden, and repository + - Creates structured markdown output + - Organizes clips and blog topics + - Ready for post-processing or publishing + """ + + def __init__(self, llm_client: LLMClient): + """Initialize Scribe mode.""" + self.llm_client = llm_client + self.ledger_entries: list[str] = [] + + async def add_entry(self, section: str, content: str) -> None: + """Add an entry to the ledger.""" + self.ledger_entries.append(f"## {section}\n{content}") + logger.debug(f"Scribe recorded ledger entry: {section}") + + async def compile_ledger( + self, + theme: str, + discussion_points: list[str], + agent_actions: list[str], + clip_candidates: list[str], + blog_seeds: list[str], + ) -> str: + """Compile all data into a markdown ledger.""" + date = datetime.utcnow().strftime("%Y-%m-%d") + + ledger = f"# Sanctum Ledger — {date}\n\n" + ledger += f"## Stream Theme\n{theme}\n\n" + ledger += f"## Notable Discussion\n" + for point in discussion_points: + ledger += f"- {point}\n" + ledger += "\n" + ledger += f"## Agent Actions\n" + for action in agent_actions: + ledger += f"- {action}\n" + ledger += "\n" + ledger += f"## Clip Candidates\n" + for clip in clip_candidates: + ledger += f"- {clip}\n" + ledger += "\n" + ledger += f"## Blog Seeds\n" + for seed in blog_seeds: + ledger += f"- {seed}\n" + + logger.info("Scribe compiled stream ledger") + return ledger + + async def export_ledger(self, filename: str, content: str) -> None: + """Export ledger to file.""" + # Actual export handled by MarkdownExporter + logger.info(f"Scribe prepared ledger for export: {filename}") diff --git a/app/agent/modes/steward.py b/app/agent/modes/steward.py new file mode 100644 index 0000000..2a732f7 --- /dev/null +++ b/app/agent/modes/steward.py @@ -0,0 +1,51 @@ +"""Steward Mode - Responds to chat with knowledge and warmth.""" + +import logging +from app.llm.client import LLMClient +from app.llm.prompts import PromptTemplates + +logger = logging.getLogger(__name__) + + +class StewardMode: + """ + Steward - The thoughtful keeper of conversation. + + Purpose: + - Responds to direct questions and comments + - Shares relevant knowledge and context + - Maintains conversation continuity + - Balances speaking and listening + + Policy: + - Activates when chat is active + - Only responds to messages explicitly mentioning the bot + - Keeps responses brief (1-3 sentences) + - Never interrupts human conversation flow + - Can escalate to other modes if needed + """ + + def __init__(self, llm_client: LLMClient): + """Initialize Steward mode.""" + self.llm_client = llm_client + self.response_count = 0 + self.max_responses_per_minute = 2 + + async def should_respond(self, message: str, is_mention: bool) -> bool: + """Determine if Steward should respond.""" + # Only respond to mentions for now (can be expanded) + return is_mention and self.response_count < self.max_responses_per_minute + + async def generate_response( + self, message: str, context: str | None = None + ) -> str: + """Generate a thoughtful response to a message.""" + prompt = PromptTemplates.steward_response(message, context) + response = await self.llm_client.generate(prompt, max_tokens=150) + self.response_count += 1 + logger.info("Steward generated response") + return response + + async def on_response_sent(self) -> None: + """Record that a response was sent.""" + logger.debug("Steward response recorded") diff --git a/app/agent/modes/warden.py b/app/agent/modes/warden.py new file mode 100644 index 0000000..23b8280 --- /dev/null +++ b/app/agent/modes/warden.py @@ -0,0 +1,85 @@ +"""Warden Mode - Detects and flags suspicious content.""" + +import logging +from app.llm.client import LLMClient +from app.llm.prompts import PromptTemplates + +logger = logging.getLogger(__name__) + + +class WardenMode: + """ + Warden - The guardian against unwanted influences. + + Purpose: + - Detects suspicious patterns (spam, scams, bot activity) + - Flags Discord growth schemes and link spam + - Monitors for manipulation or harmful content + - Provides data for moderation decisions + + Policy: + - Runs on every message (always active) + - Never takes action directly (only flags) + - Patterns to detect: + * "Join our Discord" + * "Grow your channel" + * Multiple links + * Repeated messages (spam) + * Known scam keywords + - Flags are recorded for human review + """ + + def __init__(self, llm_client: LLMClient): + """Initialize Warden mode.""" + self.llm_client = llm_client + self.suspicious_patterns = [ + "join our discord", + "discord.gg", + "grow your channel", + "easy money", + "click here", + "limited offer", + "act now", + ] + self.flagged_count = 0 + + async def analyze_message(self, message: str) -> dict: + """Analyze a message for suspicious content.""" + result = { + "is_suspicious": False, + "patterns_detected": [], + "severity": "safe", + } + + # Simple pattern matching + message_lower = message.lower() + for pattern in self.suspicious_patterns: + if pattern in message_lower: + result["patterns_detected"].append(pattern) + result["is_suspicious"] = True + + # Check for multiple links + link_count = message.count("http") + message.count("www") + if link_count > 1: + result["patterns_detected"].append("multiple_links") + result["is_suspicious"] = True + + # Determine severity + if result["is_suspicious"]: + if len(result["patterns_detected"]) >= 2: + result["severity"] = "high" + else: + result["severity"] = "medium" + self.flagged_count += 1 + logger.warning( + f"Warden flagged suspicious message: {result['patterns_detected']}" + ) + + return result + + async def get_report(self) -> dict: + """Get Warden's activity report.""" + return { + "mode": "warden", + "total_flagged": self.flagged_count, + } diff --git a/app/agent/orchestrator.py b/app/agent/orchestrator.py new file mode 100644 index 0000000..8893e6d --- /dev/null +++ b/app/agent/orchestrator.py @@ -0,0 +1,211 @@ +"""Agent Orchestrator - Routes messages and manages agent modes.""" + +import logging +import uuid +from datetime import datetime +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agent.policies import ( + ChatActivityPolicy, + ResponseSuppression, + SuspiciousContentPolicy, +) +from app.agent.modes.hearthkeeper import HearthkeeperMode +from app.agent.modes.steward import StewardMode +from app.agent.modes.warden import WardenMode +from app.agent.modes.librarian import LibrarianMode +from app.agent.modes.scribe import ScribeMode +from app.llm.client import LLMClient +from app.memory.database import async_session_factory +from app.memory.models import AgentActionType +from app.memory.repository import Repository + +logger = logging.getLogger(__name__) + + +class AgentOrchestrator: + """ + Main orchestrator for agent behavior. + + Routes chat messages to appropriate modes and manages responses. + Implements policies for when to speak, when to stay silent, + and how to flag suspicious content. + """ + + def __init__(self): + """Initialize the orchestrator and all modes.""" + self.llm_client = LLMClient() + + # Initialize modes + self.hearthkeeper = HearthkeeperMode(self.llm_client) + self.steward = StewardMode(self.llm_client) + self.warden = WardenMode(self.llm_client) + self.librarian = LibrarianMode(self.llm_client) + self.scribe = ScribeMode(self.llm_client) + + # Initialize policies + self.chat_activity = ChatActivityPolicy(inactivity_threshold_minutes=15) + self.response_suppression = ResponseSuppression() + self.suspicious_content = SuspiciousContentPolicy() + + # Track active sessions + self.active_sessions: dict[str, dict] = {} + + logger.info("AgentOrchestrator initialized with all modes and policies") + + async def start_session(self, channel_name: str) -> str: + """ + Start a new stream session. + + Args: + channel_name: Twitch channel name + + Returns: + Session ID + """ + session_id = str(uuid.uuid4()) + + async with async_session_factory() as db_session: + repo = Repository(db_session) + await repo.create_session(channel_name) + + self.active_sessions[session_id] = { + "channel_name": channel_name, + "started_at": datetime.utcnow(), + "message_count": 0, + "theme": None, + } + + logger.info(f"Started session {session_id} for {channel_name}") + return session_id + + async def end_session(self, session_id: str) -> None: + """ + End a stream session and trigger ledger generation. + + Args: + session_id: Session ID + """ + if session_id not in self.active_sessions: + logger.warning(f"Session {session_id} not found") + return + + async with async_session_factory() as db_session: + repo = Repository(db_session) + await repo.end_session(session_id) + + del self.active_sessions[session_id] + logger.info(f"Ended session {session_id}") + + async def handle_chat_message( + self, + session_id: str, + username: str, + message: str, + ) -> dict: + """ + Process a chat message and determine agent response. + + Args: + session_id: Session ID + username: Username of message sender + message: Message content + + Returns: + Response dict with agent_response, actions_taken, etc. + """ + if session_id not in self.active_sessions: + logger.warning(f"Session {session_id} not found") + return {"agent_response": None, "actions_taken": []} + + session_info = self.active_sessions[session_id] + actions = [] + agent_response = None + + async with async_session_factory() as db_session: + repo = Repository(db_session) + + # Store the message + message_id = await repo.add_chat_message( + session_id=session_id, + username=username, + content=message, + is_bot=False, + ) + + # Record activity + self.chat_activity.record_activity(session_id) + session_info["message_count"] += 1 + + # 1. Warden always analyzes (passive mode) + warden_result = await self.warden.analyze_message(message) + if warden_result["is_suspicious"]: + actions.append(f"WARDEN_FLAG: {warden_result['severity']}") + async with async_session_factory() as db_session: + repo = Repository(db_session) + await repo.record_action( + session_id=session_id, + action_type=AgentActionType.FLAG_SUSPICIOUS, + mode="warden", + description=f"Detected: {warden_result['patterns_detected']}", + triggered_by_message_id=message_id, + ) + + # 2. Check if we should suppress responses due to active chat + recent_messages = [] + async with async_session_factory() as db_session: + repo = Repository(db_session) + recent_messages = await repo.get_recent_messages(session_id, limit=10) + + if self.response_suppression.should_suppress_response(len(recent_messages)): + logger.debug("Response suppressed due to active chat") + return { + "agent_response": agent_response, + "actions_taken": actions, + } + + # 3. Hearthkeeper: Generate prompt if chat inactive + if self.chat_activity.should_hearthkeeper_prompt(session_id): + try: + agent_response = await self.hearthkeeper.generate_prompt( + theme=session_info.get("theme") + ) + actions.append("HEARTHKEEPER_PROMPT") + async with async_session_factory() as db_session: + repo = Repository(db_session) + await repo.record_action( + session_id=session_id, + action_type=AgentActionType.RESPONSE, + mode="hearthkeeper", + description=agent_response, + ) + except Exception as e: + logger.error(f"Error in Hearthkeeper: {e}") + + # 4. Librarian: Archive important messages (passive) + if len(message) > 50: # Archive longer messages + await self.librarian.archive_message(message_id, message, username) + + logger.info( + f"Message processed. Session: {session_id}, Actions: {actions}" + ) + + return { + "agent_response": agent_response, + "actions_taken": actions, + } + + async def get_session_status(self, session_id: str) -> dict: + """Get status of a session.""" + if session_id not in self.active_sessions: + return {} + + session = self.active_sessions[session_id] + + return { + "session_id": session_id, + "channel_name": session["channel_name"], + "message_count": session["message_count"], + "uptime_seconds": (datetime.utcnow() - session["started_at"]).total_seconds(), + "theme": session.get("theme"), + } diff --git a/app/agent/policies.py b/app/agent/policies.py new file mode 100644 index 0000000..613a78d --- /dev/null +++ b/app/agent/policies.py @@ -0,0 +1,105 @@ +"""Agent behavior policies and rules.""" + +import logging +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + + +class ChatActivityPolicy: + """Policy for detecting chat activity and inactivity periods.""" + + def __init__(self, inactivity_threshold_minutes: int = 15): + """ + Initialize policy. + + Args: + inactivity_threshold_minutes: Minutes of no chat before Hearthkeeper activates + """ + self.inactivity_threshold = timedelta(minutes=inactivity_threshold_minutes) + self.last_message_time: dict[str, datetime] = {} + + def record_activity(self, session_id: str) -> None: + """Record that chat activity occurred.""" + self.last_message_time[session_id] = datetime.utcnow() + + def minutes_since_activity(self, session_id: str) -> int: + """Get minutes since last chat message.""" + if session_id not in self.last_message_time: + return 0 + elapsed = datetime.utcnow() - self.last_message_time[session_id] + return int(elapsed.total_seconds() / 60) + + def should_hearthkeeper_prompt(self, session_id: str) -> bool: + """Determine if Hearthkeeper should send a prompt.""" + minutes = self.minutes_since_activity(session_id) + should = minutes >= self.inactivity_threshold.total_seconds() / 60 + if should: + logger.info(f"Chat inactive for {minutes} minutes. Hearthkeeper may prompt.") + return should + + +class ResponseSuppression: + """Policy for when the agent should NOT respond.""" + + # Suppress responses when chat is very active (humans are talking) + ACTIVE_CHAT_THRESHOLD = 5 # 5+ messages per minute = suppress + + @staticmethod + def should_suppress_response(recent_message_count: int, time_window_minutes: int = 1) -> bool: + """ + Determine if agent should stay silent due to active chat. + + Args: + recent_message_count: Number of messages in the time window + time_window_minutes: Time window in minutes + + Returns: + True if agent should suppress response + """ + messages_per_minute = recent_message_count / time_window_minutes + suppress = messages_per_minute >= ResponseSuppression.ACTIVE_CHAT_THRESHOLD + + if suppress: + logger.debug(f"Response suppressed due to active chat ({messages_per_minute:.1f} msg/min)") + + return suppress + + +class SuspiciousContentPolicy: + """Policy for detecting suspicious content.""" + + # Patterns that raise Warden alerts + SUSPICIOUS_KEYWORDS = [ + "join our discord", + "discord.gg", + "grow your channel", + "easy money", + "limited offer", + ] + + @staticmethod + def is_suspicious(message: str) -> bool: + """ + Check if a message matches suspicious patterns. + + Args: + message: Message content + + Returns: + True if message is suspicious + """ + message_lower = message.lower() + + for keyword in SuspiciousContentPolicy.SUSPICIOUS_KEYWORDS: + if keyword in message_lower: + logger.warning(f"Suspicious content detected: {keyword}") + return True + + # Check for multiple links + link_count = message.count("http") + message.count("www") + if link_count > 1: + logger.warning("Suspicious content detected: multiple links") + return True + + return False diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..16f0961 --- /dev/null +++ b/app/config.py @@ -0,0 +1,39 @@ +"""Configuration management using pydantic-settings.""" + +from pydantic_settings import BaseSettings +from typing import Optional + + +class Settings(BaseSettings): + """Application configuration loaded from environment variables.""" + + # App + APP_NAME: str = "Sanctum Chronicler" + APP_ENV: str = "development" + DEBUG: bool = False + + # Database + DATABASE_URL: str = "postgresql+asyncpg://sanctum:password@localhost:5432/sanctum" + + # Twitch + TWITCH_CLIENT_ID: Optional[str] = None + TWITCH_CLIENT_SECRET: Optional[str] = None + TWITCH_BOT_USERNAME: Optional[str] = None + TWITCH_CHANNEL_NAME: Optional[str] = None + + # LLM + LLM_PROVIDER: Optional[str] = None # "openai", "ollama", "lm_studio", or None + LLM_BASE_URL: Optional[str] = None + LLM_API_KEY: Optional[str] = None + LLM_MODEL: str = "gpt-3.5-turbo" + + # Export + EXPORT_PATH: str = "exports" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + case_sensitive = True + + +settings = Settings() diff --git a/app/llm/__init__.py b/app/llm/__init__.py new file mode 100644 index 0000000..6deec54 --- /dev/null +++ b/app/llm/__init__.py @@ -0,0 +1 @@ +"""LLM module exports.""" diff --git a/app/llm/client.py b/app/llm/client.py new file mode 100644 index 0000000..902a567 --- /dev/null +++ b/app/llm/client.py @@ -0,0 +1,117 @@ +"""LLM client abstraction for pluggable LLM providers.""" + +import logging +from typing import Optional + +from app.config import settings + +logger = logging.getLogger(__name__) + + +class LLMClient: + """ + Abstraction layer for LLM providers. + + Supports: OpenAI, Ollama, LM Studio, or offline/mock mode. + Can be extended to support other providers. + """ + + def __init__( + self, + provider: Optional[str] = None, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + model: Optional[str] = None, + ): + """ + Initialize LLM client. + + Args: + provider: "openai", "ollama", "lm_studio", or None for mock + api_key: API key for provider + base_url: Base URL for API (for ollama/lm_studio) + model: Model identifier + """ + self.provider = provider or settings.LLM_PROVIDER + self.api_key = api_key or settings.LLM_API_KEY + self.base_url = base_url or settings.LLM_BASE_URL + self.model = model or settings.LLM_MODEL + + logger.info(f"LLMClient initialized with provider: {self.provider}") + + async def generate(self, prompt: str, max_tokens: int = 200) -> str: + """ + Generate text from a prompt. + + Args: + prompt: Input prompt + max_tokens: Maximum tokens to generate + + Returns: + Generated text + + TODO: Implement OpenAI API integration + TODO: Implement Ollama API integration + TODO: Implement LM Studio API integration + """ + if self.provider == "openai": + return await self._generate_openai(prompt, max_tokens) + elif self.provider == "ollama": + return await self._generate_ollama(prompt, max_tokens) + elif self.provider == "lm_studio": + return await self._generate_lm_studio(prompt, max_tokens) + else: + return self._generate_mock(prompt) + + async def _generate_openai(self, prompt: str, max_tokens: int) -> str: + """ + Generate using OpenAI API. + + TODO: Implement using openai library + - Create client with api_key + - Call ChatCompletion + - Handle errors and retries + """ + logger.warning("OpenAI provider not yet implemented (stub)") + return self._generate_mock(prompt) + + async def _generate_ollama(self, prompt: str, max_tokens: int) -> str: + """ + Generate using Ollama local API. + + TODO: Implement using httpx or requests + - POST to base_url/api/generate + - Stream response and accumulate + - Handle model pulling if needed + """ + logger.warning("Ollama provider not yet implemented (stub)") + return self._generate_mock(prompt) + + async def _generate_lm_studio(self, prompt: str, max_tokens: int) -> str: + """ + Generate using LM Studio local API. + + TODO: Implement OpenAI-compatible API calls + - POST to base_url/v1/chat/completions + - Use same logic as OpenAI but with local endpoint + """ + logger.warning("LM Studio provider not yet implemented (stub)") + return self._generate_mock(prompt) + + def _generate_mock(self, prompt: str) -> str: + """ + Generate deterministic mock response (no API needed). + + Used when no provider is configured or for testing. + """ + logger.debug(f"Mock generation for prompt: {prompt[:50]}...") + + # Simple deterministic responses for testing + if "hello" in prompt.lower(): + return "Greetings, traveler! Welcome to The Sanctum." + elif "help" in prompt.lower(): + return "I am here to guide your discourse through the streams." + elif "topic" in prompt.lower(): + return "The archives speak of many topics worthy of discussion." + else: + return "An interesting observation. Tell me more." diff --git a/app/llm/prompts.py b/app/llm/prompts.py new file mode 100644 index 0000000..fb4686d --- /dev/null +++ b/app/llm/prompts.py @@ -0,0 +1,60 @@ +"""LLM prompt templates and generation utilities.""" + +from typing import Optional + + +class PromptTemplates: + """Collection of prompt templates for different modes.""" + + @staticmethod + def gentle_prompt(current_theme: Optional[str] = None) -> str: + """Generate a gentle prompt when chat has been inactive.""" + if current_theme: + return f"Gently prompt the chat about: {current_theme}" + return "Generate a gentle, inviting prompt to encourage discussion in the stream." + + @staticmethod + def steward_response(message: str, context: Optional[str] = None) -> str: + """Generate a response as the Steward mode.""" + prompt = f"As a thoughtful steward of this stream, respond briefly and helpfully to: {message}" + if context: + prompt += f"\nContext: {context}" + return prompt + + @staticmethod + def warden_analysis(message: str) -> str: + """Generate analysis for suspicious content detection.""" + return f"Analyze this message for suspicious patterns (spam, scams, manipulation): {message}" + + @staticmethod + def librarian_summary(messages: list[str]) -> str: + """Generate a summary of important discussion points.""" + messages_text = "\n".join(messages) + return f"Summarize the key discussion points from this chat log:\n{messages_text}" + + @staticmethod + def scribe_ledger( + theme: str, + discussion: list[str], + actions: list[str], + clips: list[str], + seeds: list[str], + ) -> str: + """Generate markdown ledger summary.""" + return f"""Generate a professional markdown ledger with these sections: +- Theme: {theme} +- Notable Discussion: {len(discussion)} key points +- Agent Actions: {len(actions)} recorded +- Clip Candidates: {len(clips)} identified +- Blog Seeds: {len(seeds)} proposed""" + + @staticmethod + def clip_candidate_reason(message: str) -> str: + """Generate reasoning for marking a message as a clip candidate.""" + return f"Explain why this is a good clip candidate: {message}" + + @staticmethod + def blog_seed_topic(context: list[str]) -> str: + """Generate a blog post topic from discussion context.""" + context_text = "\n".join(context[:5]) # First 5 messages + return f"Based on this discussion, suggest a blog post topic:\n{context_text}" diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..a584067 --- /dev/null +++ b/app/main.py @@ -0,0 +1,126 @@ +"""FastAPI main application.""" + +from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse +from datetime import datetime +import logging + +from app.config import settings +from app.agent.orchestrator import AgentOrchestrator +from app.memory.database import init_db +from app.exports.markdown import MarkdownExporter + +logger = logging.getLogger(__name__) + +app = FastAPI( + title=settings.APP_NAME, + description="AI stream assistant for monitoring and guiding Twitch chat", + version="0.1.0", +) + +# Global orchestrator instance +orchestrator: AgentOrchestrator | None = None + + +@app.on_event("startup") +async def startup_event(): + """Initialize database and services on startup.""" + global orchestrator + try: + await init_db() + orchestrator = AgentOrchestrator() + logger.info("Application started successfully") + except Exception as e: + logger.error(f"Failed to start application: {e}") + raise + + +@app.on_event("shutdown") +async def shutdown_event(): + """Clean up resources on shutdown.""" + logger.info("Application shutting down") + + +@app.get("/health") +async def health_check() -> dict: + """Health check endpoint.""" + return { + "status": "healthy", + "app": settings.APP_NAME, + "environment": settings.APP_ENV, + "timestamp": datetime.utcnow().isoformat(), + } + + +@app.post("/admin/session/start") +async def start_session(channel_name: str) -> dict: + """Start a new stream session.""" + if not orchestrator: + raise HTTPException(status_code=503, detail="Orchestrator not initialized") + + session_id = await orchestrator.start_session(channel_name) + return { + "status": "session_started", + "session_id": session_id, + "channel": channel_name, + "timestamp": datetime.utcnow().isoformat(), + } + + +@app.post("/admin/session/end") +async def end_session(session_id: str) -> dict: + """End the current stream session.""" + if not orchestrator: + raise HTTPException(status_code=503, detail="Orchestrator not initialized") + + await orchestrator.end_session(session_id) + return { + "status": "session_ended", + "session_id": session_id, + "timestamp": datetime.utcnow().isoformat(), + } + + +@app.post("/admin/test-message") +async def test_message(session_id: str, message: str, username: str = "test_user") -> dict: + """Send a test message to the orchestrator.""" + if not orchestrator: + raise HTTPException(status_code=503, detail="Orchestrator not initialized") + + response = await orchestrator.handle_chat_message( + session_id=session_id, + username=username, + message=message, + ) + return { + "status": "message_processed", + "agent_response": response.get("agent_response"), + "actions_taken": response.get("actions_taken", []), + "timestamp": datetime.utcnow().isoformat(), + } + + +@app.get("/admin/ledger") +async def get_ledger(session_id: str) -> dict: + """Get the markdown ledger for a session.""" + if not orchestrator: + raise HTTPException(status_code=503, detail="Orchestrator not initialized") + + exporter = MarkdownExporter() + ledger = await exporter.export_session(session_id) + + return { + "session_id": session_id, + "ledger": ledger, + "timestamp": datetime.utcnow().isoformat(), + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + app, + host="0.0.0.0", + port=8000, + log_level="info", + ) diff --git a/app/memory/__init__.py b/app/memory/__init__.py new file mode 100644 index 0000000..cac0dfc --- /dev/null +++ b/app/memory/__init__.py @@ -0,0 +1 @@ +"""Memory module exports.""" diff --git a/app/memory/database.py b/app/memory/database.py new file mode 100644 index 0000000..9dfe362 --- /dev/null +++ b/app/memory/database.py @@ -0,0 +1,58 @@ +"""Async database configuration and initialization.""" + +import logging +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession +from sqlalchemy.orm import sessionmaker + +from app.config import settings +from app.memory.models import Base + +logger = logging.getLogger(__name__) + +# Global engine and session factory +engine = None +async_session_factory = None + + +async def init_db() -> None: + """Initialize the database engine and create all tables.""" + global engine, async_session_factory + + try: + engine = create_async_engine( + settings.DATABASE_URL, + echo=settings.DEBUG, + future=True, + ) + + async_session_factory = sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, + ) + + # Create all tables + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + logger.info("Database initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize database: {e}") + raise + + +async def get_session() -> AsyncSession: + """Get an async database session.""" + if async_session_factory is None: + raise RuntimeError("Database not initialized. Call init_db() first.") + + async with async_session_factory() as session: + yield session + + +async def close_db() -> None: + """Close the database connection.""" + global engine + if engine: + await engine.dispose() + logger.info("Database connection closed") diff --git a/app/memory/models.py b/app/memory/models.py new file mode 100644 index 0000000..e365cd4 --- /dev/null +++ b/app/memory/models.py @@ -0,0 +1,84 @@ +"""Memory and database models.""" + +from datetime import datetime +from enum import Enum +from sqlalchemy import Column, String, DateTime, Text, Integer, Boolean, Enum as SQLEnum +from sqlalchemy.orm import declarative_base + +Base = declarative_base() + + +class StreamSession(Base): + """Represents a single stream session.""" + + __tablename__ = "stream_sessions" + + id = Column(String, primary_key=True) + channel_name = Column(String, nullable=False, index=True) + started_at = Column(DateTime, default=datetime.utcnow, nullable=False) + ended_at = Column(DateTime, nullable=True) + theme = Column(Text, nullable=True) + is_active = Column(Boolean, default=True) + + +class ChatMessage(Base): + """Represents a chat message from the stream.""" + + __tablename__ = "chat_messages" + + id = Column(String, primary_key=True) + session_id = Column(String, nullable=False, index=True) + username = Column(String, nullable=False) + content = Column(Text, nullable=False) + timestamp = Column(DateTime, default=datetime.utcnow, nullable=False) + is_bot = Column(Boolean, default=False) + is_moderator = Column(Boolean, default=False) + + +class AgentActionType(str, Enum): + """Types of actions the agent can take.""" + + RESPONSE = "response" + FLAG_SUSPICIOUS = "flag_suspicious" + ARCHIVE_CLIP = "archive_clip" + RECORD_SEED = "record_seed" + UPDATE_THEME = "update_theme" + + +class AgentAction(Base): + """Records of agent actions taken during a session.""" + + __tablename__ = "agent_actions" + + id = Column(String, primary_key=True) + session_id = Column(String, nullable=False, index=True) + action_type = Column(SQLEnum(AgentActionType), nullable=False) + mode = Column(String, nullable=False) # hearthkeeper, steward, warden, etc. + triggered_by_message_id = Column(String, nullable=True) + description = Column(Text, nullable=False) + timestamp = Column(DateTime, default=datetime.utcnow, nullable=False) + + +class ClipCandidate(Base): + """Stores potential clip candidates from stream chat.""" + + __tablename__ = "clip_candidates" + + id = Column(String, primary_key=True) + session_id = Column(String, nullable=False, index=True) + message_id = Column(String, nullable=False) + reason = Column(Text, nullable=False) + timestamp = Column(DateTime, default=datetime.utcnow, nullable=False) + + +class BlogSeed(Base): + """Stores potential blog post topics/seeds from stream.""" + + __tablename__ = "blog_seeds" + + id = Column(String, primary_key=True) + session_id = Column(String, nullable=False, index=True) + topic = Column(String, nullable=False) + description = Column(Text, nullable=False) + related_messages = Column(Text, nullable=True) # JSON array of message IDs + timestamp = Column(DateTime, default=datetime.utcnow, nullable=False) diff --git a/app/memory/repository.py b/app/memory/repository.py new file mode 100644 index 0000000..a13a3e3 --- /dev/null +++ b/app/memory/repository.py @@ -0,0 +1,190 @@ +"""Data access layer for database operations.""" + +import logging +import uuid +from datetime import datetime +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.memory.models import ( + StreamSession, + ChatMessage, + AgentAction, + ClipCandidate, + BlogSeed, + AgentActionType, +) + +logger = logging.getLogger(__name__) + + +class Repository: + """Repository for all database operations.""" + + def __init__(self, session: AsyncSession): + self.session = session + + # Stream Session operations + + async def create_session(self, channel_name: str) -> str: + """Create a new stream session.""" + session_id = str(uuid.uuid4()) + session = StreamSession( + id=session_id, + channel_name=channel_name, + started_at=datetime.utcnow(), + ) + self.session.add(session) + await self.session.commit() + logger.info(f"Created session {session_id} for {channel_name}") + return session_id + + async def end_session(self, session_id: str) -> None: + """End a stream session.""" + stmt = ( + update(StreamSession) + .where(StreamSession.id == session_id) + .values(ended_at=datetime.utcnow(), is_active=False) + ) + await self.session.execute(stmt) + await self.session.commit() + logger.info(f"Ended session {session_id}") + + async def get_session(self, session_id: str) -> StreamSession | None: + """Retrieve a session by ID.""" + stmt = select(StreamSession).where(StreamSession.id == session_id) + result = await self.session.execute(stmt) + return result.scalars().first() + + # Chat Message operations + + async def add_chat_message( + self, + session_id: str, + username: str, + content: str, + is_bot: bool = False, + is_moderator: bool = False, + ) -> str: + """Add a chat message to the database.""" + message_id = str(uuid.uuid4()) + message = ChatMessage( + id=message_id, + session_id=session_id, + username=username, + content=content, + is_bot=is_bot, + is_moderator=is_moderator, + timestamp=datetime.utcnow(), + ) + self.session.add(message) + await self.session.commit() + logger.debug(f"Stored chat message from {username}") + return message_id + + async def get_recent_messages( + self, session_id: str, limit: int = 50 + ) -> list[ChatMessage]: + """Get recent chat messages from a session.""" + stmt = ( + select(ChatMessage) + .where(ChatMessage.session_id == session_id) + .order_by(ChatMessage.timestamp.desc()) + .limit(limit) + ) + result = await self.session.execute(stmt) + return list(result.scalars().all()) + + # Agent Action operations + + async def record_action( + self, + session_id: str, + action_type: AgentActionType, + mode: str, + description: str, + triggered_by_message_id: str | None = None, + ) -> str: + """Record an agent action.""" + action_id = str(uuid.uuid4()) + action = AgentAction( + id=action_id, + session_id=session_id, + action_type=action_type, + mode=mode, + triggered_by_message_id=triggered_by_message_id, + description=description, + timestamp=datetime.utcnow(), + ) + self.session.add(action) + await self.session.commit() + logger.info(f"Recorded agent action: {action_type} via {mode}") + return action_id + + async def get_session_actions(self, session_id: str) -> list[AgentAction]: + """Get all actions from a session.""" + stmt = ( + select(AgentAction) + .where(AgentAction.session_id == session_id) + .order_by(AgentAction.timestamp.asc()) + ) + result = await self.session.execute(stmt) + return list(result.scalars().all()) + + # Clip Candidate operations + + async def add_clip_candidate( + self, session_id: str, message_id: str, reason: str + ) -> str: + """Add a clip candidate.""" + candidate_id = str(uuid.uuid4()) + candidate = ClipCandidate( + id=candidate_id, + session_id=session_id, + message_id=message_id, + reason=reason, + timestamp=datetime.utcnow(), + ) + self.session.add(candidate) + await self.session.commit() + logger.info(f"Added clip candidate: {reason}") + return candidate_id + + async def get_clip_candidates(self, session_id: str) -> list[ClipCandidate]: + """Get all clip candidates from a session.""" + stmt = ( + select(ClipCandidate) + .where(ClipCandidate.session_id == session_id) + .order_by(ClipCandidate.timestamp.asc()) + ) + result = await self.session.execute(stmt) + return list(result.scalars().all()) + + # Blog Seed operations + + async def add_blog_seed( + self, session_id: str, topic: str, description: str + ) -> str: + """Add a blog post seed.""" + seed_id = str(uuid.uuid4()) + seed = BlogSeed( + id=seed_id, + session_id=session_id, + topic=topic, + description=description, + timestamp=datetime.utcnow(), + ) + self.session.add(seed) + await self.session.commit() + logger.info(f"Added blog seed: {topic}") + return seed_id + + async def get_blog_seeds(self, session_id: str) -> list[BlogSeed]: + """Get all blog seeds from a session.""" + stmt = ( + select(BlogSeed) + .where(BlogSeed.session_id == session_id) + .order_by(BlogSeed.timestamp.asc()) + ) + result = await self.session.execute(stmt) + return list(result.scalars().all()) diff --git a/app/twitch/__init__.py b/app/twitch/__init__.py new file mode 100644 index 0000000..609a76f --- /dev/null +++ b/app/twitch/__init__.py @@ -0,0 +1 @@ +"""Twitch modules.""" diff --git a/app/twitch/chat.py b/app/twitch/chat.py new file mode 100644 index 0000000..9c08431 --- /dev/null +++ b/app/twitch/chat.py @@ -0,0 +1,66 @@ +"""Twitch chat client for sending and receiving messages.""" + +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + + +async def send_chat_message( + channel_name: str, + message: str, + access_token: Optional[str] = None, +) -> bool: + """ + Send a message to Twitch chat. + + Args: + channel_name: Twitch channel to send message to + message: Message content + access_token: OAuth token with chat:edit scope + + Returns: + True if message sent successfully + + TODO: Implement Twitch Send Chat Message API + Reference: https://dev.twitch.tv/docs/api/reference#send-chat-message + + TODO: Handle rate limiting (20 messages per 30 seconds for verified bots) + TODO: Implement message queue for reliable delivery + TODO: Add retry logic with exponential backoff + """ + logger.info(f"Sending message to {channel_name}: {message[:50]}...") + # Stub implementation + return True + + +class ChatMessageBuffer: + """ + Buffer for outgoing chat messages with rate limiting. + + Implements Twitch's chat rate limits: + - Regular users: 20 messages per 30 seconds + - Verified bots: 50 messages per 30 seconds + - Moderators: 100 messages per 30 seconds + + TODO: Implement queue with configurable rate limits + TODO: Add priority levels for urgent messages + TODO: Implement metrics tracking + """ + + def __init__(self, channel_name: str, max_messages_per_interval: int = 20): + """Initialize message buffer.""" + self.channel_name = channel_name + self.max_messages_per_interval = max_messages_per_interval + self.message_queue: list[str] = [] + + async def add_message(self, message: str) -> None: + """Add a message to the buffer.""" + self.message_queue.append(message) + logger.debug(f"Message queued for {self.channel_name}") + + async def flush(self) -> None: + """Send all buffered messages.""" + for message in self.message_queue: + await send_chat_message(self.channel_name, message) + self.message_queue.clear() diff --git a/app/twitch/eventsub.py b/app/twitch/eventsub.py new file mode 100644 index 0000000..6d0169d --- /dev/null +++ b/app/twitch/eventsub.py @@ -0,0 +1,103 @@ +"""Twitch EventSub client for handling stream events.""" + +import logging +from typing import Callable, Optional + +logger = logging.getLogger(__name__) + + +class TwitchEventSubClient: + """ + Client for Twitch EventSub WebSocket connections. + + Handles real-time stream events like chat messages, follows, raids, etc. + + TODO: Implement real OAuth 2.0 token exchange flow + TODO: Implement WebSocket connection to Twitch EventSub + TODO: Handle subscription management (follow, subscribe, cheer, raid events) + TODO: Implement heartbeat and reconnection logic + """ + + def __init__(self, client_id: str, access_token: str): + """ + Initialize EventSub client. + + Args: + client_id: Twitch application client ID + access_token: OAuth token for API calls + """ + self.client_id = client_id + self.access_token = access_token + self.connected = False + self.event_handlers: dict[str, Callable] = {} + + async def connect(self, channel_id: str) -> bool: + """ + Establish WebSocket connection to Twitch EventSub. + + Args: + channel_id: Twitch channel ID to monitor + + Returns: + True if connection successful + + TODO: Implement WebSocket handshake + TODO: Subscribe to stream.online, stream.offline + """ + logger.info(f"Attempting to connect to EventSub for channel {channel_id}") + self.connected = True + return True + + async def disconnect(self) -> None: + """ + Close EventSub connection gracefully. + + TODO: Send close frame to WebSocket + TODO: Clean up subscriptions + """ + logger.info("Disconnecting from EventSub") + self.connected = False + + async def listen(self) -> None: + """ + Listen for incoming EventSub events (blocking call). + + Should run in a background task and emit events to registered handlers. + + TODO: Implement WebSocket message loop + TODO: Parse and dispatch events to registered handlers + TODO: Handle reconnection on failure + """ + logger.info("EventSub listener started (stub)") + pass + + def on(self, event_type: str) -> Callable: + """ + Register an event handler for a specific event type. + + Args: + event_type: Type of event (e.g., 'stream.online', 'channel.follow') + + Returns: + Decorator function + """ + def decorator(func: Callable) -> Callable: + self.event_handlers[event_type] = func + logger.debug(f"Registered handler for {event_type}") + return func + return decorator + + async def emit_event(self, event_type: str, data: dict) -> None: + """ + Emit an event to registered handlers (internal use). + + Args: + event_type: Type of event + data: Event data payload + """ + if event_type in self.event_handlers: + handler = self.event_handlers[event_type] + try: + await handler(data) + except Exception as e: + logger.error(f"Error in event handler for {event_type}: {e}") diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..74fd5b0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,64 @@ +version: '3.9' + +services: + sanctum-db: + image: postgres:16-alpine + container_name: sanctum-db + environment: + POSTGRES_DB: sanctum + POSTGRES_USER: sanctum + POSTGRES_PASSWORD: ${DB_PASSWORD:-password} + volumes: + - sanctum_data:/var/lib/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U sanctum -d sanctum"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - sanctum-net + + sanctum-agent: + build: + context: . + dockerfile: Dockerfile + container_name: sanctum-agent + depends_on: + sanctum-db: + condition: service_healthy + environment: + APP_NAME: "Sanctum Chronicler" + APP_ENV: ${APP_ENV:-development} + DEBUG: ${DEBUG:-false} + DATABASE_URL: postgresql+asyncpg://sanctum:${DB_PASSWORD:-password}@sanctum-db:5432/sanctum + TWITCH_CLIENT_ID: ${TWITCH_CLIENT_ID:-} + TWITCH_CLIENT_SECRET: ${TWITCH_CLIENT_SECRET:-} + TWITCH_BOT_USERNAME: ${TWITCH_BOT_USERNAME:-} + TWITCH_CHANNEL_NAME: ${TWITCH_CHANNEL_NAME:-} + LLM_PROVIDER: ${LLM_PROVIDER:-} + LLM_BASE_URL: ${LLM_BASE_URL:-} + LLM_API_KEY: ${LLM_API_KEY:-} + LLM_MODEL: ${LLM_MODEL:-gpt-3.5-turbo} + EXPORT_PATH: /app/exports + volumes: + - ./exports:/app/exports + ports: + - "8000:8000" + networks: + - sanctum-net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 3s + retries: 3 + start_period: 10s + +volumes: + sanctum_data: + driver: local + +networks: + sanctum-net: + driver: bridge diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..45777ff --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +pydantic==2.5.0 +pydantic-settings==2.1.0 +sqlalchemy==2.0.23 +asyncpg==0.29.0 +psycopg2-binary==2.9.9 +httpx==0.25.2 +python-dotenv==1.0.0