Prevent duplicate Twitch session prompts
This commit is contained in:
@@ -88,6 +88,47 @@ class AgentOrchestrator:
|
|||||||
logger.info(f"Started session {session_id} for {channel_name}")
|
logger.info(f"Started session {session_id} for {channel_name}")
|
||||||
return session_id
|
return session_id
|
||||||
|
|
||||||
|
async def ensure_single_active_session_for_channel(self, channel_name: str) -> str:
|
||||||
|
"""Return one active session for a channel and end duplicate active sessions."""
|
||||||
|
normalized_channel = channel_name.lower()
|
||||||
|
matching_sessions = [
|
||||||
|
(session_id, session)
|
||||||
|
for session_id, session in self.active_sessions.items()
|
||||||
|
if session.get("channel_name", "").lower() == normalized_channel
|
||||||
|
]
|
||||||
|
|
||||||
|
if not matching_sessions:
|
||||||
|
return await self.start_session(channel_name)
|
||||||
|
|
||||||
|
keep_session_id, _ = max(
|
||||||
|
matching_sessions,
|
||||||
|
key=lambda item: item[1].get("started_at", datetime.min),
|
||||||
|
)
|
||||||
|
duplicate_session_ids = [
|
||||||
|
session_id
|
||||||
|
for session_id, _ in matching_sessions
|
||||||
|
if session_id != keep_session_id
|
||||||
|
]
|
||||||
|
|
||||||
|
if duplicate_session_ids:
|
||||||
|
async for db_session in get_session():
|
||||||
|
repo = Repository(db_session)
|
||||||
|
for session_id in duplicate_session_ids:
|
||||||
|
await repo.end_session(session_id)
|
||||||
|
|
||||||
|
for session_id in duplicate_session_ids:
|
||||||
|
self.active_sessions.pop(session_id, None)
|
||||||
|
self.chat_activity.clear_activity(session_id)
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
"Ended %s duplicate active session(s) for channel %s; keeping %s",
|
||||||
|
len(duplicate_session_ids),
|
||||||
|
channel_name,
|
||||||
|
keep_session_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
return keep_session_id
|
||||||
|
|
||||||
async def restore_active_sessions(self) -> int:
|
async def restore_active_sessions(self) -> int:
|
||||||
"""Restore active sessions from the database after app startup."""
|
"""Restore active sessions from the database after app startup."""
|
||||||
restored_count = 0
|
restored_count = 0
|
||||||
|
|||||||
@@ -27,6 +27,10 @@ class ChatActivityPolicy:
|
|||||||
"""Get the most recent chat activity time for a session."""
|
"""Get the most recent chat activity time for a session."""
|
||||||
return self.last_message_time.get(session_id)
|
return self.last_message_time.get(session_id)
|
||||||
|
|
||||||
|
def clear_activity(self, session_id: str) -> None:
|
||||||
|
"""Stop tracking activity for an ended session."""
|
||||||
|
self.last_message_time.pop(session_id, None)
|
||||||
|
|
||||||
def minutes_since_activity(self, session_id: str) -> int:
|
def minutes_since_activity(self, session_id: str) -> int:
|
||||||
"""Get minutes since last chat message."""
|
"""Get minutes since last chat message."""
|
||||||
if session_id not in self.last_message_time:
|
if session_id not in self.last_message_time:
|
||||||
|
|||||||
17
app/main.py
17
app/main.py
@@ -92,20 +92,7 @@ async def get_or_create_twitch_session(channel_name: str) -> str:
|
|||||||
if not orchestrator:
|
if not orchestrator:
|
||||||
raise RuntimeError("Orchestrator not initialized")
|
raise RuntimeError("Orchestrator not initialized")
|
||||||
|
|
||||||
normalized_channel = channel_name.lower()
|
return await orchestrator.ensure_single_active_session_for_channel(channel_name)
|
||||||
matching_sessions = [
|
|
||||||
(session_id, session)
|
|
||||||
for session_id, session in orchestrator.active_sessions.items()
|
|
||||||
if session.get("channel_name", "").lower() == normalized_channel
|
|
||||||
]
|
|
||||||
if matching_sessions:
|
|
||||||
session_id, _ = max(
|
|
||||||
matching_sessions,
|
|
||||||
key=lambda item: item[1].get("started_at", datetime.min),
|
|
||||||
)
|
|
||||||
return session_id
|
|
||||||
|
|
||||||
return await orchestrator.start_session(channel_name)
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_twitch_chat_message(message: TwitchChatMessage) -> None:
|
async def handle_twitch_chat_message(message: TwitchChatMessage) -> None:
|
||||||
@@ -159,8 +146,8 @@ async def startup_event():
|
|||||||
loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS
|
loop_interval_seconds=settings.AGENT_LOOP_INTERVAL_SECONDS
|
||||||
)
|
)
|
||||||
await orchestrator.restore_active_sessions()
|
await orchestrator.restore_active_sessions()
|
||||||
agent_loop_task = asyncio.create_task(agent_loop())
|
|
||||||
await start_twitch_chat()
|
await start_twitch_chat()
|
||||||
|
agent_loop_task = asyncio.create_task(agent_loop())
|
||||||
logger.info("Application started successfully")
|
logger.info("Application started successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to start application: {e}")
|
logger.error(f"Failed to start application: {e}")
|
||||||
|
|||||||
Reference in New Issue
Block a user