Merge PR #261: improve error handling and type hints in session_search_tool

Authored by aydnOktay. Adds TimeoutError handling for session summarization,
better exception specificity in _format_timestamp, defensive try/except in
_resolve_to_parent, and type hints.
This commit is contained in:
teknium1
2026-03-04 21:23:56 -08:00

View File

@@ -20,7 +20,7 @@ import concurrent.futures
import json import json
import os import os
import logging import logging
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional, Union
from openai import AsyncOpenAI, OpenAI from openai import AsyncOpenAI, OpenAI
@@ -33,8 +33,16 @@ MAX_SESSION_CHARS = 100_000
MAX_SUMMARY_TOKENS = 10000 MAX_SUMMARY_TOKENS = 10000
def _format_timestamp(ts) -> str: def _format_timestamp(ts: Optional[Any]) -> str:
"""Convert a Unix timestamp (float/int) or ISO string to a human-readable date.""" """
Convert a Unix timestamp (float/int) or ISO string to a human-readable date.
Args:
ts: Unix timestamp (int/float), ISO string, or None
Returns:
Human-readable date string or "unknown" if conversion fails
"""
if ts is None: if ts is None:
return "unknown" return "unknown"
try: try:
@@ -48,8 +56,11 @@ def _format_timestamp(ts) -> str:
dt = datetime.fromtimestamp(float(ts)) dt = datetime.fromtimestamp(float(ts))
return dt.strftime("%B %d, %Y at %I:%M %p") return dt.strftime("%B %d, %Y at %I:%M %p")
return ts return ts
except Exception: except (ValueError, OSError, OverflowError) as e:
pass # Log specific errors for debugging while gracefully handling edge cases
logging.debug("Failed to format timestamp %s: %s", ts, e)
except Exception as e:
logging.debug("Unexpected error formatting timestamp %s: %s", ts, e)
return str(ts) return str(ts)
@@ -225,18 +236,31 @@ def session_search(
# Resolve child sessions to their parent — delegation stores detailed # Resolve child sessions to their parent — delegation stores detailed
# content in child sessions, but the user's conversation is the parent. # content in child sessions, but the user's conversation is the parent.
def _resolve_to_parent(session_id): def _resolve_to_parent(session_id: str) -> Optional[str]:
"""
Resolve a session ID to its parent session ID, handling delegation chains.
Args:
session_id: The session ID to resolve
Returns:
Parent session ID or None if resolution fails
"""
visited = set() visited = set()
sid = session_id sid = session_id
while sid and sid not in visited: while sid and sid not in visited:
visited.add(sid) visited.add(sid)
session = db.get_session(sid) try:
if not session: session = db.get_session(sid)
break if not session:
parent = session.get("parent_session_id") break
if parent: parent = session.get("parent_session_id")
sid = parent if parent:
else: sid = parent
else:
break
except Exception as e:
logging.debug("Error resolving parent for session %s: %s", sid, e)
break break
return sid return sid
@@ -272,7 +296,8 @@ def session_search(
logging.warning(f"Failed to prepare session {session_id}: {e}") logging.warning(f"Failed to prepare session {session_id}: {e}")
# Summarize all sessions in parallel # Summarize all sessions in parallel
async def _summarize_all(): async def _summarize_all() -> List[Union[str, Exception]]:
"""Summarize all sessions in parallel."""
coros = [ coros = [
_summarize_session(text, query, meta) _summarize_session(text, query, meta)
for _, _, text, meta in tasks for _, _, text, meta in tasks
@@ -284,7 +309,14 @@ def session_search(
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
results = pool.submit(lambda: asyncio.run(_summarize_all())).result(timeout=60) results = pool.submit(lambda: asyncio.run(_summarize_all())).result(timeout=60)
except RuntimeError: except RuntimeError:
# No event loop running, create a new one
results = asyncio.run(_summarize_all()) results = asyncio.run(_summarize_all())
except concurrent.futures.TimeoutError:
logging.warning("Session summarization timed out after 60 seconds")
return json.dumps({
"success": False,
"error": "Session summarization timed out. Try a more specific query or reduce the limit.",
}, ensure_ascii=False)
summaries = [] summaries = []
for (session_id, match_info, _, _), result in zip(tasks, results): for (session_id, match_info, _, _), result in zip(tasks, results):