"""Warden Mode - Detects and flags suspicious content."""

import logging
import re

from app.llm.client import LLMClient
from app.llm.prompts import PromptTemplates
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WardenMode:
    """
    Warden - The guardian against unwanted influences.

    Purpose:
    - Detects suspicious patterns (spam, scams, bot activity)
    - Flags Discord growth schemes and link spam
    - Monitors for manipulation or harmful content
    - Provides data for moderation decisions

    Policy:
    - Runs on every message (always active)
    - Never takes action directly (only flags)
    - Patterns to detect:
        * "Join our Discord"
        * "Grow your channel"
        * Multiple links
        * Repeated messages (spam)
        * Known scam keywords
    - Flags are recorded for human review

    Note:
        Repeated-message detection is listed in the policy above but is not
        implemented by this class; only the keyword and link-count checks
        in ``analyze_message`` are performed here.

    Attributes:
        llm_client: Client passed to the constructor. It is stored but not
            used by any method of this class.
        suspicious_patterns: Substrings searched for in each message
            (case-insensitively).
        flagged_count: Number of messages flagged so far by this instance.
    """

    # Matches the *start* of a link. The "http(s)://" branch also swallows a
    # directly following "www." so "https://www.example.com" counts once.
    # The lookbehind keeps words such as "awww." and hosts such as
    # "sub.www.example.com" from being counted as a separate link.
    _LINK_START = re.compile(
        r"https?://(?:www\.)?|(?<![\w.])www\.",
        re.IGNORECASE,
    )

    def __init__(self, llm_client: "LLMClient"):
        """Initialize Warden mode.

        Args:
            llm_client: LLM client to keep on the instance.
        """
        self.llm_client = llm_client
        self.suspicious_patterns = [
            "join our discord",
            "discord.gg",
            "grow your channel",
            "easy money",
            "click here",
            "limited offer",
            "act now",
        ]
        self.flagged_count = 0

    @classmethod
    def _count_links(cls, text: str) -> int:
        """Return how many link starts (http://, https://, www.) occur in *text*."""
        return sum(1 for _ in cls._LINK_START.finditer(text))

    async def analyze_message(self, message: str) -> dict:
        """Analyze a message for suspicious content.

        Args:
            message: Raw message text. ``None`` or an empty string is
                treated as a message with no content.

        Returns:
            A dict with three keys:
            ``is_suspicious`` (bool), ``patterns_detected`` (list of the
            matched entries from ``suspicious_patterns`` plus
            ``"multiple_links"`` when more than one link is present), and
            ``severity`` (``"safe"``, ``"medium"`` for one detection,
            ``"high"`` for two or more).

        Side effects:
            When the message is suspicious, ``flagged_count`` is incremented
            and a warning is logged.
        """
        text = message or ""
        lowered = text.lower()

        # Keyword matching is a case-insensitive substring search.
        detected = [
            pattern
            for pattern in self.suspicious_patterns
            if pattern.lower() in lowered
        ]

        # More than one link in a single message is treated as link spam.
        if self._count_links(text) > 1:
            detected.append("multiple_links")

        if not detected:
            return {
                "is_suspicious": False,
                "patterns_detected": [],
                "severity": "safe",
            }

        self.flagged_count += 1
        # Lazy %-formatting: the message is only built if the record is emitted.
        logger.warning("Warden flagged suspicious message: %s", detected)

        return {
            "is_suspicious": True,
            "patterns_detected": detected,
            "severity": "high" if len(detected) >= 2 else "medium",
        }

    async def get_report(self) -> dict:
        """Get Warden's activity report.

        Returns:
            A dict with ``mode`` (always ``"warden"``) and ``total_flagged``
            (the current ``flagged_count``).
        """
        return {
            "mode": "warden",
            "total_flagged": self.flagged_count,
        }
|