diff --git a/gateway/run.py b/gateway/run.py
index 877313047b9..c809cb62304 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -1807,9 +1807,22 @@ class GatewayRunner:
         _STALE_TTL = (_raw_stale_timeout + 60) if _raw_stale_timeout > 0 else float("inf")
         _stale_ts = self._running_agents_ts.get(_quick_key, 0)
         if _quick_key in self._running_agents and _stale_ts and (time.time() - _stale_ts) > _STALE_TTL:
+            _stale_age = time.time() - _stale_ts
+            _stale_agent = self._running_agents.get(_quick_key)
+            _stale_detail = ""
+            if _stale_agent and hasattr(_stale_agent, "get_activity_summary"):
+                try:
+                    _sa = _stale_agent.get_activity_summary()
+                    _stale_detail = (
+                        f" | last_activity={_sa.get('last_activity_desc', 'unknown')} "
+                        f"({_sa.get('seconds_since_activity', 0):.0f}s ago) "
+                        f"| iteration={_sa.get('api_call_count', 0)}/{_sa.get('max_iterations', 0)}"
+                    )
+                except Exception:
+                    pass
             logger.warning(
-                "Evicting stale _running_agents entry for %s (age: %.0fs)",
-                _quick_key[:30], time.time() - _stale_ts,
+                "Evicting stale _running_agents entry for %s (age: %.0fs, TTL: %.0fs)%s",
+                _quick_key[:30], _stale_age, _STALE_TTL, _stale_detail,
             )
             del self._running_agents[_quick_key]
             self._running_agents_ts.pop(_quick_key, None)
@@ -6727,10 +6740,24 @@ class GatewayRunner:
                 while True:
                     await asyncio.sleep(_NOTIFY_INTERVAL)
                     _elapsed_mins = int((time.time() - _notify_start) // 60)
+                    # Include agent activity context if available.
+                    _agent_ref = agent_holder[0]
+                    _status_detail = ""
+                    if _agent_ref and hasattr(_agent_ref, "get_activity_summary"):
+                        try:
+                            _a = _agent_ref.get_activity_summary()
+                            _parts = [f"iteration {_a['api_call_count']}/{_a['max_iterations']}"]
+                            if _a.get("current_tool"):
+                                _parts.append(f"running: {_a['current_tool']}")
+                            else:
+                                _parts.append(_a.get("last_activity_desc", ""))
+                            _status_detail = " — " + ", ".join(_parts)
+                        except Exception:
+                            pass
                     try:
                         await _notify_adapter.send(
                             source.chat_id,
-                            f"⏳ Still working... ({_elapsed_mins} minutes elapsed)",
+                            f"⏳ Still working... ({_elapsed_mins} min elapsed{_status_detail})",
                             metadata=_status_thread_metadata,
                         )
                     except Exception as _ne:
@@ -6752,26 +6779,66 @@ class GatewayRunner:
                     timeout=_agent_timeout,
                 )
             except asyncio.TimeoutError:
+                # Build a diagnostic summary from the agent's activity tracker.
+                _timed_out_agent = agent_holder[0]
+                _activity = {}
+                if _timed_out_agent and hasattr(_timed_out_agent, "get_activity_summary"):
+                    try:
+                        _activity = _timed_out_agent.get_activity_summary()
+                    except Exception:
+                        pass
+
+                _last_desc = _activity.get("last_activity_desc", "unknown")
+                _secs_ago = _activity.get("seconds_since_activity", 0)
+                _cur_tool = _activity.get("current_tool")
+                _iter_n = _activity.get("api_call_count", 0)
+                _iter_max = _activity.get("max_iterations", 0)
+
                 logger.error(
-                    "Agent execution timed out after %.0fs for session %s",
+                    "Agent execution timed out after %.0fs for session %s "
+                    "| last_activity=%.0fs ago (%s) | iteration=%s/%s | tool=%s",
                     _agent_timeout, session_key,
+                    _secs_ago, _last_desc, _iter_n, _iter_max,
+                    _cur_tool or "none",
                 )
+
                 # Interrupt the agent if it's still running so the thread
                 # pool worker is freed.
-                _timed_out_agent = agent_holder[0]
                 if _timed_out_agent and hasattr(_timed_out_agent, "interrupt"):
                     _timed_out_agent.interrupt("Execution timed out")
+
                 _timeout_mins = int(_agent_timeout // 60)
+
+                # Construct a user-facing message with diagnostic context.
+                _diag_lines = [f"⏱️ Request timed out after {_timeout_mins} minutes."]
+                if _secs_ago < 30:
+                    _diag_lines.append(
+                        f"The agent was actively working when the timeout fired "
+                        f"(last activity: {_last_desc}, {_secs_ago:.0f}s ago, "
+                        f"iteration {_iter_n}/{_iter_max})."
+                    )
+                elif _cur_tool:
+                    _diag_lines.append(
+                        f"The agent appears stuck on tool `{_cur_tool}` "
+                        f"({_secs_ago:.0f}s since last activity, "
+                        f"iteration {_iter_n}/{_iter_max})."
+                    )
+                else:
+                    _diag_lines.append(
+                        f"Last activity: {_last_desc} ({_secs_ago:.0f}s ago, "
+                        f"iteration {_iter_n}/{_iter_max}). "
+                        "The agent may have been waiting on an API response."
+                    )
+                _diag_lines.append(
+                    "To increase the limit, set HERMES_AGENT_TIMEOUT in your .env "
+                    "(value in seconds, 0 = no limit) and restart the gateway.\n"
+                    "Try again, or use /reset to start fresh."
+                )
+
                 response = {
-                    "final_response": (
-                        f"⏱️ Request timed out after {_timeout_mins} minutes. "
-                        "The agent may have been stuck on a tool or API call.\n"
-                        "To increase the limit, set HERMES_AGENT_TIMEOUT in your .env "
-                        "(value in seconds, 0 = no limit) and restart the gateway.\n"
-                        "Try again, or use /reset to start fresh."
-                    ),
+                    "final_response": "\n".join(_diag_lines),
                     "messages": result_holder[0].get("messages", []) if result_holder[0] else [],
-                    "api_calls": 0,
+                    "api_calls": _iter_n,
                     "tools": tools_holder[0] or [],
                     "history_offset": 0,
                     "failed": True,
diff --git a/run_agent.py b/run_agent.py
index af40344df5e..619796c9756 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -707,6 +707,15 @@ class AIAgent:
         # status_callback for gateway platforms. Does NOT inject into messages.
         self._context_pressure_warned = False

+        # Activity tracking — updated on each API call, each tool execution, and
+        # the first stream chunk. Used by the gateway timeout handler to report
+        # what the agent was doing when it was killed, and by the "still working"
+        # notifications to show progress.
+        self._last_activity_ts: float = time.time()
+        self._last_activity_desc: str = "initializing"
+        self._current_tool: str | None = None
+        self._api_call_count: int = 0
+
         # Persistent error log -- always writes WARNING+ to ~/.hermes/logs/errors.log
         # so tool failures, API errors, etc. are inspectable after the fact.
         # In gateway mode, each incoming message creates a new AIAgent instance,
@@ -2617,6 +2626,29 @@ class AIAgent:
         self._interrupt_message = None
         _set_interrupt(False)

+    def _touch_activity(self, desc: str) -> None:
+        """Update the last-activity timestamp and description (thread-safe)."""
+        self._last_activity_ts = time.time()
+        self._last_activity_desc = desc
+
+    def get_activity_summary(self) -> dict:
+        """Return a snapshot of the agent's current activity for diagnostics.
+
+        Called by the gateway timeout handler to report what the agent was doing
+        when it was killed, and by the periodic "still working" notifications.
+        """
+        elapsed = time.time() - self._last_activity_ts
+        return {
+            "last_activity_ts": self._last_activity_ts,
+            "last_activity_desc": self._last_activity_desc,
+            "seconds_since_activity": round(elapsed, 1),
+            "current_tool": self._current_tool,
+            "api_call_count": self._api_call_count,
+            "max_iterations": self.max_iterations,
+            "budget_used": self.iteration_budget.used,
+            "budget_max": self.iteration_budget.max_total,
+        }
+
     def shutdown_memory_provider(self, messages: list = None) -> None:
         """Shut down the memory provider — call at actual session boundaries.
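
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): how the two methods above are
# consumed across threads. The agent's worker thread calls _touch_activity()
# while the gateway's event loop polls get_activity_summary(), which is what
# the notifier and timeout hunks in gateway/run.py do. AgentStub and the
# polling loop below are hypothetical; only the field names mirror the diff.
# Plain attribute writes are GIL-atomic, so the reader needs no lock; at worst
# it sees a momentarily mismatched ts/desc pair, which is fine for diagnostics.

import threading
import time


class AgentStub:
    def __init__(self):
        self._last_activity_ts = time.time()
        self._last_activity_desc = "initializing"
        self._current_tool = None
        self._api_call_count = 0
        self.max_iterations = 50

    def _touch_activity(self, desc: str) -> None:
        self._last_activity_ts = time.time()
        self._last_activity_desc = desc

    def get_activity_summary(self) -> dict:
        return {
            "last_activity_desc": self._last_activity_desc,
            "seconds_since_activity": round(time.time() - self._last_activity_ts, 1),
            "current_tool": self._current_tool,
            "api_call_count": self._api_call_count,
            "max_iterations": self.max_iterations,
        }


def work(agent: AgentStub) -> None:
    # Stand-in for the agent loop: three "API calls" with some latency each.
    for n in range(1, 4):
        agent._api_call_count = n
        agent._touch_activity(f"starting API call #{n}")
        time.sleep(0.6)


agent = AgentStub()
worker = threading.Thread(target=work, args=(agent,))
worker.start()
while worker.is_alive():
    a = agent.get_activity_summary()
    print(f"iteration {a['api_call_count']}/{a['max_iterations']} | "
          f"{a['last_activity_desc']} ({a['seconds_since_activity']:.1f}s ago)")
    time.sleep(0.5)
worker.join()
# ---------------------------------------------------------------------------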
@@ -4354,6 +4386,7 @@ class AIAgent:
                 # Reset stale-stream timer so the detector measures from this
                 # attempt's start, not a previous attempt's last chunk.
                 last_chunk_time["t"] = time.time()
+                self._touch_activity("waiting for provider response (streaming)")
                 stream = request_client_holder["client"].chat.completions.create(**stream_kwargs)

                 content_parts: list = []
@@ -4374,8 +4407,12 @@ class AIAgent:
                 # knows whether reasoning was already displayed during streaming.
                 self._reasoning_deltas_fired = False

+                _first_chunk_seen = False
                 for chunk in stream:
                     last_chunk_time["t"] = time.time()
+                    if not _first_chunk_seen:
+                        _first_chunk_seen = True
+                        self._touch_activity("receiving stream response")
                     if self._interrupt_requested:
                         break
@@ -4726,10 +4763,20 @@ class AIAgent:
                     # Detect stale streams: connections kept alive by SSE pings
                     # but delivering no real chunks. Kill the client so the
                     # inner retry loop can start a fresh connection.
-                    if time.time() - last_chunk_time["t"] > _stream_stale_timeout:
+                    _stale_elapsed = time.time() - last_chunk_time["t"]
+                    if _stale_elapsed > _stream_stale_timeout:
+                        _est_ctx = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4
                         logger.warning(
-                            "Stream stale for %.0fs — no chunks received. Killing connection.",
-                            _stream_stale_timeout,
+                            "Stream stale for %.0fs (threshold %.0fs) — no chunks received. "
+                            "model=%s context=~%s tokens. Killing connection.",
+                            _stale_elapsed, _stream_stale_timeout,
+                            api_kwargs.get("model", "unknown"), f"{_est_ctx:,}",
                         )
+                        self._emit_status(
+                            f"⚠️ No response from provider for {int(_stale_elapsed)}s "
+                            f"(model: {api_kwargs.get('model', 'unknown')}, "
+                            f"context: ~{_est_ctx:,} tokens). "
+                            f"Reconnecting..."
+                        )
                         try:
                             rc = request_client_holder.get("client")
@@ -6153,6 +6200,9 @@ class AIAgent:
                     response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
                     print(f"  ✅ Tool {i+1} completed in {tool_duration:.2f}s - {response_preview}")

+                self._current_tool = None
+                self._touch_activity(f"tool completed: {name} ({tool_duration:.1f}s)")
+
                 if self.tool_complete_callback:
                     try:
                         self.tool_complete_callback(tc.id, name, args, function_result)
@@ -6238,6 +6288,9 @@ class AIAgent:
             args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
             print(f"  📞 Tool {i}: {function_name}({list(function_args.keys())}) - {args_preview}")

+        self._current_tool = function_name
+        self._touch_activity(f"executing tool: {function_name}")
+
         if self.tool_progress_callback:
             try:
                 preview = _build_tool_preview(function_name, function_args)
@@ -6437,6 +6490,9 @@ class AIAgent:
                 except Exception as cb_err:
                     logging.debug(f"Tool progress callback error: {cb_err}")

+        self._current_tool = None
+        self._touch_activity(f"tool completed: {function_name} ({tool_duration:.1f}s)")
+
         if self.verbose_logging:
             logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
             logging.debug(f"Tool result ({len(function_result)} chars): {function_result}")
@@ -7033,6 +7089,8 @@ class AIAgent:
                 break

             api_call_count += 1
+            self._api_call_count = api_call_count
+            self._touch_activity(f"starting API call #{api_call_count}")
             if not self.iteration_budget.consume():
                 if not self.quiet_mode:
                     self._safe_print(f"\n⚠️ Iteration budget exhausted ({self.iteration_budget.used}/{self.iteration_budget.max_total} iterations used)")
@@ -7634,6 +7692,7 @@ class AIAgent:
                     self._vprint(f"{self.log_prefix}   💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)")

                 has_retried_429 = False  # Reset on success
+                self._touch_activity(f"API call #{api_call_count} completed")
                 break  # Success, exit retry loop

             except InterruptedError:
@@ -8008,7 +8067,7 @@ class AIAgent:
                         "error": f"Context length exceeded: max compression attempts ({max_compression_attempts}) reached.",
                         "partial": True
                     }
-                self._vprint(f"{self.log_prefix} 🗜️ Context compression attempt {compression_attempts}/{max_compression_attempts}...")
+                self._emit_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...")

                 original_len = len(messages)
                 messages, active_system_prompt = self._compress_context(
@@ -8076,6 +8135,10 @@ class AIAgent:
                     self._dump_api_request_debug(
                         api_kwargs, reason="non_retryable_client_error", error=api_error,
                     )
+                    self._emit_status(
+                        f"❌ Non-retryable error (HTTP {status_code}): "
+                        f"{self._summarize_api_error(api_error)}"
+                    )
                     self._vprint(f"{self.log_prefix}❌ Non-retryable client error (HTTP {status_code}). Aborting.", force=True)
                     self._vprint(f"{self.log_prefix}   🔌 Provider: {_provider}  Model: {_model}", force=True)
                     self._vprint(f"{self.log_prefix}   🌐 Endpoint: {_base}", force=True)
@@ -8129,9 +8192,9 @@ class AIAgent:
                     continue
                 _final_summary = self._summarize_api_error(api_error)
                 if is_rate_limited:
-                    self._vprint(f"{self.log_prefix}❌ Rate limit persisted after {max_retries} retries. Please try again later.", force=True)
+                    self._emit_status(f"❌ Rate limited after {max_retries} retries — {_final_summary}")
                 else:
-                    self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True)
+                    self._emit_status(f"❌ API failed after {max_retries} retries — {_final_summary}")
                 self._vprint(f"{self.log_prefix}   💀 Final error: {_final_summary}", force=True)

                 # Detect SSE stream-drop pattern (e.g. "Network
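
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): the stale-stream detection
# pattern from the @@ -4726 hunk, reduced to its core. A shared timestamp is
# refreshed by every real chunk; a watchdog fires when nothing has arrived
# within the threshold. The chunk source and the "kill" action below are
# hypothetical stand-ins; the real code closes the HTTP client so the inner
# retry loop reconnects. Note the consumer stays blocked inside the stream
# until the next chunk arrives, which is exactly why the real code must kill
# the connection from outside rather than only check a flag in the loop.

import threading
import time

STALE_TIMEOUT = 2.0                      # seconds without a real chunk
last_chunk_time = {"t": time.time()}     # mutable holder shared with watchdog
stop = threading.Event()


def watchdog() -> None:
    while not stop.wait(0.25):
        stale_elapsed = time.time() - last_chunk_time["t"]
        if stale_elapsed > STALE_TIMEOUT:
            print(f"Stream stale for {stale_elapsed:.0f}s "
                  f"(threshold {STALE_TIMEOUT:.0f}s). Killing connection.")
            stop.set()


def chunks():
    # Stand-in for the SDK stream: two quick chunks, then a stalled link.
    yield "hello "
    yield "world"
    time.sleep(5)
    yield "never delivered"


threading.Thread(target=watchdog, daemon=True).start()
parts = []
for chunk in chunks():
    if stop.is_set():
        break                            # connection was "killed"; retry upstream
    last_chunk_time["t"] = time.time()   # every real chunk resets the timer
    parts.append(chunk)
stop.set()
print("received:", "".join(parts))
# ---------------------------------------------------------------------------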