Compare commits

...

1 Commits

Author SHA1 Message Date
kshitijk4poor
d0964c5150 feat(dashboard): log HTTP access + WebSocket lifecycle to the gui surface
The messaging gateway logs every inbound message to gateway.log, but its
dashboard/TUI twin (gui.log) was nearly silent: the dashboard FastAPI app had
host-header/auth-gate/auth middlewares but no access log, and 3 of 4 WebSocket
endpoints logged nothing at all. Worst of these, /api/pty logged 'pty accepted'
on connect but was completely silent on close — a PTY EOF (backend crash), a
send failure, or a client drop left no trace, so user-reported 'chat
disconnected / TUI froze' was unreproducible.

Extend the convention tui_gateway/ws.py::handle_ws already establishes (a
structured 'ws closed peer=... reason=... <counters>' line) across the whole
surface, at the same INFO granularity as gateway.log, into the gui.log that is
already sized for it (10MB x5):

- HTTP access-log middleware (registered LIFO-outermost so it captures the
  final status, including 400/401 from the middlewares above): one INFO line
  per request with method, path, status, latency, request id, peer. Path only,
  never the query string (tokens ride in query on some routes). UA/referer at
  DEBUG (-v). Reads/echoes X-Request-ID for client/proxy correlation.
- /api/pty: structured close line covering all exit paths
  (client_disconnect | pty_eof | send_failed | error) with duration and
  bytes_in/out counters.
- /api/pub + /api/events: accept + structured close (reason/duration/frames)
  + all reject paths.
- /api/ws: reject paths logged; request id threaded into handle_ws and stamped
  on its accept/close lines so a WS session correlates with the HTTP upgrade.

Metadata only — no request/response bodies, no WS frame payloads, no
headers/cookies on the INFO lines. Opt-in body capture is a separate change so
this stays clear of the debug-share privacy surface.

handle_ws gains an optional rid=None arg, backward-compatible with the stdio
entry-point (tui_gateway.entry) which calls handle_ws(ws).

Tests (behavior-contract style, not frozen strings): HTTP access line shape +
query-string redaction + 401-still-logged + X-Request-ID round-trip; WS
accept/close lines for /api/pub and /api/events; WS reject logging; rid
propagation through handle_ws.
2026-06-19 17:45:17 +05:30
5 changed files with 312 additions and 8 deletions

View File

@@ -458,6 +458,72 @@ async def auth_middleware(request: Request, call_next):
return await call_next(request)
# ---------------------------------------------------------------------------
# Access log — one INFO line per HTTP request to the ``gui`` log surface.
#
# The messaging gateway logs every inbound message to ``gateway.log``; this is
# the dashboard/TUI twin for ``gui.log``. Registered LAST so FastAPI runs it
# OUTERMOST (middleware is LIFO): it observes the final status code, including
# 400/401 rejections produced by the host-header and auth middlewares above.
#
# Metadata only — method, path (never query string: tokens ride in query on
# some routes), status, latency, a short request id, and peer. Request/response
# bodies are deliberately never captured here (see the opt-in body-capture
# surface). The request id is read from an inbound ``X-Request-ID`` when a
# reverse proxy or the SPA supplies one, else minted, and echoed back on the
# response so a client can correlate its call with this log line and with the
# WebSocket session it subsequently opens.
# ---------------------------------------------------------------------------
def _new_request_id() -> str:
"""Short, log-friendly correlation id (not security-sensitive)."""
import secrets
return secrets.token_hex(4)
@app.middleware("http")
async def access_log_middleware(request: Request, call_next):
import time as _time
rid = request.headers.get("x-request-id") or _new_request_id()
request.state.rid = rid
peer = request.client.host if request.client else "?"
start = _time.monotonic()
status = 500
response = None
try:
response = await call_next(request)
status = response.status_code
return response
finally:
dur_ms = int((_time.monotonic() - start) * 1000)
# INFO: the always-on shape/timing line (matches gateway.log granularity).
_log.info(
"http method=%s path=%s status=%s dur_ms=%d rid=%s peer=%s",
request.method,
request.url.path,
status,
dur_ms,
rid,
peer,
)
# DEBUG (-v only): noisier fields kept off the default line.
if _log.isEnabledFor(logging.DEBUG):
_log.debug(
"http detail rid=%s ua=%r referer=%r",
rid,
request.headers.get("user-agent", ""),
request.headers.get("referer", ""),
)
if response is not None:
try:
response.headers["X-Request-ID"] = rid
except Exception:
pass
# ---------------------------------------------------------------------------
# Config schema — auto-generated from DEFAULT_CONFIG
# ---------------------------------------------------------------------------
@@ -11116,19 +11182,27 @@ async def pty_ws(ws: WebSocket) -> None:
loop = asyncio.get_running_loop()
# --- reader task: PTY master → WebSocket ----------------------------
# Counters/reason live in a dict so the nested pump can mutate them
# (a closure can't rebind outer ints). Reported in the close line below.
_stats = {"bytes_in": 0, "bytes_out": 0, "reason": "client_disconnect"}
_pty_start = time.monotonic()
async def pump_pty_to_ws() -> None:
while True:
chunk = await loop.run_in_executor(
None, bridge.read, _PTY_READ_CHUNK_TIMEOUT
)
if chunk is None: # EOF
if chunk is None: # EOF — PTY child exited (backend gone)
_stats["reason"] = "pty_eof"
return
if not chunk: # no data this tick; yield control and retry
await asyncio.sleep(0)
continue
try:
await ws.send_bytes(chunk)
_stats["bytes_out"] += len(chunk)
except Exception:
_stats["reason"] = "send_failed"
return
reader_task = asyncio.create_task(pump_pty_to_ws())
@@ -11156,8 +11230,14 @@ async def pty_ws(ws: WebSocket) -> None:
continue
bridge.write(raw)
_stats["bytes_in"] += len(raw)
except WebSocketDisconnect:
pass
except Exception:
# Unexpected server-side failure in the writer loop — record it so the
# close line distinguishes a crash from a clean client disconnect.
_stats["reason"] = "error"
_log.exception("pty writer loop failed peer=%s", peer)
finally:
reader_task.cancel()
try:
@@ -11165,6 +11245,17 @@ async def pty_ws(ws: WebSocket) -> None:
except (asyncio.CancelledError, Exception):
pass
bridge.close()
# Close line — the dashboard/TUI twin of handle_ws's "ws closed".
# Silent before: a PTY-EOF (backend crash) or send failure left no
# trace, so user-reported "chat dropped" was unreproducible.
_log.info(
"pty closed peer=%s reason=%s dur_s=%.1f bytes_in=%d bytes_out=%d",
peer,
_stats["reason"],
time.monotonic() - _pty_start,
_stats["bytes_in"],
_stats["bytes_out"],
)
# ---------------------------------------------------------------------------
@@ -11180,21 +11271,29 @@ async def pty_ws(ws: WebSocket) -> None:
@app.websocket("/api/ws")
async def gateway_ws(ws: WebSocket) -> None:
peer = ws.client.host if ws.client else "?"
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
_log.info("ws refused: embedded chat disabled peer=%s", peer)
await ws.close(code=4403)
return
if not _ws_auth_ok(ws):
_log.warning("ws auth rejected peer=%s", peer)
await ws.close(code=4401)
return
if not _ws_request_is_allowed(ws):
_log.warning("ws refused: host/origin/peer not allowed peer=%s", peer)
await ws.close(code=4403)
return
from tui_gateway.ws import handle_ws
await handle_ws(ws)
# Correlate this WS session with the HTTP upgrade request that opened it
# (the SPA/proxy may carry X-Request-ID); handle_ws stamps it on its
# accepted/closed lines so the chain reconstructs across surfaces.
rid = ws.headers.get("x-request-id")
await handle_ws(ws, rid=rid)
# ---------------------------------------------------------------------------
@@ -11211,52 +11310,77 @@ async def gateway_ws(ws: WebSocket) -> None:
@app.websocket("/api/pub")
async def pub_ws(ws: WebSocket) -> None:
peer = ws.client.host if ws.client else "?"
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
_log.info("pub refused: embedded chat disabled peer=%s", peer)
await ws.close(code=4403)
return
if not _ws_auth_ok(ws):
_log.warning("pub auth rejected peer=%s", peer)
await ws.close(code=4401)
return
if not _ws_request_is_allowed(ws):
_log.warning("pub refused: host/origin/peer not allowed peer=%s", peer)
await ws.close(code=4403)
return
channel = _channel_or_close_code(ws)
if not channel:
_log.warning("pub refused: missing/invalid channel peer=%s", peer)
await ws.close(code=4400)
return
await ws.accept()
_log.info("pub accepted peer=%s channel=%s", peer, channel)
started = time.monotonic()
reason = "client_disconnect"
frames = 0
try:
while True:
await _broadcast_event(ws.app, channel, await ws.receive_text())
frames += 1
except WebSocketDisconnect:
pass
except Exception:
reason = "error"
_log.exception("pub loop failed peer=%s channel=%s", peer, channel)
finally:
_log.info(
"pub closed peer=%s channel=%s reason=%s dur_s=%.1f frames=%d",
peer, channel, reason, time.monotonic() - started, frames,
)
@app.websocket("/api/events")
async def events_ws(ws: WebSocket) -> None:
peer = ws.client.host if ws.client else "?"
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
_log.info("events refused: embedded chat disabled peer=%s", peer)
await ws.close(code=4403)
return
if not _ws_auth_ok(ws):
_log.warning("events auth rejected peer=%s", peer)
await ws.close(code=4401)
return
if not _ws_request_is_allowed(ws):
_log.warning("events refused: host/origin/peer not allowed peer=%s", peer)
await ws.close(code=4403)
return
channel = _channel_or_close_code(ws)
if not channel:
_log.warning("events refused: missing/invalid channel peer=%s", peer)
await ws.close(code=4400)
return
await ws.accept()
_log.info("events accepted peer=%s channel=%s", peer, channel)
started = time.monotonic()
reason = "client_disconnect"
event_channels, event_lock = _get_event_state(ws.app)
async with event_lock:
@@ -11270,6 +11394,9 @@ async def events_ws(ws: WebSocket) -> None:
await ws.receive_text()
except WebSocketDisconnect:
pass
except Exception:
reason = "error"
_log.exception("events loop failed peer=%s channel=%s", peer, channel)
finally:
async with event_lock:
subs = event_channels.get(channel)
@@ -11279,6 +11406,10 @@ async def events_ws(ws: WebSocket) -> None:
if not subs:
event_channels.pop(channel, None)
_log.info(
"events closed peer=%s channel=%s reason=%s dur_s=%.1f",
peer, channel, reason, time.monotonic() - started,
)
def _normalise_prefix(raw: Optional[str]) -> str:

View File

@@ -5843,3 +5843,137 @@ class TestDesktopCronTicker:
with self._client():
assert not called.wait(0.5), "ticker must not run outside the desktop app"
# ---------------------------------------------------------------------------
# Dashboard/TUI observability: HTTP access log + WS lifecycle logging.
#
# The gateway logs every inbound message to gateway.log; these assert the
# dashboard/TUI twin emits the same INFO-granularity shape/timing lines to the
# gui surface (hermes_cli.web_server logger). Metadata only — never bodies.
# ---------------------------------------------------------------------------
class TestDashboardObservability:
"""HTTP access-log middleware + WS lifecycle lines on the gui surface."""
@pytest.fixture(autouse=True)
def _setup(self, monkeypatch, _isolate_hermes_home):
try:
from starlette.testclient import TestClient
except ImportError:
pytest.skip("fastapi/starlette not installed")
import hermes_state
from hermes_constants import get_hermes_home
from hermes_cli.web_server import app, _SESSION_HEADER_NAME, _SESSION_TOKEN
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", get_hermes_home() / "state.db")
self.client = TestClient(app)
self.client.headers[_SESSION_HEADER_NAME] = _SESSION_TOKEN
def test_http_access_log_line_and_request_id_roundtrip(self, caplog):
import logging
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
resp = self.client.get("/api/status")
# Response echoes a correlation id the client/proxy can log against.
assert resp.headers.get("X-Request-ID")
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
assert access, "expected an 'http ...' access-log line"
line = access[-1]
# Behavior contract: the line carries method/path/status/dur/rid/peer,
# not a frozen string (no change-detector test).
for field in ("method=GET", "path=/api/status", "status=200", "dur_ms=", "rid=", "peer="):
assert field in line, f"missing {field!r} in {line!r}"
def test_http_access_log_never_contains_query_string(self, caplog):
import logging
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
# Query carries tokens on some routes; the access line must log
# path only, never the query string.
self.client.get("/api/status?secret=topsecrettoken")
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
assert access
assert "topsecrettoken" not in access[-1]
assert "?" not in access[-1].split("path=", 1)[1].split(" ", 1)[0]
def test_inbound_request_id_is_propagated(self, caplog):
import logging
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
resp = self.client.get("/api/status", headers={"X-Request-ID": "deadbeef"})
assert resp.headers.get("X-Request-ID") == "deadbeef"
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
assert any("rid=deadbeef" in m for m in access)
def test_unauthorized_request_is_still_logged_with_status(self, caplog):
import logging
from starlette.testclient import TestClient
from hermes_cli.web_server import app
unauth = TestClient(app)
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
resp = unauth.get("/api/env")
assert resp.status_code == 401
# The access log is outermost, so it records the rejection too.
access = [r.getMessage() for r in caplog.records if r.getMessage().startswith("http ")]
assert any("path=/api/env" in m and "status=401" in m for m in access)
def test_events_ws_logs_accept_and_close(self, caplog):
import logging
from hermes_cli.web_server import _SESSION_TOKEN
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
with self.client.websocket_connect(
f"/api/events?channel=obs_test&token={_SESSION_TOKEN}"
) as conn:
conn.close()
msgs = [r.getMessage() for r in caplog.records]
assert any(m.startswith("events accepted ") and "channel=obs_test" in m for m in msgs)
close = [m for m in msgs if m.startswith("events closed ")]
assert close, "expected an 'events closed' line"
for field in ("channel=obs_test", "reason=", "dur_s="):
assert field in close[-1], f"missing {field!r} in {close[-1]!r}"
def test_pub_ws_logs_accept_and_close(self, caplog):
import logging
from hermes_cli.web_server import _SESSION_TOKEN
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
with self.client.websocket_connect(
f"/api/pub?channel=obs_pub&token={_SESSION_TOKEN}"
) as conn:
conn.close()
msgs = [r.getMessage() for r in caplog.records]
assert any(m.startswith("pub accepted ") and "channel=obs_pub" in m for m in msgs)
close = [m for m in msgs if m.startswith("pub closed ")]
assert close
for field in ("channel=obs_pub", "reason=", "dur_s=", "frames="):
assert field in close[-1]
def test_ws_reject_is_logged(self, caplog):
import logging
from starlette.testclient import TestClient
from starlette.websockets import WebSocketDisconnect
from hermes_cli.web_server import app
unauth = TestClient(app) # no session header/token
with caplog.at_level(logging.INFO, logger="hermes_cli.web_server"):
try:
with unauth.websocket_connect("/api/events?channel=x"):
pass
except WebSocketDisconnect:
pass
except Exception:
pass
msgs = [r.getMessage() for r in caplog.records]
assert any(m.startswith("events ") and ("auth rejected" in m or "refused" in m) for m in msgs)

View File

@@ -126,3 +126,34 @@ def test_ws_write_loop_stall_does_not_latch_transport(monkeypatch):
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=2)
loop.close()
def test_handle_ws_stamps_rid_on_accept_and_close(monkeypatch, caplog):
"""rid from the HTTP upgrade is stamped on the accepted/closed lines so the
WS session correlates with the dashboard HTTP access-log line.
Backward compat: the stdio entry-point calls handle_ws(ws) with no rid;
the default rid=None must keep working (asserted by the existing tests
above, which all call handle_ws(FakeWS())).
"""
import logging
class FakeWS:
async def accept(self):
pass
async def send_text(self, line):
pass
async def receive_text(self):
raise ws_mod._WebSocketDisconnect()
async def close(self):
pass
with caplog.at_level(logging.INFO, logger="tui_gateway.ws"):
asyncio.run(ws_mod.handle_ws(FakeWS(), rid="cafe1234"))
msgs = [r.getMessage() for r in caplog.records]
assert any(m.startswith("ws accepted ") and "rid=cafe1234" in m for m in msgs)
assert any(m.startswith("ws closed ") and "rid=cafe1234" in m for m in msgs)

View File

@@ -170,8 +170,15 @@ def _disable_nagle(ws: Any) -> None:
_log.debug("ws TCP_NODELAY skip: %s", exc)
async def handle_ws(ws: Any) -> None:
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``."""
async def handle_ws(ws: Any, rid: str | None = None) -> None:
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``.
``rid`` is an optional correlation id propagated from the HTTP upgrade
request (``X-Request-ID``) by the dashboard's ``/api/ws`` route. It is
stamped on the accepted/closed lines so a WS session ties back to the
HTTP access-log line that opened it. ``None`` when launched over stdio
(``tui_gateway.entry``), where there is no HTTP request.
"""
peer = _ws_peer_label(ws)
transport: WSTransport | None = None
messages = 0
@@ -186,7 +193,7 @@ async def handle_ws(ws: Any) -> None:
# Push small streamed frames out immediately instead of letting Nagle
# batch them — keeps the live token cadence intact for GUI clients.
_disable_nagle(ws)
_log.info("ws accepted peer=%s", peer)
_log.info("ws accepted peer=%s rid=%s", peer, rid)
transport = WSTransport(ws, asyncio.get_running_loop(), peer=peer)
@@ -327,9 +334,10 @@ async def handle_ws(ws: Any) -> None:
except Exception as exc:
_log.debug("ws close failed peer=%s error=%s", peer, exc)
_log.info(
"ws closed peer=%s reason=%s messages=%d parse_errors=%d "
"ws closed peer=%s rid=%s reason=%s messages=%d parse_errors=%d "
"dispatch_crashes=%d send_failures=%d reaped_sessions=%d detached_sessions=%d",
peer,
rid,
disconnect_reason,
messages,
parse_errors,

View File

@@ -853,7 +853,7 @@ View, tail, and filter Hermes log files. All logs are stored in `~/.hermes/logs/
| `agent` (default) | `agent.log` | All agent activity — API calls, tool dispatch, session lifecycle (INFO and above) |
| `errors` | `errors.log` | Warnings and errors only — a filtered subset of agent.log |
| `gateway` | `gateway.log` | Messaging gateway activity — platform connections, message dispatch, webhook events |
| `gui` | `gui.log` | Dashboard / TUI-gateway / PTY-bridge / websocket events |
| `gui` | `gui.log` | Dashboard / TUI-gateway / PTY-bridge / websocket events — HTTP access log (method, path, status, latency, request id) and WebSocket lifecycle (accept, reject reason, close reason, duration, byte/message counters) |
| `desktop` | `desktop.log` | Electron desktop app — boot, backend spawn output, and recent Python tracebacks |
### Options