Compare commits

...

1 Commit

Author SHA1 Message Date
Teknium
1ddb03b76f feat: per-model rate limit handler with stepped cooldown (inspired by OpenClaw)
Add agent/rate_limiter.py — a thread-safe per-model rate limit handler
with a stepped cooldown ladder:
  1st hit: 30s cooldown
  2nd hit: 60s cooldown
  3rd+ hits: 5min cooldown
  Resets after 10min of no hits

Wired into run_agent.py's API error handling. When a 429 is caught and
no fallback provider is available, the rate limiter kicks in with
escalating backoff instead of immediately failing.

23 new tests.
2026-03-30 01:15:26 -07:00
3 changed files with 547 additions and 2 deletions
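For a quick sense of the ladder described in the commit message, here is a minimal sketch using the module-level singleton added in agent/rate_limiter.py; the model name and the printed values are illustrative, not output from this repository:

    # Sketch: stepped cooldown escalation for a single model.
    from agent.rate_limiter import rate_limiter

    model = "gpt-4"
    print(rate_limiter.record_rate_limit(model))   # 1st hit -> 30.0 s
    print(rate_limiter.record_rate_limit(model))   # 2nd hit -> 60.0 s
    print(rate_limiter.record_rate_limit(model))   # 3rd hit -> 300.0 s
    print(rate_limiter.record_rate_limit(model))   # 4th hit -> 300.0 s (clamped at the last step)

    # check_rate_limit() reports the remaining cooldown without recording a new hit.
    print(rate_limiter.check_rate_limit(model))    # ~300.0, counting down

    # After 10 minutes with no further hits the step counter resets, so the
    # next 429 starts the ladder again at 30 s.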

agent/rate_limiter.py (new file, +173)
View File

@@ -0,0 +1,173 @@
"""Per-model rate limit handler with stepped cooldown.
Tracks 429 / rate-limit errors per model and applies a stepped cooldown
ladder:
1st hit → 30 s
2nd hit → 60 s
3rd+ hit → 300 s (5 min)
The step counter resets automatically after 10 minutes of *no* rate-limit
hits for a given model.
Thread-safe: all mutable state is guarded by a single ``threading.Lock``.
Usage example (inside an API retry loop)::
from agent.rate_limiter import rate_limiter
# Before calling the API honour any active cooldown
remaining = rate_limiter.check_rate_limit(model)
if remaining > 0:
time.sleep(remaining)
try:
response = client.chat.completions.create(...)
except RateLimitError:
cooldown = rate_limiter.record_rate_limit(model)
print(f"Rate limited on {model}, cooling down for {cooldown}s")
time.sleep(cooldown)
# … retry …
"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass, field
from typing import Dict
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Stepped cooldown ladder (seconds)
_COOLDOWN_STEPS: tuple[int, ...] = (30, 60, 300)
# After this many seconds with no new rate-limit hits the step counter resets.
_RESET_WINDOW: float = 600.0 # 10 minutes
# ---------------------------------------------------------------------------
# Internal per-model state
# ---------------------------------------------------------------------------
@dataclass
class _ModelCooldownState:
"""Mutable cooldown state for a single model."""
# How many consecutive rate-limit hits (1-indexed).
step: int = 0
# ``time.monotonic()`` timestamp when the current cooldown ends.
cooldown_until: float = 0.0
# ``time.monotonic()`` of the last hit used for the reset window.
last_hit: float = 0.0
# ---------------------------------------------------------------------------
# Public API singleton ``RateLimiter``
# ---------------------------------------------------------------------------
class RateLimiter:
"""Thread-safe, per-model rate-limit handler with stepped cooldown."""
def __init__(
self,
cooldown_steps: tuple[int, ...] = _COOLDOWN_STEPS,
reset_window: float = _RESET_WINDOW,
) -> None:
self._cooldown_steps = cooldown_steps
self._reset_window = reset_window
self._lock = threading.Lock()
self._models: Dict[str, _ModelCooldownState] = {}
# -- helpers ----------------------------------------------------------
def _get_state(self, model: str) -> _ModelCooldownState:
"""Return (or create) the state object for *model*. Caller must hold ``_lock``."""
if model not in self._models:
self._models[model] = _ModelCooldownState()
return self._models[model]
def _maybe_reset(self, state: _ModelCooldownState, now: float) -> None:
"""Reset the step counter if the reset window has elapsed since the last hit.
Caller must hold ``_lock``.
"""
if state.last_hit and (now - state.last_hit) >= self._reset_window:
state.step = 0
# -- public interface -------------------------------------------------
def check_rate_limit(self, model: str) -> float:
"""Return remaining cooldown seconds for *model*, or ``0`` if none."""
now = time.monotonic()
with self._lock:
state = self._get_state(model)
remaining = max(0.0, state.cooldown_until - now)
return remaining
def record_rate_limit(self, model: str) -> float:
"""Record a rate-limit hit for *model* and return the cooldown duration (seconds).
The returned value is the number of seconds to wait before the next
attempt.
"""
now = time.monotonic()
with self._lock:
state = self._get_state(model)
# Reset step counter if the reset window elapsed.
self._maybe_reset(state, now)
# Advance the step (clamped to the ladder length).
state.step = min(state.step + 1, len(self._cooldown_steps))
# Look up the cooldown for this step (1-indexed → 0-indexed).
cooldown = self._cooldown_steps[state.step - 1]
state.cooldown_until = now + cooldown
state.last_hit = now
return float(cooldown)
def get_step(self, model: str) -> int:
"""Return the current step number for *model* (0 means no active cooldown)."""
now = time.monotonic()
with self._lock:
state = self._get_state(model)
self._maybe_reset(state, now)
return state.step
def get_cooldown_status(self) -> Dict[str, Dict[str, float]]:
"""Return a snapshot of all models with an active cooldown.
Returns a dict mapping model name → ``{"remaining": <secs>, "step": <int>}``.
Models whose cooldown has already expired are omitted.
"""
now = time.monotonic()
result: Dict[str, Dict[str, float]] = {}
with self._lock:
for model, state in self._models.items():
remaining = max(0.0, state.cooldown_until - now)
if remaining > 0:
result[model] = {
"remaining": round(remaining, 2),
"step": state.step,
}
return result
def reset(self, model: str | None = None) -> None:
"""Reset cooldown state. If *model* is ``None``, reset everything."""
with self._lock:
if model is None:
self._models.clear()
elif model in self._models:
del self._models[model]
# Module-level singleton for convenient import.
rate_limiter = RateLimiter()
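Not part of the file above: a small usage sketch for the get_cooldown_status() snapshot, e.g. for a status line or debug log. The helper function and its output format are illustrative assumptions, not code from this commit:

    from agent.rate_limiter import rate_limiter

    def print_cooldowns() -> None:
        """Illustrative helper: report every model that is still cooling down."""
        status = rate_limiter.get_cooldown_status()
        if not status:
            print("No active cooldowns")
            return
        for model, info in status.items():
            print(f"{model}: {info['remaining']:.1f}s remaining (step {int(info['step'])})")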

run_agent.py
View File

@@ -100,6 +100,7 @@ from agent.trajectory import (
    convert_scratchpad_to_think, has_incomplete_scratchpad,
    save_trajectory as _save_trajectory_to_file,
)
from agent.rate_limiter import rate_limiter as _rate_limiter
from utils import atomic_json_write
HONCHO_TOOL_NAMES = {
@@ -6515,7 +6516,7 @@ class AIAgent:
            elif not isinstance(content_blocks, list):
                response_invalid = True
                error_details.append("response.content is not a list")
            elif len(content_blocks) == 0:
            elif len(content_blocks) == 0 and getattr(response, "stop_reason", None) != "sensitive":
                response_invalid = True
                error_details.append("response.content is empty")
            else:
@@ -6631,11 +6632,14 @@ class AIAgent:
                else:
                    finish_reason = "stop"
            elif self.api_mode == "anthropic_messages":
                stop_reason_map = {"end_turn": "stop", "tool_use": "tool_calls", "max_tokens": "length", "stop_sequence": "stop"}
                stop_reason_map = {"end_turn": "stop", "tool_use": "tool_calls", "max_tokens": "length", "stop_sequence": "stop", "sensitive": "content_filter"}
                finish_reason = stop_reason_map.get(response.stop_reason, "stop")
            else:
                finish_reason = response.choices[0].finish_reason

            if finish_reason == "content_filter":
                self._vprint(f"{self.log_prefix}⚠️ Response filtered by content policy (stop_reason='sensitive')", force=True)

            if finish_reason == "length":
                self._vprint(f"{self.log_prefix}⚠️ Response truncated (finish_reason='length') - model hit max output tokens", force=True)
@@ -7015,6 +7019,54 @@ class AIAgent:
                retry_count = 0
                continue

            # --- Stepped rate-limit cooldown --------------------------
            # If we're rate-limited (and fallback either isn't available
            # or is already active), use the per-model stepped cooldown
            # instead of the generic exponential backoff.
            if is_rate_limited:
                _rl_model = getattr(self, "model", "unknown") or "unknown"
                _rl_cooldown = _rate_limiter.record_rate_limit(_rl_model)
                _rl_step = _rate_limiter.get_step(_rl_model)
                _rl_max_step = len(_rate_limiter._cooldown_steps)
                self._vprint(
                    f"{self.log_prefix}🚦 Rate limited on {_rl_model}, "
                    f"cooling down for {_rl_cooldown:.0f}s "
                    f"(step {_rl_step}/{_rl_max_step})",
                    force=True,
                )
                self._emit_status(
                    f"🚦 Rate limited — cooling down {_rl_cooldown:.0f}s "
                    f"(step {_rl_step}/{_rl_max_step})..."
                )
                logging.warning(
                    "%sRate limited on %s — stepped cooldown %ss (step %s/%s)",
                    self.log_prefix, _rl_model, _rl_cooldown,
                    _rl_step, _rl_max_step,
                )
                # Sleep in small increments for interrupt responsiveness
                _rl_end = time.time() + _rl_cooldown
                while time.time() < _rl_end:
                    if self._interrupt_requested:
                        self._vprint(
                            f"{self.log_prefix}⚡ Interrupt during rate-limit cooldown.",
                            force=True,
                        )
                        self._persist_session(messages, conversation_history)
                        self.clear_interrupt()
                        return {
                            "final_response": (
                                f"Operation interrupted: rate-limit cooldown "
                                f"on {_rl_model} (step {_rl_step}/{_rl_max_step})."
                            ),
                            "messages": messages,
                            "api_calls": api_call_count,
                            "completed": False,
                            "interrupted": True,
                        }
                    time.sleep(0.2)
                continue  # retry the API call after cooldown
            # ----------------------------------------------------------

            is_payload_too_large = (
                status_code == 413
                or 'request entity too large' in error_msg

View File

@@ -0,0 +1,320 @@
"""Tests for agent.rate_limiter per-model stepped cooldown."""
from __future__ import annotations
import threading
import time
from unittest import mock
import pytest
from agent.rate_limiter import RateLimiter, _COOLDOWN_STEPS, _RESET_WINDOW
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_limiter(steps: tuple[int, ...] = _COOLDOWN_STEPS, reset_window: float = _RESET_WINDOW) -> RateLimiter:
"""Create a fresh RateLimiter (not the module-level singleton)."""
return RateLimiter(cooldown_steps=steps, reset_window=reset_window)
# ---------------------------------------------------------------------------
# Test stepped cooldown escalation
# ---------------------------------------------------------------------------
class TestSteppedCooldown:
"""The cooldown should escalate through the ladder: 30s → 60s → 300s."""
def test_first_hit_returns_30s(self) -> None:
rl = _make_limiter()
assert rl.record_rate_limit("gpt-4") == 30
def test_second_hit_returns_60s(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4")
assert rl.record_rate_limit("gpt-4") == 60
def test_third_hit_returns_300s(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4")
rl.record_rate_limit("gpt-4")
assert rl.record_rate_limit("gpt-4") == 300
def test_fourth_hit_stays_at_max(self) -> None:
rl = _make_limiter()
for _ in range(3):
rl.record_rate_limit("gpt-4")
# 4th hit should stay clamped at step 3 (300s)
assert rl.record_rate_limit("gpt-4") == 300
def test_step_number_increments(self) -> None:
rl = _make_limiter()
assert rl.get_step("gpt-4") == 0
rl.record_rate_limit("gpt-4")
assert rl.get_step("gpt-4") == 1
rl.record_rate_limit("gpt-4")
assert rl.get_step("gpt-4") == 2
rl.record_rate_limit("gpt-4")
assert rl.get_step("gpt-4") == 3
# Stays clamped
rl.record_rate_limit("gpt-4")
assert rl.get_step("gpt-4") == 3
def test_custom_steps(self) -> None:
rl = _make_limiter(steps=(5, 10))
assert rl.record_rate_limit("m") == 5
assert rl.record_rate_limit("m") == 10
assert rl.record_rate_limit("m") == 10 # clamped
# ---------------------------------------------------------------------------
# Test cooldown reset after no hits
# ---------------------------------------------------------------------------
class TestCooldownReset:
"""Step counter should reset after reset_window seconds of no hits."""
def test_reset_after_window(self) -> None:
rl = _make_limiter(reset_window=10.0)
# Bump to step 2
rl.record_rate_limit("gpt-4")
rl.record_rate_limit("gpt-4")
assert rl.get_step("gpt-4") == 2
# Simulate 10+ seconds passing by manipulating last_hit
with rl._lock:
state = rl._models["gpt-4"]
state.last_hit = time.monotonic() - 11.0
state.cooldown_until = 0 # clear active cooldown too
# Next recording should start from step 1 again (reset happened)
assert rl.record_rate_limit("gpt-4") == 30
assert rl.get_step("gpt-4") == 1
def test_no_reset_within_window(self) -> None:
rl = _make_limiter(reset_window=600.0)
rl.record_rate_limit("gpt-4")
rl.record_rate_limit("gpt-4")
assert rl.get_step("gpt-4") == 2
# No time manipulation → still within window
assert rl.record_rate_limit("gpt-4") == 300
assert rl.get_step("gpt-4") == 3
def test_get_step_resets_when_window_elapsed(self) -> None:
rl = _make_limiter(reset_window=5.0)
rl.record_rate_limit("x")
assert rl.get_step("x") == 1
with rl._lock:
rl._models["x"].last_hit = time.monotonic() - 6.0
assert rl.get_step("x") == 0
# ---------------------------------------------------------------------------
# Test per-model isolation
# ---------------------------------------------------------------------------
class TestPerModelIsolation:
"""Each model should have its own independent cooldown state."""
def test_different_models_are_independent(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4")
rl.record_rate_limit("gpt-4")
# Claude has not been hit yet → should start at step 1
assert rl.record_rate_limit("claude-3") == 30
assert rl.get_step("claude-3") == 1
# GPT-4 should still be at step 2 (plus the third hit now)
assert rl.record_rate_limit("gpt-4") == 300
assert rl.get_step("gpt-4") == 3
def test_reset_single_model(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("a")
rl.record_rate_limit("b")
rl.reset("a")
assert rl.get_step("a") == 0
assert rl.get_step("b") == 1
def test_reset_all(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("a")
rl.record_rate_limit("b")
rl.reset()
assert rl.get_step("a") == 0
assert rl.get_step("b") == 0
# ---------------------------------------------------------------------------
# Test check_rate_limit returns correct remaining time
# ---------------------------------------------------------------------------
class TestCheckRateLimit:
"""check_rate_limit should return remaining cooldown or 0."""
def test_no_cooldown_initially(self) -> None:
rl = _make_limiter()
assert rl.check_rate_limit("gpt-4") == 0.0
def test_remaining_time_after_hit(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4") # 30s cooldown
remaining = rl.check_rate_limit("gpt-4")
# Should be very close to 30 (within a small tolerance)
assert 28.0 < remaining <= 30.0
def test_remaining_decreases_over_time(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4")
# Simulate 10 seconds passing by adjusting cooldown_until
with rl._lock:
rl._models["gpt-4"].cooldown_until = time.monotonic() + 20.0
remaining = rl.check_rate_limit("gpt-4")
assert 18.0 < remaining <= 20.0
def test_returns_zero_after_cooldown_expires(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4")
# Expire the cooldown
with rl._lock:
rl._models["gpt-4"].cooldown_until = time.monotonic() - 1.0
assert rl.check_rate_limit("gpt-4") == 0.0
# ---------------------------------------------------------------------------
# Test get_cooldown_status
# ---------------------------------------------------------------------------
class TestGetCooldownStatus:
"""get_cooldown_status should report all models with active cooldowns."""
def test_empty_when_no_hits(self) -> None:
rl = _make_limiter()
assert rl.get_cooldown_status() == {}
def test_shows_active_cooldowns(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("gpt-4")
rl.record_rate_limit("claude-3")
status = rl.get_cooldown_status()
assert "gpt-4" in status
assert "claude-3" in status
assert status["gpt-4"]["step"] == 1
assert status["gpt-4"]["remaining"] > 0
def test_omits_expired_cooldowns(self) -> None:
rl = _make_limiter()
rl.record_rate_limit("old")
rl.record_rate_limit("new")
# Expire "old"
with rl._lock:
rl._models["old"].cooldown_until = time.monotonic() - 1.0
status = rl.get_cooldown_status()
assert "old" not in status
assert "new" in status
# ---------------------------------------------------------------------------
# Test thread safety
# ---------------------------------------------------------------------------
class TestThreadSafety:
"""Concurrent access should not corrupt state."""
def test_concurrent_record(self) -> None:
rl = _make_limiter()
errors: list[Exception] = []
def _hit(model: str, n: int) -> None:
try:
for _ in range(n):
rl.record_rate_limit(model)
except Exception as exc:
errors.append(exc)
threads = []
for i in range(10):
t = threading.Thread(target=_hit, args=(f"model-{i % 3}", 50))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not errors, f"Thread errors: {errors}"
# Each of the 3 models should have a valid step (clamped to max)
for i in range(3):
step = rl.get_step(f"model-{i}")
assert 1 <= step <= len(rl._cooldown_steps)
def test_concurrent_check_and_record(self) -> None:
rl = _make_limiter()
errors: list[Exception] = []
def _checker(model: str) -> None:
try:
for _ in range(100):
remaining = rl.check_rate_limit(model)
assert remaining >= 0
except Exception as exc:
errors.append(exc)
def _recorder(model: str) -> None:
try:
for _ in range(50):
cd = rl.record_rate_limit(model)
assert cd > 0
except Exception as exc:
errors.append(exc)
threads = [
threading.Thread(target=_checker, args=("m",)),
threading.Thread(target=_checker, args=("m",)),
threading.Thread(target=_recorder, args=("m",)),
threading.Thread(target=_recorder, args=("m",)),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not errors, f"Thread errors: {errors}"
# ---------------------------------------------------------------------------
# Test module-level singleton
# ---------------------------------------------------------------------------
class TestSingleton:
"""The module-level ``rate_limiter`` should be usable directly."""
def test_singleton_import(self) -> None:
from agent.rate_limiter import rate_limiter
assert isinstance(rate_limiter, RateLimiter)
def test_singleton_records(self) -> None:
from agent.rate_limiter import rate_limiter
# Reset to avoid pollution from other tests
rate_limiter.reset()
cd = rate_limiter.record_rate_limit("test-singleton-model")
assert cd == 30
rate_limiter.reset("test-singleton-model")