mirror of https://github.com/NousResearch/hermes-agent.git
synced 2026-05-05 18:27:04 +08:00

Compare commits: dependabot...cline-port (1 commit, 6366fb9c8b)
gateway/memory_monitor.py (new file, 230 lines)
@@ -0,0 +1,230 @@
"""Periodic process memory usage logging for the gateway.
|
||||
|
||||
Ported from cline/cline#10343 (src/standalone/memory-monitor.ts).
|
||||
|
||||
The gateway is a long-lived process that accumulates memory as it caches
|
||||
agent instances, session transcripts, tool schemas, memory providers, MCP
|
||||
connections, etc. A slow leak in any of those subsystems is invisible
|
||||
in a single log line — you only see it by watching RSS climb over hours.
|
||||
|
||||
This module emits a single structured ``[MEMORY] ...`` line every N
|
||||
minutes (default 5) so maintainers investigating a suspected leak can
|
||||
grep ``agent.log`` / ``gateway.log`` for a time series of RSS + Python
|
||||
GC stats. The timer runs in a background thread and shuts down cleanly
|
||||
with the gateway.
|
||||
|
||||
Design notes (parity with the Cline port):
|
||||
* Grep-friendly single-line format beginning ``[MEMORY]``.
|
||||
* Final snapshot logged on shutdown so "last RSS before exit" is
|
||||
always in the log.
|
||||
* Baseline snapshot logged immediately on start.
|
||||
* Daemon thread — never blocks process exit.
|
||||
* Uses ``resource`` (stdlib, Linux/macOS) first and falls back to
|
||||
``psutil`` when ``resource`` isn't available (Windows). Both are
|
||||
optional; when neither works we emit a single WARNING and disable
|
||||
the monitor rather than crashing the gateway.
|
||||
|
||||
Config: ``logging.memory_monitor`` in ``config.yaml`` — see
|
||||
``hermes_cli/config.py`` for the defaults block.
|
||||
"""
|
||||
|
||||
from __future__ import annotations

import gc
import logging
import os
import sys
import threading
import time
from typing import Optional

logger = logging.getLogger(__name__)

_BYTES_TO_MB = 1024 * 1024

_monitor_thread: Optional[threading.Thread] = None
_stop_event: Optional[threading.Event] = None
_start_time: Optional[float] = None
_interval_seconds: float = 300.0  # 5 minutes
_lock = threading.Lock()

def _get_rss_mb() -> Optional[int]:
    """Return current process resident set size in MB, or None if unavailable.

    Tries ``resource.getrusage`` first (Linux/macOS, no extra deps), then
    falls back to ``psutil`` which is an optional hermes-agent dep.
    """
    # Linux / macOS — resource is stdlib. On Linux ru_maxrss is in KB,
    # on macOS it is in bytes (yes, really). We use it as a cheap
    # "current" RSS — ru_maxrss reports the high-water mark for the
    # process, which is what you actually want for leak detection.
    try:
        import resource

        maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        if sys.platform == "darwin":
            return int(maxrss / _BYTES_TO_MB)
        # Linux / other unices: KB
        return int(maxrss / 1024)
    except Exception:
        pass

    # Fallback: psutil (Windows, or unusual unix without resource).
    try:
        import psutil  # type: ignore

        rss = psutil.Process(os.getpid()).memory_info().rss
        return int(rss / _BYTES_TO_MB)
    except Exception:
        return None

def log_memory_usage(prefix: str = "") -> None:
    """Log current memory usage in a grep-friendly ``[MEMORY] ...`` line.

    Safe to call on demand from any thread at important lifecycle
    moments (after shutdown, after context compression, etc.).

    Parameters
    ----------
    prefix
        Optional extra tag inserted after ``[MEMORY]`` — e.g.
        ``"baseline"``, ``"shutdown"``.
    """
    rss = _get_rss_mb()
    uptime = int(time.monotonic() - _start_time) if _start_time else 0
    # gc.get_count() returns the number of objects currently tracked in
    # each generation; a cheap proxy for "how much garbage have we created".
    try:
        gc_counts = gc.get_count()  # (gen0, gen1, gen2)
    except Exception:
        gc_counts = (0, 0, 0)
    # Thread count is a handy correlate when diagnosing thread leaks.
    try:
        thread_count = threading.active_count()
    except Exception:
        thread_count = 0

    tag = f"{prefix} " if prefix else ""
    if rss is None:
        logger.info(
            "[MEMORY] %srss=unavailable gc=%s threads=%d uptime=%ds",
            tag,
            gc_counts,
            thread_count,
            uptime,
        )
    else:
        logger.info(
            "[MEMORY] %srss=%dMB gc=%s threads=%d uptime=%ds",
            tag,
            rss,
            gc_counts,
            thread_count,
            uptime,
        )

def _monitor_loop(stop_event: threading.Event, interval: float) -> None:
    """Background thread body — log every ``interval`` seconds until stopped."""
    while not stop_event.wait(interval):
        try:
            log_memory_usage()
        except Exception as e:
            # Never let the monitor crash the gateway; just log and carry on.
            logger.debug("Memory monitor iteration failed: %s", e)

def start_memory_monitoring(interval_seconds: float = 300.0) -> bool:
    """Start periodic memory usage logging in a daemon thread.

    Logs immediately to capture a baseline, then every ``interval_seconds``.
    Safe to call multiple times — subsequent calls are no-ops while the
    first monitor is still running.

    Parameters
    ----------
    interval_seconds
        How often to log. Default 300s (5 minutes), matching the
        upstream cline/cline implementation.

    Returns
    -------
    bool
        True if a fresh monitor thread was started, False if one was
        already running or if memory introspection isn't available.
    """
    global _monitor_thread, _stop_event, _start_time, _interval_seconds

    with _lock:
        if _monitor_thread is not None and _monitor_thread.is_alive():
            return False

        # Sanity-check that we can read RSS at all. If neither resource
        # nor psutil works, no point spinning a thread that can only log
        # "rss=unavailable" forever — warn once and bail.
        if _get_rss_mb() is None:
            logger.warning(
                "[MEMORY] Memory monitoring unavailable: neither resource.getrusage "
                "nor psutil could read process RSS — skipping periodic logging.",
            )
            return False

        _start_time = time.monotonic()
        _interval_seconds = float(interval_seconds)
        _stop_event = threading.Event()

        # Baseline snapshot before the loop starts.
        log_memory_usage(prefix="baseline")

        _monitor_thread = threading.Thread(
            target=_monitor_loop,
            args=(_stop_event, _interval_seconds),
            name="gateway-memory-monitor",
            daemon=True,
        )
        _monitor_thread.start()

        logger.info(
            "[MEMORY] Periodic memory monitoring started (interval: %ds)",
            int(_interval_seconds),
        )
        return True

def stop_memory_monitoring(timeout: float = 2.0) -> None:
    """Stop the monitor thread and log a final snapshot.

    Safe to call even if ``start_memory_monitoring()`` was never called.
    """
    global _monitor_thread, _stop_event

    with _lock:
        if _stop_event is None or _monitor_thread is None:
            return

        # Final snapshot before teardown so "last RSS" is always in the log.
        try:
            log_memory_usage(prefix="shutdown")
        except Exception:
            pass

        _stop_event.set()
        thread = _monitor_thread
        _monitor_thread = None
        _stop_event = None

    # Join outside the lock so a stuck log call can't deadlock shutdown.
    try:
        thread.join(timeout=timeout)
    except Exception:
        pass

    logger.info("[MEMORY] Periodic memory monitoring stopped")

def is_running() -> bool:
    """True if the background monitor thread is alive."""
    with _lock:
        return _monitor_thread is not None and _monitor_thread.is_alive()
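
The public surface above boils down to three calls. Here is a minimal usage sketch of the intended lifecycle (not part of this commit; the "post-compaction" tag is purely illustrative):

from gateway import memory_monitor

memory_monitor.start_memory_monitoring(interval_seconds=60.0)
try:
    ...  # long-running gateway work
    # On-demand snapshot at an interesting lifecycle moment.
    memory_monitor.log_memory_usage(prefix="post-compaction")
finally:
    memory_monitor.stop_memory_monitoring(timeout=2.0)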
@@ -11989,6 +11989,33 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
    from hermes_logging import setup_logging
    setup_logging(hermes_home=_hermes_home, mode="gateway")

    # Periodic process memory usage logging (gateway only) — emits a
    # grep-friendly "[MEMORY] rss=...MB ..." line every N minutes so
    # slow leaks in the long-lived gateway process show up as a time
    # series in agent.log / gateway.log. Ported from cline/cline#10343.
    # Controlled by the logging.memory_monitor section in config.yaml.
    try:
        from gateway import memory_monitor as _memory_monitor

        _mm_cfg = {}
        try:
            # config is loaded a few lines up; re-read the logging section
            # here so we pick up user overrides without coupling to local
            # variable names inside the start_gateway body.
            from hermes_cli.config import load_config as _load_cli_config

            _mm_cfg = (_load_cli_config() or {}).get("logging", {}).get("memory_monitor", {}) or {}
        except Exception:
            _mm_cfg = {}
        if _mm_cfg.get("enabled", True):
            try:
                _mm_interval = float(_mm_cfg.get("interval_seconds", 300))
            except (TypeError, ValueError):
                _mm_interval = 300.0
            _memory_monitor.start_memory_monitoring(interval_seconds=_mm_interval)
    except Exception as _mm_exc:
        logger.debug("Failed to start memory monitor: %s", _mm_exc)

    # Optional stderr handler — level driven by -v/-q flags on the CLI.
    # verbosity=None (-q/--quiet): no stderr output
    # verbosity=0 (default): WARNING and above
@@ -12166,6 +12193,16 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
    except Exception:
        pass

    # Stop the periodic memory monitor (if it was started above).
    # This also emits one final "[MEMORY] shutdown rss=..." line so the
    # last RSS reading before gateway exit is always in the log.
    try:
        from gateway import memory_monitor as _memory_monitor

        _memory_monitor.stop_memory_monitoring(timeout=2.0)
    except Exception:
        pass

    if runner.exit_code is not None:
        raise SystemExit(runner.exit_code)
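
Since the format exists to be grepped into a time series, a small companion sketch (not part of the commit) shows how a maintainer might parse the periodic lines back into (uptime, rss) pairs. The regex mirrors the logger.info format strings in memory_monitor.py; the log path in the trailing comment is a placeholder.

import re

# Matches only bare periodic lines; "baseline"/"shutdown" snapshots carry
# a tag between "[MEMORY]" and "rss=", so they are skipped on purpose.
_MEMORY_LINE = re.compile(r"\[MEMORY\] rss=(?P<rss>\d+)MB .*uptime=(?P<up>\d+)s")

def rss_series(log_path: str) -> list[tuple[int, int]]:
    """Return (uptime_seconds, rss_mb) pairs from a gateway log file."""
    series: list[tuple[int, int]] = []
    with open(log_path, encoding="utf-8") as fh:
        for line in fh:
            match = _MEMORY_LINE.search(line)
            if match:
                series.append((int(match.group("up")), int(match.group("rss"))))
    return series

# A steadily climbing tail is the leak signature this monitor exists to expose:
# for up, rss in rss_series("agent.log")[-10:]: print(up, rss)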
@@ -1090,6 +1090,15 @@ DEFAULT_CONFIG = {
        "level": "INFO",  # Minimum level for agent.log: DEBUG, INFO, WARNING
        "max_size_mb": 5,  # Max size per log file before rotation
        "backup_count": 3,  # Number of rotated backup files to keep
        # Periodic process memory usage logging (gateway only). Emits a
        # grep-friendly "[MEMORY] rss=...MB ..." line at the configured
        # interval so slow leaks in the long-lived gateway are visible
        # in agent.log / gateway.log as a time series. Ported from
        # cline/cline#10343.
        "memory_monitor": {
            "enabled": True,  # Flip to false to silence the periodic line
            "interval_seconds": 300,  # Default: every 5 minutes
        },
    },

    # Remotely-hosted model catalog manifest. When enabled, the CLI fetches
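
Translated to the user-facing file, a hypothetical override in config.yaml would look like this, assuming the on-disk structure mirrors the DEFAULT_CONFIG keys above:

logging:
  memory_monitor:
    enabled: true
    interval_seconds: 60  # tighter than the 5-minute default while chasing a leak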
tests/gateway/test_memory_monitor.py (new file, 122 lines)
@@ -0,0 +1,122 @@
"""Tests for gateway.memory_monitor — periodic process memory logging.
|
||||
|
||||
Ported from cline/cline#10343. The module logs a structured
|
||||
``[MEMORY] rss=...MB ...`` line periodically so long-running gateway
|
||||
leaks show up as a time series in agent.log / gateway.log.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway import memory_monitor as mm
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _ensure_monitor_stopped():
    """Every test starts from a clean state and leaves one behind."""
    mm.stop_memory_monitoring(timeout=1.0)
    yield
    mm.stop_memory_monitoring(timeout=1.0)

def test_log_memory_usage_emits_memory_line(caplog):
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    mm.log_memory_usage()
    memory_lines = [r for r in caplog.records if "[MEMORY]" in r.getMessage()]
    assert memory_lines, "expected at least one [MEMORY] log record"


def test_log_memory_usage_has_grep_friendly_format(caplog):
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    mm.log_memory_usage()
    msg = caplog.records[-1].getMessage()
    # Grep-friendly contract: line starts with [MEMORY] and carries RSS
    # (or 'unavailable'), GC counts, thread count, uptime.
    assert msg.startswith("[MEMORY]"), msg
    assert "rss=" in msg
    assert "gc=" in msg
    assert "threads=" in msg
    assert "uptime=" in msg


def test_log_memory_usage_with_prefix(caplog):
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    mm.log_memory_usage(prefix="baseline")
    msg = caplog.records[-1].getMessage()
    assert "[MEMORY] baseline " in msg


def test_start_logs_baseline_and_returns_true(caplog):
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    # Large interval so the background timer never fires during the test —
    # we're only checking the synchronous baseline behavior here.
    started = mm.start_memory_monitoring(interval_seconds=3600.0)
    assert started is True
    assert mm.is_running() is True

    messages = [r.getMessage() for r in caplog.records]
    assert any("[MEMORY] baseline " in m for m in messages), messages
    assert any("Periodic memory monitoring started" in m for m in messages), messages


def test_double_start_is_noop():
    assert mm.start_memory_monitoring(interval_seconds=3600.0) is True
    assert mm.start_memory_monitoring(interval_seconds=3600.0) is False
    assert mm.is_running() is True


def test_stop_logs_shutdown_snapshot(caplog):
    mm.start_memory_monitoring(interval_seconds=3600.0)
    caplog.clear()
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    mm.stop_memory_monitoring(timeout=1.0)
    assert mm.is_running() is False

    messages = [r.getMessage() for r in caplog.records]
    assert any("[MEMORY] shutdown " in m for m in messages), messages
    assert any("Periodic memory monitoring stopped" in m for m in messages), messages


def test_stop_without_start_is_noop():
    # Must not raise, must not log shutdown snapshot.
    mm.stop_memory_monitoring(timeout=0.5)
    assert mm.is_running() is False

def test_periodic_timer_fires(caplog):
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    # Short interval so we can observe multiple ticks inside the test budget.
    mm.start_memory_monitoring(interval_seconds=0.1)
    time.sleep(0.45)
    mm.stop_memory_monitoring(timeout=1.0)

    # "[MEMORY] rss=unavailable" also starts with "[MEMORY] rss=", so a
    # single startswith check covers both shapes of the periodic line.
    periodic = [
        r for r in caplog.records
        if r.getMessage().startswith("[MEMORY] rss=")
    ]
    # At a 0.1s interval, ~4 ticks fire during the 0.45s sleep. The baseline
    # and shutdown snapshots carry "baseline "/"shutdown " prefixes, so they
    # don't match the strict "[MEMORY] rss=" start; we expect >= 3 bare
    # periodic lines.
    assert len(periodic) >= 3, [r.getMessage() for r in caplog.records]

def test_thread_is_daemon():
    mm.start_memory_monitoring(interval_seconds=3600.0)
    assert mm._monitor_thread is not None
    assert mm._monitor_thread.daemon is True, (
        "memory monitor thread must be daemon so it can never block process exit"
    )


def test_unavailable_rss_warns_and_does_not_start(caplog, monkeypatch):
    # Force both backends to claim unavailable; start should bail.
    monkeypatch.setattr(mm, "_get_rss_mb", lambda: None)
    caplog.set_level(logging.WARNING, logger="gateway.memory_monitor")
    started = mm.start_memory_monitoring(interval_seconds=3600.0)
    assert started is False
    assert mm.is_running() is False
    assert any("Memory monitoring unavailable" in r.getMessage() for r in caplog.records)
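
A natural extension of the suite, sketched here as a hypothetical case rather than part of the ported file: the log line should keep its grep-friendly shape even when neither RSS backend works.

def test_log_line_degrades_gracefully_without_rss(caplog, monkeypatch):
    # Hypothetical extra test (not in this commit): with RSS unreadable,
    # an on-demand snapshot still emits the structured line.
    monkeypatch.setattr(mm, "_get_rss_mb", lambda: None)
    caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
    mm.log_memory_usage(prefix="probe")
    assert caplog.records[-1].getMessage().startswith("[MEMORY] probe rss=unavailable")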