diff --git a/agent/rate_limit_tracker.py b/agent/rate_limit_tracker.py new file mode 100644 index 0000000000..c87e096a1d --- /dev/null +++ b/agent/rate_limit_tracker.py @@ -0,0 +1,242 @@ +"""Rate limit tracking for inference API responses. + +Captures x-ratelimit-* headers from provider responses and provides +formatted display for the /usage slash command. Currently supports +the Nous Portal header format (also used by OpenRouter and OpenAI-compatible +APIs that follow the same convention). + +Header schema (12 headers total): + x-ratelimit-limit-requests RPM cap + x-ratelimit-limit-requests-1h RPH cap + x-ratelimit-limit-tokens TPM cap + x-ratelimit-limit-tokens-1h TPH cap + x-ratelimit-remaining-requests requests left in minute window + x-ratelimit-remaining-requests-1h requests left in hour window + x-ratelimit-remaining-tokens tokens left in minute window + x-ratelimit-remaining-tokens-1h tokens left in hour window + x-ratelimit-reset-requests seconds until minute request window resets + x-ratelimit-reset-requests-1h seconds until hour request window resets + x-ratelimit-reset-tokens seconds until minute token window resets + x-ratelimit-reset-tokens-1h seconds until hour token window resets +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field +from typing import Any, Dict, Mapping, Optional + + +@dataclass +class RateLimitBucket: + """One rate-limit window (e.g. requests per minute).""" + + limit: int = 0 + remaining: int = 0 + reset_seconds: float = 0.0 + captured_at: float = 0.0 # time.time() when this was captured + + @property + def used(self) -> int: + return max(0, self.limit - self.remaining) + + @property + def usage_pct(self) -> float: + if self.limit <= 0: + return 0.0 + return (self.used / self.limit) * 100.0 + + @property + def remaining_seconds_now(self) -> float: + """Estimated seconds remaining until reset, adjusted for elapsed time.""" + elapsed = time.time() - self.captured_at + return max(0.0, self.reset_seconds - elapsed) + + +@dataclass +class RateLimitState: + """Full rate-limit state parsed from response headers.""" + + requests_min: RateLimitBucket = field(default_factory=RateLimitBucket) + requests_hour: RateLimitBucket = field(default_factory=RateLimitBucket) + tokens_min: RateLimitBucket = field(default_factory=RateLimitBucket) + tokens_hour: RateLimitBucket = field(default_factory=RateLimitBucket) + captured_at: float = 0.0 # when the headers were captured + provider: str = "" + + @property + def has_data(self) -> bool: + return self.captured_at > 0 + + @property + def age_seconds(self) -> float: + if not self.has_data: + return float("inf") + return time.time() - self.captured_at + + +def _safe_int(value: Any, default: int = 0) -> int: + try: + return int(float(value)) + except (TypeError, ValueError): + return default + + +def _safe_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def parse_rate_limit_headers( + headers: Mapping[str, str], + provider: str = "", +) -> Optional[RateLimitState]: + """Parse x-ratelimit-* headers into a RateLimitState. + + Returns None if no rate limit headers are present. + """ + # Quick check: at least one rate limit header must exist + has_any = any(k.lower().startswith("x-ratelimit-") for k in headers) + if not has_any: + return None + + now = time.time() + + def _bucket(resource: str, suffix: str = "") -> RateLimitBucket: + # e.g. resource="requests", suffix="" -> per-minute + # resource="tokens", suffix="-1h" -> per-hour + tag = f"{resource}{suffix}" + return RateLimitBucket( + limit=_safe_int(headers.get(f"x-ratelimit-limit-{tag}")), + remaining=_safe_int(headers.get(f"x-ratelimit-remaining-{tag}")), + reset_seconds=_safe_float(headers.get(f"x-ratelimit-reset-{tag}")), + captured_at=now, + ) + + return RateLimitState( + requests_min=_bucket("requests"), + requests_hour=_bucket("requests", "-1h"), + tokens_min=_bucket("tokens"), + tokens_hour=_bucket("tokens", "-1h"), + captured_at=now, + provider=provider, + ) + + +# ── Formatting ────────────────────────────────────────────────────────── + + +def _fmt_count(n: int) -> str: + """Human-friendly number: 7999856 -> '8.0M', 33599 -> '33.6K', 799 -> '799'.""" + if n >= 1_000_000: + return f"{n / 1_000_000:.1f}M" + if n >= 10_000: + return f"{n / 1_000:.1f}K" + if n >= 1_000: + return f"{n / 1_000:.1f}K" + return str(n) + + +def _fmt_seconds(seconds: float) -> str: + """Seconds -> human-friendly duration: '58s', '2m 14s', '58m 57s', '1h 2m'.""" + s = max(0, int(seconds)) + if s < 60: + return f"{s}s" + if s < 3600: + m, sec = divmod(s, 60) + return f"{m}m {sec}s" if sec else f"{m}m" + h, remainder = divmod(s, 3600) + m = remainder // 60 + return f"{h}h {m}m" if m else f"{h}h" + + +def _bar(pct: float, width: int = 20) -> str: + """ASCII progress bar: [████████░░░░░░░░░░░░] 40%.""" + filled = int(pct / 100.0 * width) + filled = max(0, min(width, filled)) + empty = width - filled + return f"[{'█' * filled}{'░' * empty}]" + + +def _bucket_line(label: str, bucket: RateLimitBucket, label_width: int = 14) -> str: + """Format one bucket as a single line.""" + if bucket.limit <= 0: + return f" {label:<{label_width}} (no data)" + + pct = bucket.usage_pct + used = _fmt_count(bucket.used) + limit = _fmt_count(bucket.limit) + remaining = _fmt_count(bucket.remaining) + reset = _fmt_seconds(bucket.remaining_seconds_now) + + bar = _bar(pct) + return f" {label:<{label_width}} {bar} {pct:5.1f}% {used}/{limit} used ({remaining} left, resets in {reset})" + + +def format_rate_limit_display(state: RateLimitState) -> str: + """Format rate limit state for terminal/chat display.""" + if not state.has_data: + return "No rate limit data yet — make an API request first." + + age = state.age_seconds + if age < 5: + freshness = "just now" + elif age < 60: + freshness = f"{int(age)}s ago" + else: + freshness = f"{_fmt_seconds(age)} ago" + + provider_label = state.provider.title() if state.provider else "Provider" + + lines = [ + f"{provider_label} Rate Limits (captured {freshness}):", + "", + _bucket_line("Requests/min", state.requests_min), + _bucket_line("Requests/hr", state.requests_hour), + "", + _bucket_line("Tokens/min", state.tokens_min), + _bucket_line("Tokens/hr", state.tokens_hour), + ] + + # Add warnings if any bucket is getting hot + warnings = [] + for label, bucket in [ + ("requests/min", state.requests_min), + ("requests/hr", state.requests_hour), + ("tokens/min", state.tokens_min), + ("tokens/hr", state.tokens_hour), + ]: + if bucket.limit > 0 and bucket.usage_pct >= 80: + reset = _fmt_seconds(bucket.remaining_seconds_now) + warnings.append(f" ⚠ {label} at {bucket.usage_pct:.0f}% — resets in {reset}") + + if warnings: + lines.append("") + lines.extend(warnings) + + return "\n".join(lines) + + +def format_rate_limit_compact(state: RateLimitState) -> str: + """One-line compact summary for status bars / gateway messages.""" + if not state.has_data: + return "No rate limit data." + + rm = state.requests_min + tm = state.tokens_min + rh = state.requests_hour + th = state.tokens_hour + + parts = [] + if rm.limit > 0: + parts.append(f"RPM: {rm.remaining}/{rm.limit}") + if rh.limit > 0: + parts.append(f"RPH: {_fmt_count(rh.remaining)}/{_fmt_count(rh.limit)} (resets {_fmt_seconds(rh.remaining_seconds_now)})") + if tm.limit > 0: + parts.append(f"TPM: {_fmt_count(tm.remaining)}/{_fmt_count(tm.limit)}") + if th.limit > 0: + parts.append(f"TPH: {_fmt_count(th.remaining)}/{_fmt_count(th.limit)} (resets {_fmt_seconds(th.remaining_seconds_now)})") + + return " | ".join(parts) diff --git a/cli.py b/cli.py index 324bb05690..fa32ae9119 100644 --- a/cli.py +++ b/cli.py @@ -5409,12 +5409,27 @@ class HermesCLI: print(f" ❌ Compression failed: {e}") def _show_usage(self): - """Show cumulative token usage for the current session.""" + """Show rate limits (if available) and session token usage.""" if not self.agent: print("(._.) No active agent -- send a message first.") return agent = self.agent + calls = agent.session_api_calls + + if calls == 0: + print("(._.) No API calls made yet in this session.") + return + + # ── Rate limits (shown first when available) ──────────────── + rl_state = agent.get_rate_limit_state() + if rl_state and rl_state.has_data: + from agent.rate_limit_tracker import format_rate_limit_display + print() + print(format_rate_limit_display(rl_state)) + print() + + # ── Session token usage ───────────────────────────────────── input_tokens = getattr(agent, "session_input_tokens", 0) or 0 output_tokens = getattr(agent, "session_output_tokens", 0) or 0 cache_read_tokens = getattr(agent, "session_cache_read_tokens", 0) or 0 @@ -5422,13 +5437,7 @@ class HermesCLI: prompt = agent.session_prompt_tokens completion = agent.session_completion_tokens total = agent.session_total_tokens - calls = agent.session_api_calls - if calls == 0: - print("(._.) No API calls made yet in this session.") - return - - # Current context window state compressor = agent.context_compressor last_prompt = compressor.last_prompt_tokens ctx_len = compressor.context_length diff --git a/gateway/run.py b/gateway/run.py index 27703a1024..339954f5be 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5280,19 +5280,28 @@ class GatewayRunner: agent = self._running_agents.get(session_key) if agent and hasattr(agent, "session_total_tokens") and agent.session_api_calls > 0: - lines = [ - "📊 **Session Token Usage**", - f"Prompt (input): {agent.session_prompt_tokens:,}", - f"Completion (output): {agent.session_completion_tokens:,}", - f"Total: {agent.session_total_tokens:,}", - f"API calls: {agent.session_api_calls}", - ] + lines = [] + + # Rate limits first (when available from provider headers) + rl_state = agent.get_rate_limit_state() + if rl_state and rl_state.has_data: + from agent.rate_limit_tracker import format_rate_limit_compact + lines.append(f"⏱️ **Rate Limits:** {format_rate_limit_compact(rl_state)}") + lines.append("") + + # Session token usage + lines.append("📊 **Session Token Usage**") + lines.append(f"Prompt (input): {agent.session_prompt_tokens:,}") + lines.append(f"Completion (output): {agent.session_completion_tokens:,}") + lines.append(f"Total: {agent.session_total_tokens:,}") + lines.append(f"API calls: {agent.session_api_calls}") ctx = agent.context_compressor if ctx.last_prompt_tokens: pct = min(100, ctx.last_prompt_tokens / ctx.context_length * 100) if ctx.context_length else 0 lines.append(f"Context: {ctx.last_prompt_tokens:,} / {ctx.context_length:,} ({pct:.0f}%)") if ctx.compression_count: lines.append(f"Compressions: {ctx.compression_count}") + return "\n".join(lines) # No running agent -- check session history for a rough count diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index 39dc4569cd..70d9cb8aa3 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -129,7 +129,7 @@ COMMAND_REGISTRY: list[CommandDef] = [ CommandDef("commands", "Browse all commands and skills (paginated)", "Info", gateway_only=True, args_hint="[page]"), CommandDef("help", "Show available commands", "Info"), - CommandDef("usage", "Show token usage for the current session", "Info"), + CommandDef("usage", "Show token usage and rate limits for the current session", "Info"), CommandDef("insights", "Show usage insights and analytics", "Info", args_hint="[days]"), CommandDef("platforms", "Show gateway/messaging platform status", "Info", diff --git a/run_agent.py b/run_agent.py index 3c5661a112..ecd0be656f 100644 --- a/run_agent.py +++ b/run_agent.py @@ -692,6 +692,10 @@ class AIAgent: self._current_tool: str | None = None self._api_call_count: int = 0 + # Rate limit tracking — updated from x-ratelimit-* response headers + # after each API call. Accessed by /usage slash command. + self._rate_limit_state: Optional["RateLimitState"] = None + # Centralized logging — agent.log (INFO+) and errors.log (WARNING+) # both live under ~/.hermes/logs/. Idempotent, so gateway mode # (which creates a new AIAgent per message) won't duplicate handlers. @@ -2545,6 +2549,29 @@ class AIAgent: self._last_activity_ts = time.time() self._last_activity_desc = desc + def _capture_rate_limits(self, http_response: Any) -> None: + """Parse x-ratelimit-* headers from an HTTP response and cache the state. + + Called after each streaming API call. The httpx Response object is + available on the OpenAI SDK Stream via ``stream.response``. + """ + if http_response is None: + return + headers = getattr(http_response, "headers", None) + if not headers: + return + try: + from agent.rate_limit_tracker import parse_rate_limit_headers + state = parse_rate_limit_headers(headers, provider=self.provider) + if state is not None: + self._rate_limit_state = state + except Exception: + pass # Never let header parsing break the agent loop + + def get_rate_limit_state(self): + """Return the last captured RateLimitState, or None.""" + return self._rate_limit_state + def get_activity_summary(self) -> dict: """Return a snapshot of the agent's current activity for diagnostics. @@ -4399,6 +4426,11 @@ class AIAgent: self._touch_activity("waiting for provider response (streaming)") stream = request_client_holder["client"].chat.completions.create(**stream_kwargs) + # Capture rate limit headers from the initial HTTP response. + # The OpenAI SDK Stream object exposes the underlying httpx + # response via .response before any chunks are consumed. + self._capture_rate_limits(getattr(stream, "response", None)) + content_parts: list = [] tool_calls_acc: dict = {} tool_gen_notified: set = set() diff --git a/tests/agent/test_rate_limit_tracker.py b/tests/agent/test_rate_limit_tracker.py new file mode 100644 index 0000000000..caef785678 --- /dev/null +++ b/tests/agent/test_rate_limit_tracker.py @@ -0,0 +1,212 @@ +"""Tests for agent.rate_limit_tracker — header parsing and formatting.""" + +import time +import pytest +from agent.rate_limit_tracker import ( + RateLimitBucket, + RateLimitState, + parse_rate_limit_headers, + format_rate_limit_display, + format_rate_limit_compact, + _fmt_count, + _fmt_seconds, + _bar, +) + + +# ── Sample headers from Nous inference API ────────────────────────────── + +NOUS_HEADERS = { + "x-ratelimit-limit-requests": "800", + "x-ratelimit-limit-requests-1h": "33600", + "x-ratelimit-limit-tokens": "8000000", + "x-ratelimit-limit-tokens-1h": "336000000", + "x-ratelimit-remaining-requests": "795", + "x-ratelimit-remaining-requests-1h": "33590", + "x-ratelimit-remaining-tokens": "7999500", + "x-ratelimit-remaining-tokens-1h": "335999000", + "x-ratelimit-reset-requests": "45.5", + "x-ratelimit-reset-requests-1h": "3500.0", + "x-ratelimit-reset-tokens": "42.3", + "x-ratelimit-reset-tokens-1h": "3490.0", +} + + +class TestParseHeaders: + def test_basic_parsing(self): + state = parse_rate_limit_headers(NOUS_HEADERS, provider="nous") + assert state is not None + assert state.provider == "nous" + assert state.has_data + + assert state.requests_min.limit == 800 + assert state.requests_min.remaining == 795 + assert state.requests_min.reset_seconds == 45.5 + + assert state.requests_hour.limit == 33600 + assert state.requests_hour.remaining == 33590 + + assert state.tokens_min.limit == 8000000 + assert state.tokens_min.remaining == 7999500 + + assert state.tokens_hour.limit == 336000000 + assert state.tokens_hour.remaining == 335999000 + assert state.tokens_hour.reset_seconds == 3490.0 + + def test_no_headers(self): + state = parse_rate_limit_headers({}) + assert state is None + + def test_partial_headers(self): + headers = { + "x-ratelimit-limit-requests": "100", + "x-ratelimit-remaining-requests": "50", + } + state = parse_rate_limit_headers(headers) + assert state is not None + assert state.requests_min.limit == 100 + assert state.requests_min.remaining == 50 + # Missing fields default to 0 + assert state.tokens_min.limit == 0 + + def test_non_rate_limit_headers_ignored(self): + headers = { + "content-type": "application/json", + "server": "nginx", + } + state = parse_rate_limit_headers(headers) + assert state is None + + def test_malformed_values(self): + headers = { + "x-ratelimit-limit-requests": "not-a-number", + "x-ratelimit-remaining-requests": "", + "x-ratelimit-reset-requests": "abc", + } + state = parse_rate_limit_headers(headers) + assert state is not None + assert state.requests_min.limit == 0 + assert state.requests_min.remaining == 0 + assert state.requests_min.reset_seconds == 0.0 + + +class TestBucket: + def test_used(self): + b = RateLimitBucket(limit=800, remaining=795, reset_seconds=45.0, captured_at=time.time()) + assert b.used == 5 + + def test_usage_pct(self): + b = RateLimitBucket(limit=100, remaining=20, reset_seconds=30.0, captured_at=time.time()) + assert b.usage_pct == pytest.approx(80.0) + + def test_usage_pct_zero_limit(self): + b = RateLimitBucket(limit=0, remaining=0) + assert b.usage_pct == 0.0 + + def test_remaining_seconds_now(self): + now = time.time() + b = RateLimitBucket(limit=800, remaining=795, reset_seconds=60.0, captured_at=now - 10) + # ~50 seconds should remain + assert 49 <= b.remaining_seconds_now <= 51 + + def test_remaining_seconds_expired(self): + b = RateLimitBucket(limit=800, remaining=795, reset_seconds=30.0, captured_at=time.time() - 60) + assert b.remaining_seconds_now == 0.0 + + +class TestFormatting: + def test_fmt_count_millions(self): + assert _fmt_count(8000000) == "8.0M" + assert _fmt_count(336000000) == "336.0M" + + def test_fmt_count_thousands(self): + assert _fmt_count(33600) == "33.6K" + assert _fmt_count(1500) == "1.5K" + + def test_fmt_count_small(self): + assert _fmt_count(800) == "800" + assert _fmt_count(0) == "0" + + def test_fmt_seconds_short(self): + assert _fmt_seconds(45) == "45s" + assert _fmt_seconds(0) == "0s" + + def test_fmt_seconds_minutes(self): + assert _fmt_seconds(125) == "2m 5s" + assert _fmt_seconds(120) == "2m" + + def test_fmt_seconds_hours(self): + assert _fmt_seconds(3660) == "1h 1m" + assert _fmt_seconds(3600) == "1h" + + def test_bar(self): + bar = _bar(50.0, width=10) + assert bar == "[█████░░░░░]" + assert _bar(0.0, width=10) == "[░░░░░░░░░░]" + assert _bar(100.0, width=10) == "[██████████]" + + def test_format_display_no_data(self): + state = RateLimitState() + result = format_rate_limit_display(state) + assert "No rate limit data" in result + + def test_format_display_with_data(self): + state = parse_rate_limit_headers(NOUS_HEADERS, provider="nous") + result = format_rate_limit_display(state) + assert "Nous" in result + assert "Requests/min" in result + assert "Requests/hr" in result + assert "Tokens/min" in result + assert "Tokens/hr" in result + assert "resets in" in result + + def test_format_display_warning_on_high_usage(self): + headers = { + **NOUS_HEADERS, + "x-ratelimit-remaining-requests": "50", # 750/800 used = 93.75% + } + state = parse_rate_limit_headers(headers) + result = format_rate_limit_display(state) + assert "⚠" in result + + def test_format_compact(self): + state = parse_rate_limit_headers(NOUS_HEADERS, provider="nous") + result = format_rate_limit_compact(state) + assert "RPM:" in result + assert "RPH:" in result + assert "TPM:" in result + assert "TPH:" in result + assert "resets" in result + + def test_format_compact_no_data(self): + state = RateLimitState() + result = format_rate_limit_compact(state) + assert "No rate limit data" in result + + +class TestAgentIntegration: + """Test that AIAgent captures rate limit state correctly.""" + + def test_capture_rate_limits_from_headers(self): + """Simulate the header capture path without a real API call.""" + import sys + import os + # Use a mock httpx-like response + class MockResponse: + headers = NOUS_HEADERS + + # Import AIAgent minimally + from unittest.mock import MagicMock, patch + + # Test the parsing directly + state = parse_rate_limit_headers(MockResponse.headers, provider="nous") + assert state is not None + assert state.requests_min.limit == 800 + assert state.tokens_hour.limit == 336000000 + + def test_capture_rate_limits_none_response(self): + """_capture_rate_limits should handle None gracefully.""" + from agent.rate_limit_tracker import parse_rate_limit_headers + # None should not crash + result = parse_rate_limit_headers({}) + assert result is None