Mirror of https://github.com/NousResearch/hermes-agent.git, synced 2026-04-28 23:11:37 +08:00
Compare commits (1 commit): skill/gith...feat/rate- (SHA1 1ddb03b76f)
agent/rate_limiter.py (new file, 173 lines)
@@ -0,0 +1,173 @@
"""Per-model rate limit handler with stepped cooldown.

Tracks 429 / rate-limit errors per model and applies a stepped cooldown
ladder:

    1st hit → 30 s
    2nd hit → 60 s
    3rd+ hit → 300 s (5 min)

The step counter resets automatically after 10 minutes of *no* rate-limit
hits for a given model.

Thread-safe: all mutable state is guarded by a single ``threading.Lock``.

Usage example (inside an API retry loop)::

    from agent.rate_limiter import rate_limiter

    # Before calling the API – honour any active cooldown
    remaining = rate_limiter.check_rate_limit(model)
    if remaining > 0:
        time.sleep(remaining)

    try:
        response = client.chat.completions.create(...)
    except RateLimitError:
        cooldown = rate_limiter.record_rate_limit(model)
        print(f"Rate limited on {model}, cooling down for {cooldown}s")
        time.sleep(cooldown)
        # … retry …
"""

from __future__ import annotations

import threading
import time
from dataclasses import dataclass, field
from typing import Dict


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Stepped cooldown ladder (seconds)
_COOLDOWN_STEPS: tuple[int, ...] = (30, 60, 300)

# After this many seconds with no new rate-limit hits the step counter resets.
_RESET_WINDOW: float = 600.0  # 10 minutes


# ---------------------------------------------------------------------------
# Internal per-model state
# ---------------------------------------------------------------------------

@dataclass
class _ModelCooldownState:
    """Mutable cooldown state for a single model."""

    # How many consecutive rate-limit hits (1-indexed).
    step: int = 0

    # ``time.monotonic()`` timestamp when the current cooldown ends.
    cooldown_until: float = 0.0

    # ``time.monotonic()`` of the last hit – used for the reset window.
    last_hit: float = 0.0


# ---------------------------------------------------------------------------
# Public API – singleton ``RateLimiter``
# ---------------------------------------------------------------------------

class RateLimiter:
    """Thread-safe, per-model rate-limit handler with stepped cooldown."""

    def __init__(
        self,
        cooldown_steps: tuple[int, ...] = _COOLDOWN_STEPS,
        reset_window: float = _RESET_WINDOW,
    ) -> None:
        self._cooldown_steps = cooldown_steps
        self._reset_window = reset_window
        self._lock = threading.Lock()
        self._models: Dict[str, _ModelCooldownState] = {}

    # -- helpers ----------------------------------------------------------

    def _get_state(self, model: str) -> _ModelCooldownState:
        """Return (or create) the state object for *model*. Caller must hold ``_lock``."""
        if model not in self._models:
            self._models[model] = _ModelCooldownState()
        return self._models[model]

    def _maybe_reset(self, state: _ModelCooldownState, now: float) -> None:
        """Reset the step counter if the reset window has elapsed since the last hit.

        Caller must hold ``_lock``.
        """
        if state.last_hit and (now - state.last_hit) >= self._reset_window:
            state.step = 0

    # -- public interface -------------------------------------------------

    def check_rate_limit(self, model: str) -> float:
        """Return remaining cooldown seconds for *model*, or ``0`` if none."""
        now = time.monotonic()
        with self._lock:
            state = self._get_state(model)
            remaining = max(0.0, state.cooldown_until - now)
        return remaining

    def record_rate_limit(self, model: str) -> float:
        """Record a rate-limit hit for *model* and return the cooldown duration (seconds).

        The returned value is the number of seconds to wait before the next
        attempt.
        """
        now = time.monotonic()
        with self._lock:
            state = self._get_state(model)

            # Reset step counter if the reset window elapsed.
            self._maybe_reset(state, now)

            # Advance the step (clamped to the ladder length).
            state.step = min(state.step + 1, len(self._cooldown_steps))

            # Look up the cooldown for this step (1-indexed → 0-indexed).
            cooldown = self._cooldown_steps[state.step - 1]

            state.cooldown_until = now + cooldown
            state.last_hit = now

        return float(cooldown)

    def get_step(self, model: str) -> int:
        """Return the current step number for *model* (0 means no active cooldown)."""
        now = time.monotonic()
        with self._lock:
            state = self._get_state(model)
            self._maybe_reset(state, now)
            return state.step

    def get_cooldown_status(self) -> Dict[str, Dict[str, float]]:
        """Return a snapshot of all models with an active cooldown.

        Returns a dict mapping model name → ``{"remaining": <secs>, "step": <int>}``.
        Models whose cooldown has already expired are omitted.
        """
        now = time.monotonic()
        result: Dict[str, Dict[str, float]] = {}
        with self._lock:
            for model, state in self._models.items():
                remaining = max(0.0, state.cooldown_until - now)
                if remaining > 0:
                    result[model] = {
                        "remaining": round(remaining, 2),
                        "step": state.step,
                    }
        return result

    def reset(self, model: str | None = None) -> None:
        """Reset cooldown state. If *model* is ``None``, reset everything."""
        with self._lock:
            if model is None:
                self._models.clear()
            elif model in self._models:
                del self._models[model]


# Module-level singleton for convenient import.
rate_limiter = RateLimiter()
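For orientation, a minimal illustrative sketch (not part of the commit) of how the ladder behaves when the class is exercised directly; the model name "my-model" is an arbitrary placeholder:

# Illustrative only – not part of the diff above.
from agent.rate_limiter import RateLimiter

rl = RateLimiter()                          # defaults: ladder (30, 60, 300), 600 s reset window
print(rl.record_rate_limit("my-model"))     # 30.0  (1st hit)
print(rl.record_rate_limit("my-model"))     # 60.0  (2nd hit)
print(rl.record_rate_limit("my-model"))     # 300.0 (3rd hit)
print(rl.record_rate_limit("my-model"))     # 300.0 (4th+ hits stay clamped at the last step)
print(rl.get_step("my-model"))              # 3
print(rl.get_cooldown_status())             # e.g. {'my-model': {'remaining': 299.99, 'step': 3}}
rl.reset("my-model")
print(rl.check_rate_limit("my-model"))      # 0.0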
run_agent.py (56 changed lines)
@@ -100,6 +100,7 @@ from agent.trajectory import (
     convert_scratchpad_to_think, has_incomplete_scratchpad,
     save_trajectory as _save_trajectory_to_file,
 )
+from agent.rate_limiter import rate_limiter as _rate_limiter
 from utils import atomic_json_write
 
 HONCHO_TOOL_NAMES = {
@@ -6515,7 +6516,7 @@ class AIAgent:
         elif not isinstance(content_blocks, list):
             response_invalid = True
             error_details.append("response.content is not a list")
-        elif len(content_blocks) == 0:
+        elif len(content_blocks) == 0 and getattr(response, "stop_reason", None) != "sensitive":
             response_invalid = True
             error_details.append("response.content is empty")
         else:
@@ -6631,11 +6632,14 @@ class AIAgent:
             else:
                 finish_reason = "stop"
         elif self.api_mode == "anthropic_messages":
-            stop_reason_map = {"end_turn": "stop", "tool_use": "tool_calls", "max_tokens": "length", "stop_sequence": "stop"}
+            stop_reason_map = {"end_turn": "stop", "tool_use": "tool_calls", "max_tokens": "length", "stop_sequence": "stop", "sensitive": "content_filter"}
             finish_reason = stop_reason_map.get(response.stop_reason, "stop")
         else:
             finish_reason = response.choices[0].finish_reason
 
+        if finish_reason == "content_filter":
+            self._vprint(f"{self.log_prefix}⚠️ Response filtered by content policy (stop_reason='sensitive')", force=True)
+
         if finish_reason == "length":
             self._vprint(f"{self.log_prefix}⚠️ Response truncated (finish_reason='length') - model hit max output tokens", force=True)
 
@@ -7015,6 +7019,54 @@ class AIAgent:
                 retry_count = 0
                 continue
 
+            # --- Stepped rate-limit cooldown --------------------------
+            # If we're rate-limited (and fallback either isn't available
+            # or is already active), use the per-model stepped cooldown
+            # instead of the generic exponential backoff.
+            if is_rate_limited:
+                _rl_model = getattr(self, "model", "unknown") or "unknown"
+                _rl_cooldown = _rate_limiter.record_rate_limit(_rl_model)
+                _rl_step = _rate_limiter.get_step(_rl_model)
+                _rl_max_step = len(_rate_limiter._cooldown_steps)
+                self._vprint(
+                    f"{self.log_prefix}🚦 Rate limited on {_rl_model}, "
+                    f"cooling down for {_rl_cooldown:.0f}s "
+                    f"(step {_rl_step}/{_rl_max_step})",
+                    force=True,
+                )
+                self._emit_status(
+                    f"🚦 Rate limited — cooling down {_rl_cooldown:.0f}s "
+                    f"(step {_rl_step}/{_rl_max_step})..."
+                )
+                logging.warning(
+                    "%sRate limited on %s — stepped cooldown %ss (step %s/%s)",
+                    self.log_prefix, _rl_model, _rl_cooldown,
+                    _rl_step, _rl_max_step,
+                )
+                # Sleep in small increments for interrupt responsiveness
+                _rl_end = time.time() + _rl_cooldown
+                while time.time() < _rl_end:
+                    if self._interrupt_requested:
+                        self._vprint(
+                            f"{self.log_prefix}⚡ Interrupt during rate-limit cooldown.",
+                            force=True,
+                        )
+                        self._persist_session(messages, conversation_history)
+                        self.clear_interrupt()
+                        return {
+                            "final_response": (
+                                f"Operation interrupted: rate-limit cooldown "
+                                f"on {_rl_model} (step {_rl_step}/{_rl_max_step})."
+                            ),
+                            "messages": messages,
+                            "api_calls": api_call_count,
+                            "completed": False,
+                            "interrupted": True,
+                        }
+                    time.sleep(0.2)
+                continue  # retry the API call after cooldown
+            # ----------------------------------------------------------
+
             is_payload_too_large = (
                 status_code == 413
                 or 'request entity too large' in error_msg
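Condensed, for reference: a hedged sketch of the caller-side pattern the hunk above wires in, combining the module docstring's pre-flight check with the retry-after-cooldown behaviour. The names call_api, model, and RateLimitError are placeholders, not the agent's real call path:

# Hedged sketch only, not the literal run_agent.py code.
import time

from agent.rate_limiter import rate_limiter


class RateLimitError(Exception):
    """Stand-in for the provider SDK's rate-limit (HTTP 429) exception."""


def call_with_cooldown(call_api, model: str):
    while True:
        # Honour any cooldown already active for this model before calling out.
        remaining = rate_limiter.check_rate_limit(model)
        if remaining > 0:
            time.sleep(remaining)
        try:
            return call_api(model)
        except RateLimitError:
            cooldown = rate_limiter.record_rate_limit(model)
            step = rate_limiter.get_step(model)
            print(f"Rate limited on {model}: cooling down {cooldown:.0f}s (step {step})")
            # Loop back; the check at the top of the loop sleeps out the cooldown.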
tests/agent/test_rate_limiter.py (new file, 320 lines)
@@ -0,0 +1,320 @@
"""Tests for agent.rate_limiter – per-model stepped cooldown."""

from __future__ import annotations

import threading
import time
from unittest import mock

import pytest

from agent.rate_limiter import RateLimiter, _COOLDOWN_STEPS, _RESET_WINDOW


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_limiter(steps: tuple[int, ...] = _COOLDOWN_STEPS, reset_window: float = _RESET_WINDOW) -> RateLimiter:
    """Create a fresh RateLimiter (not the module-level singleton)."""
    return RateLimiter(cooldown_steps=steps, reset_window=reset_window)


# ---------------------------------------------------------------------------
# Test stepped cooldown escalation
# ---------------------------------------------------------------------------

class TestSteppedCooldown:
    """The cooldown should escalate through the ladder: 30s → 60s → 300s."""

    def test_first_hit_returns_30s(self) -> None:
        rl = _make_limiter()
        assert rl.record_rate_limit("gpt-4") == 30

    def test_second_hit_returns_60s(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("gpt-4")
        assert rl.record_rate_limit("gpt-4") == 60

    def test_third_hit_returns_300s(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("gpt-4")
        rl.record_rate_limit("gpt-4")
        assert rl.record_rate_limit("gpt-4") == 300

    def test_fourth_hit_stays_at_max(self) -> None:
        rl = _make_limiter()
        for _ in range(3):
            rl.record_rate_limit("gpt-4")
        # 4th hit should stay clamped at step 3 (300s)
        assert rl.record_rate_limit("gpt-4") == 300

    def test_step_number_increments(self) -> None:
        rl = _make_limiter()
        assert rl.get_step("gpt-4") == 0
        rl.record_rate_limit("gpt-4")
        assert rl.get_step("gpt-4") == 1
        rl.record_rate_limit("gpt-4")
        assert rl.get_step("gpt-4") == 2
        rl.record_rate_limit("gpt-4")
        assert rl.get_step("gpt-4") == 3
        # Stays clamped
        rl.record_rate_limit("gpt-4")
        assert rl.get_step("gpt-4") == 3

    def test_custom_steps(self) -> None:
        rl = _make_limiter(steps=(5, 10))
        assert rl.record_rate_limit("m") == 5
        assert rl.record_rate_limit("m") == 10
        assert rl.record_rate_limit("m") == 10  # clamped


# ---------------------------------------------------------------------------
# Test cooldown reset after no hits
# ---------------------------------------------------------------------------

class TestCooldownReset:
    """Step counter should reset after reset_window seconds of no hits."""

    def test_reset_after_window(self) -> None:
        rl = _make_limiter(reset_window=10.0)

        # Bump to step 2
        rl.record_rate_limit("gpt-4")
        rl.record_rate_limit("gpt-4")
        assert rl.get_step("gpt-4") == 2

        # Simulate 10+ seconds passing by manipulating last_hit
        with rl._lock:
            state = rl._models["gpt-4"]
            state.last_hit = time.monotonic() - 11.0
            state.cooldown_until = 0  # clear active cooldown too

        # Next recording should start from step 1 again (reset happened)
        assert rl.record_rate_limit("gpt-4") == 30
        assert rl.get_step("gpt-4") == 1

    def test_no_reset_within_window(self) -> None:
        rl = _make_limiter(reset_window=600.0)

        rl.record_rate_limit("gpt-4")
        rl.record_rate_limit("gpt-4")
        assert rl.get_step("gpt-4") == 2

        # No time manipulation → still within window
        assert rl.record_rate_limit("gpt-4") == 300
        assert rl.get_step("gpt-4") == 3

    def test_get_step_resets_when_window_elapsed(self) -> None:
        rl = _make_limiter(reset_window=5.0)
        rl.record_rate_limit("x")
        assert rl.get_step("x") == 1

        with rl._lock:
            rl._models["x"].last_hit = time.monotonic() - 6.0
        assert rl.get_step("x") == 0


# ---------------------------------------------------------------------------
# Test per-model isolation
# ---------------------------------------------------------------------------

class TestPerModelIsolation:
    """Each model should have its own independent cooldown state."""

    def test_different_models_are_independent(self) -> None:
        rl = _make_limiter()

        rl.record_rate_limit("gpt-4")
        rl.record_rate_limit("gpt-4")

        # Claude has not been hit yet → should start at step 1
        assert rl.record_rate_limit("claude-3") == 30
        assert rl.get_step("claude-3") == 1

        # GPT-4 should still be at step 2 (plus the third hit now)
        assert rl.record_rate_limit("gpt-4") == 300
        assert rl.get_step("gpt-4") == 3

    def test_reset_single_model(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("a")
        rl.record_rate_limit("b")

        rl.reset("a")
        assert rl.get_step("a") == 0
        assert rl.get_step("b") == 1

    def test_reset_all(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("a")
        rl.record_rate_limit("b")
        rl.reset()
        assert rl.get_step("a") == 0
        assert rl.get_step("b") == 0


# ---------------------------------------------------------------------------
# Test check_rate_limit returns correct remaining time
# ---------------------------------------------------------------------------

class TestCheckRateLimit:
    """check_rate_limit should return remaining cooldown or 0."""

    def test_no_cooldown_initially(self) -> None:
        rl = _make_limiter()
        assert rl.check_rate_limit("gpt-4") == 0.0

    def test_remaining_time_after_hit(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("gpt-4")  # 30s cooldown

        remaining = rl.check_rate_limit("gpt-4")
        # Should be very close to 30 (within a small tolerance)
        assert 28.0 < remaining <= 30.0

    def test_remaining_decreases_over_time(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("gpt-4")

        # Simulate 10 seconds passing by adjusting cooldown_until
        with rl._lock:
            rl._models["gpt-4"].cooldown_until = time.monotonic() + 20.0

        remaining = rl.check_rate_limit("gpt-4")
        assert 18.0 < remaining <= 20.0

    def test_returns_zero_after_cooldown_expires(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("gpt-4")

        # Expire the cooldown
        with rl._lock:
            rl._models["gpt-4"].cooldown_until = time.monotonic() - 1.0

        assert rl.check_rate_limit("gpt-4") == 0.0


# ---------------------------------------------------------------------------
# Test get_cooldown_status
# ---------------------------------------------------------------------------

class TestGetCooldownStatus:
    """get_cooldown_status should report all models with active cooldowns."""

    def test_empty_when_no_hits(self) -> None:
        rl = _make_limiter()
        assert rl.get_cooldown_status() == {}

    def test_shows_active_cooldowns(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("gpt-4")
        rl.record_rate_limit("claude-3")

        status = rl.get_cooldown_status()
        assert "gpt-4" in status
        assert "claude-3" in status
        assert status["gpt-4"]["step"] == 1
        assert status["gpt-4"]["remaining"] > 0

    def test_omits_expired_cooldowns(self) -> None:
        rl = _make_limiter()
        rl.record_rate_limit("old")
        rl.record_rate_limit("new")

        # Expire "old"
        with rl._lock:
            rl._models["old"].cooldown_until = time.monotonic() - 1.0

        status = rl.get_cooldown_status()
        assert "old" not in status
        assert "new" in status


# ---------------------------------------------------------------------------
# Test thread safety
# ---------------------------------------------------------------------------

class TestThreadSafety:
    """Concurrent access should not corrupt state."""

    def test_concurrent_record(self) -> None:
        rl = _make_limiter()
        errors: list[Exception] = []

        def _hit(model: str, n: int) -> None:
            try:
                for _ in range(n):
                    rl.record_rate_limit(model)
            except Exception as exc:
                errors.append(exc)

        threads = []
        for i in range(10):
            t = threading.Thread(target=_hit, args=(f"model-{i % 3}", 50))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=5)

        assert not errors, f"Thread errors: {errors}"

        # Each of the 3 models should have a valid step (clamped to max)
        for i in range(3):
            step = rl.get_step(f"model-{i}")
            assert 1 <= step <= len(rl._cooldown_steps)

    def test_concurrent_check_and_record(self) -> None:
        rl = _make_limiter()
        errors: list[Exception] = []

        def _checker(model: str) -> None:
            try:
                for _ in range(100):
                    remaining = rl.check_rate_limit(model)
                    assert remaining >= 0
            except Exception as exc:
                errors.append(exc)

        def _recorder(model: str) -> None:
            try:
                for _ in range(50):
                    cd = rl.record_rate_limit(model)
                    assert cd > 0
            except Exception as exc:
                errors.append(exc)

        threads = [
            threading.Thread(target=_checker, args=("m",)),
            threading.Thread(target=_checker, args=("m",)),
            threading.Thread(target=_recorder, args=("m",)),
            threading.Thread(target=_recorder, args=("m",)),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=5)

        assert not errors, f"Thread errors: {errors}"


# ---------------------------------------------------------------------------
# Test module-level singleton
# ---------------------------------------------------------------------------

class TestSingleton:
    """The module-level ``rate_limiter`` should be usable directly."""

    def test_singleton_import(self) -> None:
        from agent.rate_limiter import rate_limiter
        assert isinstance(rate_limiter, RateLimiter)

    def test_singleton_records(self) -> None:
        from agent.rate_limiter import rate_limiter
        # Reset to avoid pollution from other tests
        rate_limiter.reset()
        cd = rate_limiter.record_rate_limit("test-singleton-model")
        assert cd == 30
        rate_limiter.reset("test-singleton-model")
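A hedged aside on the singleton tests above: because rate_limiter is shared module-level state, suites that also touch it elsewhere may want an autouse fixture along these lines so cooldown state cannot leak between tests. The snippet is an assumption, not part of the commit; the fixture name is arbitrary, and the file itself can be run in isolation with pytest tests/agent/test_rate_limiter.py.

# Hypothetical conftest.py snippet – an assumption, not part of this commit.
import pytest

from agent.rate_limiter import rate_limiter


@pytest.fixture(autouse=True)
def _clean_rate_limiter():
    # Clear the shared singleton before and after each test so cooldown
    # state recorded by one test cannot influence another.
    rate_limiter.reset()
    yield
    rate_limiter.reset()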