diff --git a/app/agent/orchestrator.py b/app/agent/orchestrator.py index 6faa5ad..e65a9e3 100644 --- a/app/agent/orchestrator.py +++ b/app/agent/orchestrator.py @@ -88,6 +88,47 @@ class AgentOrchestrator: logger.info(f"Started session {session_id} for {channel_name}") return session_id + async def ensure_single_active_session_for_channel(self, channel_name: str) -> str: + """Return one active session for a channel and end duplicate active sessions.""" + normalized_channel = channel_name.lower() + matching_sessions = [ + (session_id, session) + for session_id, session in self.active_sessions.items() + if session.get("channel_name", "").lower() == normalized_channel + ] + + if not matching_sessions: + return await self.start_session(channel_name) + + keep_session_id, _ = max( + matching_sessions, + key=lambda item: item[1].get("started_at", datetime.min), + ) + duplicate_session_ids = [ + session_id + for session_id, _ in matching_sessions + if session_id != keep_session_id + ] + + if duplicate_session_ids: + async for db_session in get_session(): + repo = Repository(db_session) + for session_id in duplicate_session_ids: + await repo.end_session(session_id) + + for session_id in duplicate_session_ids: + self.active_sessions.pop(session_id, None) + self.chat_activity.clear_activity(session_id) + + logger.warning( + "Ended %s duplicate active session(s) for channel %s; keeping %s", + len(duplicate_session_ids), + channel_name, + keep_session_id, + ) + + return keep_session_id + async def restore_active_sessions(self) -> int: """Restore active sessions from the database after app startup.""" restored_count = 0 diff --git a/app/agent/policies.py b/app/agent/policies.py index 65a8854..50d4259 100644 --- a/app/agent/policies.py +++ b/app/agent/policies.py @@ -27,6 +27,10 @@ class ChatActivityPolicy: """Get the most recent chat activity time for a session.""" return self.last_message_time.get(session_id) + def clear_activity(self, session_id: str) -> None: + """Stop tracking activity for an ended session.""" + self.last_message_time.pop(session_id, None) + def minutes_since_activity(self, session_id: str) -> int: """Get minutes since last chat message.""" if session_id not in self.last_message_time: diff --git a/app/main.py b/app/main.py index 555d3d4..142c244 100644 --- a/app/main.py +++ b/app/main.py @@ -92,20 +92,7 @@ async def get_or_create_twitch_session(channel_name: str) -> str: if not orchestrator: raise RuntimeError("Orchestrator not initialized") - normalized_channel = channel_name.lower() - matching_sessions = [ - (session_id, session) - for session_id, session in orchestrator.active_sessions.items() - if session.get("channel_name", "").lower() == normalized_channel - ] - if matching_sessions: - session_id, _ = max( - matching_sessions, - key=lambda item: item[1].get("started_at", datetime.min), - ) - return session_id - - return await orchestrator.start_session(channel_name) + return await orchestrator.ensure_single_active_session_for_channel(channel_name) async def handle_twitch_chat_message(message: TwitchChatMessage) -> None: @@ -159,8 +146,8 @@ async def startup_event(): loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS ) await orchestrator.restore_active_sessions() - agent_loop_task = asyncio.create_task(agent_loop()) await start_twitch_chat() + agent_loop_task = asyncio.create_task(agent_loop()) logger.info("Application started successfully") except Exception as e: logger.error(f"Failed to start application: {e}")