mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 09:31:31 +08:00
Compare commits
1 Commits
ethie/e2e
...
feat/dashb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0964c5150 |
@@ -458,6 +458,72 @@ async def auth_middleware(request: Request, call_next):
|
||||
return await call_next(request)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Access log — one INFO line per HTTP request to the ``gui`` log surface.
|
||||
#
|
||||
# The messaging gateway logs every inbound message to ``gateway.log``; this is
|
||||
# the dashboard/TUI twin for ``gui.log``. Registered LAST so FastAPI runs it
|
||||
# OUTERMOST (middleware is LIFO): it observes the final status code, including
|
||||
# 400/401 rejections produced by the host-header and auth middlewares above.
|
||||
#
|
||||
# Metadata only — method, path (never query string: tokens ride in query on
|
||||
# some routes), status, latency, a short request id, and peer. Request/response
|
||||
# bodies are deliberately never captured here (see the opt-in body-capture
|
||||
# surface). The request id is read from an inbound ``X-Request-ID`` when a
|
||||
# reverse proxy or the SPA supplies one, else minted, and echoed back on the
|
||||
# response so a client can correlate its call with this log line and with the
|
||||
# WebSocket session it subsequently opens.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _new_request_id() -> str:
|
||||
"""Short, log-friendly correlation id (not security-sensitive)."""
|
||||
import secrets
|
||||
|
||||
return secrets.token_hex(4)
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
async def access_log_middleware(request: Request, call_next):
|
||||
import time as _time
|
||||
|
||||
rid = request.headers.get("x-request-id") or _new_request_id()
|
||||
request.state.rid = rid
|
||||
peer = request.client.host if request.client else "?"
|
||||
start = _time.monotonic()
|
||||
status = 500
|
||||
response = None
|
||||
try:
|
||||
response = await call_next(request)
|
||||
status = response.status_code
|
||||
return response
|
||||
finally:
|
||||
dur_ms = int((_time.monotonic() - start) * 1000)
|
||||
# INFO: the always-on shape/timing line (matches gateway.log granularity).
|
||||
_log.info(
|
||||
"http method=%s path=%s status=%s dur_ms=%d rid=%s peer=%s",
|
||||
request.method,
|
||||
request.url.path,
|
||||
status,
|
||||
dur_ms,
|
||||
rid,
|
||||
peer,
|
||||
)
|
||||
# DEBUG (-v only): noisier fields kept off the default line.
|
||||
if _log.isEnabledFor(logging.DEBUG):
|
||||
_log.debug(
|
||||
"http detail rid=%s ua=%r referer=%r",
|
||||
rid,
|
||||
request.headers.get("user-agent", ""),
|
||||
request.headers.get("referer", ""),
|
||||
)
|
||||
if response is not None:
|
||||
try:
|
||||
response.headers["X-Request-ID"] = rid
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config schema — auto-generated from DEFAULT_CONFIG
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -11116,19 +11182,27 @@ async def pty_ws(ws: WebSocket) -> None:
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# --- reader task: PTY master → WebSocket ----------------------------
|
||||
# Counters/reason live in a dict so the nested pump can mutate them
|
||||
# (a closure can't rebind outer ints). Reported in the close line below.
|
||||
_stats = {"bytes_in": 0, "bytes_out": 0, "reason": "client_disconnect"}
|
||||
_pty_start = time.monotonic()
|
||||
|
||||
async def pump_pty_to_ws() -> None:
|
||||
while True:
|
||||
chunk = await loop.run_in_executor(
|
||||
None, bridge.read, _PTY_READ_CHUNK_TIMEOUT
|
||||
)
|
||||
if chunk is None: # EOF
|
||||
if chunk is None: # EOF — PTY child exited (backend gone)
|
||||
_stats["reason"] = "pty_eof"
|
||||
return
|
||||
if not chunk: # no data this tick; yield control and retry
|
||||
await asyncio.sleep(0)
|
||||
continue
|
||||
try:
|
||||
await ws.send_bytes(chunk)
|
||||
_stats["bytes_out"] += len(chunk)
|
||||
except Exception:
|
||||
_stats["reason"] = "send_failed"
|
||||
return
|
||||
|
||||
reader_task = asyncio.create_task(pump_pty_to_ws())
|
||||
@@ -11156,8 +11230,14 @@ async def pty_ws(ws: WebSocket) -> None:
|
||||
continue
|
||||
|
||||
bridge.write(raw)
|
||||
_stats["bytes_in"] += len(raw)
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
except Exception:
|
||||
# Unexpected server-side failure in the writer loop — record it so the
|
||||
# close line distinguishes a crash from a clean client disconnect.
|
||||
_stats["reason"] = "error"
|
||||
_log.exception("pty writer loop failed peer=%s", peer)
|
||||
finally:
|
||||
reader_task.cancel()
|
||||
try:
|
||||
@@ -11165,6 +11245,17 @@ async def pty_ws(ws: WebSocket) -> None:
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
bridge.close()
|
||||
# Close line — the dashboard/TUI twin of handle_ws's "ws closed".
|
||||
# Silent before: a PTY-EOF (backend crash) or send failure left no
|
||||
# trace, so user-reported "chat dropped" was unreproducible.
|
||||
_log.info(
|
||||
"pty closed peer=%s reason=%s dur_s=%.1f bytes_in=%d bytes_out=%d",
|
||||
peer,
|
||||
_stats["reason"],
|
||||
time.monotonic() - _pty_start,
|
||||
_stats["bytes_in"],
|
||||
_stats["bytes_out"],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -11180,21 +11271,29 @@ async def pty_ws(ws: WebSocket) -> None:
|
||||
|
||||
@app.websocket("/api/ws")
|
||||
async def gateway_ws(ws: WebSocket) -> None:
|
||||
peer = ws.client.host if ws.client else "?"
|
||||
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
|
||||
_log.info("ws refused: embedded chat disabled peer=%s", peer)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
if not _ws_auth_ok(ws):
|
||||
_log.warning("ws auth rejected peer=%s", peer)
|
||||
await ws.close(code=4401)
|
||||
return
|
||||
|
||||
if not _ws_request_is_allowed(ws):
|
||||
_log.warning("ws refused: host/origin/peer not allowed peer=%s", peer)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
from tui_gateway.ws import handle_ws
|
||||
|
||||
await handle_ws(ws)
|
||||
# Correlate this WS session with the HTTP upgrade request that opened it
|
||||
# (the SPA/proxy may carry X-Request-ID); handle_ws stamps it on its
|
||||
# accepted/closed lines so the chain reconstructs across surfaces.
|
||||
rid = ws.headers.get("x-request-id")
|
||||
await handle_ws(ws, rid=rid)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -11211,52 +11310,77 @@ async def gateway_ws(ws: WebSocket) -> None:
|
||||
|
||||
@app.websocket("/api/pub")
|
||||
async def pub_ws(ws: WebSocket) -> None:
|
||||
peer = ws.client.host if ws.client else "?"
|
||||
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
|
||||
_log.info("pub refused: embedded chat disabled peer=%s", peer)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
if not _ws_auth_ok(ws):
|
||||
_log.warning("pub auth rejected peer=%s", peer)
|
||||
await ws.close(code=4401)
|
||||
return
|
||||
|
||||
if not _ws_request_is_allowed(ws):
|
||||
_log.warning("pub refused: host/origin/peer not allowed peer=%s", peer)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
channel = _channel_or_close_code(ws)
|
||||
if not channel:
|
||||
_log.warning("pub refused: missing/invalid channel peer=%s", peer)
|
||||
await ws.close(code=4400)
|
||||
return
|
||||
|
||||
await ws.accept()
|
||||
|
||||
_log.info("pub accepted peer=%s channel=%s", peer, channel)
|
||||
started = time.monotonic()
|
||||
reason = "client_disconnect"
|
||||
frames = 0
|
||||
try:
|
||||
while True:
|
||||
await _broadcast_event(ws.app, channel, await ws.receive_text())
|
||||
frames += 1
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
except Exception:
|
||||
reason = "error"
|
||||
_log.exception("pub loop failed peer=%s channel=%s", peer, channel)
|
||||
finally:
|
||||
_log.info(
|
||||
"pub closed peer=%s channel=%s reason=%s dur_s=%.1f frames=%d",
|
||||
peer, channel, reason, time.monotonic() - started, frames,
|
||||
)
|
||||
|
||||
|
||||
@app.websocket("/api/events")
|
||||
async def events_ws(ws: WebSocket) -> None:
|
||||
peer = ws.client.host if ws.client else "?"
|
||||
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
|
||||
_log.info("events refused: embedded chat disabled peer=%s", peer)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
if not _ws_auth_ok(ws):
|
||||
_log.warning("events auth rejected peer=%s", peer)
|
||||
await ws.close(code=4401)
|
||||
return
|
||||
|
||||
if not _ws_request_is_allowed(ws):
|
||||
_log.warning("events refused: host/origin/peer not allowed peer=%s", peer)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
channel = _channel_or_close_code(ws)
|
||||
if not channel:
|
||||
_log.warning("events refused: missing/invalid channel peer=%s", peer)
|
||||
await ws.close(code=4400)
|
||||
return
|
||||
|
||||
await ws.accept()
|
||||
_log.info("events accepted peer=%s channel=%s", peer, channel)
|
||||
started = time.monotonic()
|
||||
reason = "client_disconnect"
|
||||
|
||||
event_channels, event_lock = _get_event_state(ws.app)
|
||||
async with event_lock:
|
||||
@@ -11270,6 +11394,9 @@ async def events_ws(ws: WebSocket) -> None:
|
||||
await ws.receive_text()
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
except Exception:
|
||||
reason = "error"
|
||||
_log.exception("events loop failed peer=%s channel=%s", peer, channel)
|
||||
finally:
|
||||
async with event_lock:
|
||||
subs = event_channels.get(channel)
|
||||
@@ -11279,6 +11406,10 @@ async def events_ws(ws: WebSocket) -> None:
|
||||
|
||||
if not subs:
|
||||
event_channels.pop(channel, None)
|
||||
_log.info(
|
||||
"events closed peer=%s channel=%s reason=%s dur_s=%.1f",
|
||||
peer, channel, reason, time.monotonic() - started,
|
||||
)
|
||||
|
||||
|
||||
def _normalise_prefix(raw: Optional[str]) -> str:
|
||||
|
||||
@@ -5843,3 +5843,137 @@ class TestDesktopCronTicker:
|
||||
|
||||
with self._client():
|
||||
assert not called.wait(0.5), "ticker must not run outside the desktop app"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dashboard/TUI observability: HTTP access log + WS lifecycle logging.
|
||||
#
|
||||
# The gateway logs every inbound message to gateway.log; these assert the
|
||||
# dashboard/TUI twin emits the same INFO-granularity shape/timing lines to the
|
||||
# gui surface (hermes_cli.web_server logger). Metadata only — never bodies.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDashboardObservability:
|
||||
"""HTTP access-log middleware + WS lifecycle lines on the gui surface."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _setup(self, monkeypatch, _isolate_hermes_home):
|
||||
try:
|
||||
from starlette.testclient import TestClient
|
||||
except ImportError:
|
||||
pytest.skip("fastapi/starlette not installed")
|
||||
import hermes_state
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.web_server import app, _SESSION_HEADER_NAME, _SESSION_TOKEN
|
||||
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", get_hermes_home() / "state.db")
|
||||
self.client = TestClient(app)
|
||||
self.client.headers[_SESSION_HEADER_NAME] = _SESSION_TOKEN
|
||||
|
||||
def test_http_access_log_line_and_request_id_roundtrip(self, caplog):
|
||||
import logging
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
resp = self.client.get("/api/status")
|
||||
|
||||
# Response echoes a correlation id the client/proxy can log against.
|
||||
assert resp.headers.get("X-Request-ID")
|
||||
|
||||
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
|
||||
assert access, "expected an 'http ...' access-log line"
|
||||
line = access[-1]
|
||||
# Behavior contract: the line carries method/path/status/dur/rid/peer,
|
||||
# not a frozen string (no change-detector test).
|
||||
for field in ("method=GET", "path=/api/status", "status=200", "dur_ms=", "rid=", "peer="):
|
||||
assert field in line, f"missing {field!r} in {line!r}"
|
||||
|
||||
def test_http_access_log_never_contains_query_string(self, caplog):
|
||||
import logging
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
# Query carries tokens on some routes; the access line must log
|
||||
# path only, never the query string.
|
||||
self.client.get("/api/status?secret=topsecrettoken")
|
||||
|
||||
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
|
||||
assert access
|
||||
assert "topsecrettoken" not in access[-1]
|
||||
assert "?" not in access[-1].split("path=", 1)[1].split(" ", 1)[0]
|
||||
|
||||
def test_inbound_request_id_is_propagated(self, caplog):
|
||||
import logging
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
resp = self.client.get("/api/status", headers={"X-Request-ID": "deadbeef"})
|
||||
|
||||
assert resp.headers.get("X-Request-ID") == "deadbeef"
|
||||
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
|
||||
assert any("rid=deadbeef" in m for m in access)
|
||||
|
||||
def test_unauthorized_request_is_still_logged_with_status(self, caplog):
|
||||
import logging
|
||||
from starlette.testclient import TestClient
|
||||
from hermes_cli.web_server import app
|
||||
|
||||
unauth = TestClient(app)
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
resp = unauth.get("/api/env")
|
||||
|
||||
assert resp.status_code == 401
|
||||
# The access log is outermost, so it records the rejection too.
|
||||
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
|
||||
assert any("path=/api/env" in m and "status=401" in m for m in access)
|
||||
|
||||
def test_events_ws_logs_accept_and_close(self, caplog):
|
||||
import logging
|
||||
from hermes_cli.web_server import _SESSION_TOKEN
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
with self.client.websocket_connect(
|
||||
f"/api/events?channel=obs_test&token={_SESSION_TOKEN}"
|
||||
) as conn:
|
||||
conn.close()
|
||||
|
||||
msgs = [r.getMessage() for r in caplog.records]
|
||||
assert any(m.startswith("events accepted ") and "channel=obs_test" in m for m in msgs)
|
||||
close = [m for m in msgs if m.startswith("events closed ")]
|
||||
assert close, "expected an 'events closed' line"
|
||||
for field in ("channel=obs_test", "reason=", "dur_s="):
|
||||
assert field in close[-1], f"missing {field!r} in {close[-1]!r}"
|
||||
|
||||
def test_pub_ws_logs_accept_and_close(self, caplog):
|
||||
import logging
|
||||
from hermes_cli.web_server import _SESSION_TOKEN
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
with self.client.websocket_connect(
|
||||
f"/api/pub?channel=obs_pub&token={_SESSION_TOKEN}"
|
||||
) as conn:
|
||||
conn.close()
|
||||
|
||||
msgs = [r.getMessage() for r in caplog.records]
|
||||
assert any(m.startswith("pub accepted ") and "channel=obs_pub" in m for m in msgs)
|
||||
close = [m for m in msgs if m.startswith("pub closed ")]
|
||||
assert close
|
||||
for field in ("channel=obs_pub", "reason=", "dur_s=", "frames="):
|
||||
assert field in close[-1]
|
||||
|
||||
def test_ws_reject_is_logged(self, caplog):
|
||||
import logging
|
||||
from starlette.testclient import TestClient
|
||||
from starlette.websockets import WebSocketDisconnect
|
||||
from hermes_cli.web_server import app
|
||||
|
||||
unauth = TestClient(app) # no session header/token
|
||||
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
|
||||
try:
|
||||
with unauth.websocket_connect("/api/events?channel=x"):
|
||||
pass
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
msgs = [r.getMessage() for r in caplog.records]
|
||||
assert any(m.startswith("events ") and ("auth rejected" in m or "refused" in m) for m in msgs)
|
||||
|
||||
@@ -126,3 +126,34 @@ def test_ws_write_loop_stall_does_not_latch_transport(monkeypatch):
|
||||
loop.call_soon_threadsafe(loop.stop)
|
||||
thread.join(timeout=2)
|
||||
loop.close()
|
||||
|
||||
|
||||
def test_handle_ws_stamps_rid_on_accept_and_close(monkeypatch, caplog):
|
||||
"""rid from the HTTP upgrade is stamped on the accepted/closed lines so the
|
||||
WS session correlates with the dashboard HTTP access-log line.
|
||||
|
||||
Backward compat: the stdio entry-point calls handle_ws(ws) with no rid;
|
||||
the default rid=None must keep working (asserted by the existing tests
|
||||
above, which all call handle_ws(FakeWS())).
|
||||
"""
|
||||
import logging
|
||||
|
||||
class FakeWS:
|
||||
async def accept(self):
|
||||
pass
|
||||
|
||||
async def send_text(self, line):
|
||||
pass
|
||||
|
||||
async def receive_text(self):
|
||||
raise ws_mod._WebSocketDisconnect()
|
||||
|
||||
async def close(self):
|
||||
pass
|
||||
|
||||
with caplog.at_level(logging.INFO, logger="tui_gateway.ws"):
|
||||
asyncio.run(ws_mod.handle_ws(FakeWS(), rid="cafe1234"))
|
||||
|
||||
msgs = [r.getMessage() for r in caplog.records]
|
||||
assert any(m.startswith("ws accepted ") and "rid=cafe1234" in m for m in msgs)
|
||||
assert any(m.startswith("ws closed ") and "rid=cafe1234" in m for m in msgs)
|
||||
|
||||
@@ -170,8 +170,15 @@ def _disable_nagle(ws: Any) -> None:
|
||||
_log.debug("ws TCP_NODELAY skip: %s", exc)
|
||||
|
||||
|
||||
async def handle_ws(ws: Any) -> None:
|
||||
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``."""
|
||||
async def handle_ws(ws: Any, rid: str | None = None) -> None:
|
||||
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``.
|
||||
|
||||
``rid`` is an optional correlation id propagated from the HTTP upgrade
|
||||
request (``X-Request-ID``) by the dashboard's ``/api/ws`` route. It is
|
||||
stamped on the accepted/closed lines so a WS session ties back to the
|
||||
HTTP access-log line that opened it. ``None`` when launched over stdio
|
||||
(``tui_gateway.entry``), where there is no HTTP request.
|
||||
"""
|
||||
peer = _ws_peer_label(ws)
|
||||
transport: WSTransport | None = None
|
||||
messages = 0
|
||||
@@ -186,7 +193,7 @@ async def handle_ws(ws: Any) -> None:
|
||||
# Push small streamed frames out immediately instead of letting Nagle
|
||||
# batch them — keeps the live token cadence intact for GUI clients.
|
||||
_disable_nagle(ws)
|
||||
_log.info("ws accepted peer=%s", peer)
|
||||
_log.info("ws accepted peer=%s rid=%s", peer, rid)
|
||||
|
||||
transport = WSTransport(ws, asyncio.get_running_loop(), peer=peer)
|
||||
|
||||
@@ -327,9 +334,10 @@ async def handle_ws(ws: Any) -> None:
|
||||
except Exception as exc:
|
||||
_log.debug("ws close failed peer=%s error=%s", peer, exc)
|
||||
_log.info(
|
||||
"ws closed peer=%s reason=%s messages=%d parse_errors=%d "
|
||||
"ws closed peer=%s rid=%s reason=%s messages=%d parse_errors=%d "
|
||||
"dispatch_crashes=%d send_failures=%d reaped_sessions=%d detached_sessions=%d",
|
||||
peer,
|
||||
rid,
|
||||
disconnect_reason,
|
||||
messages,
|
||||
parse_errors,
|
||||
|
||||
@@ -853,7 +853,7 @@ View, tail, and filter Hermes log files. All logs are stored in `~/.hermes/logs/
|
||||
| `agent` (default) | `agent.log` | All agent activity — API calls, tool dispatch, session lifecycle (INFO and above) |
|
||||
| `errors` | `errors.log` | Warnings and errors only — a filtered subset of agent.log |
|
||||
| `gateway` | `gateway.log` | Messaging gateway activity — platform connections, message dispatch, webhook events |
|
||||
| `gui` | `gui.log` | Dashboard / TUI-gateway / PTY-bridge / websocket events |
|
||||
| `gui` | `gui.log` | Dashboard / TUI-gateway / PTY-bridge / websocket events — HTTP access log (method, path, status, latency, request id) and WebSocket lifecycle (accept, reject reason, close reason, duration, byte/message counters) |
|
||||
| `desktop` | `desktop.log` | Electron desktop app — boot, backend spawn output, and recent Python tracebacks |
|
||||
|
||||
### Options
|
||||
|
||||
Reference in New Issue
Block a user