Compare commits

...

2 Commits

Author SHA1 Message Date
Brooklyn Nicholson
3d8be3e84d feat(lint): observability for the LSP bridge with steady-state silence
Adds per-call structured logging on the dedicated ``hermes.lint.lsp``
logger so an opt-in user can answer "did LSP fire on that edit?" with
``rg 'lsp\['`` against ``~/.hermes/logs/agent.log``. Levels are tuned
so a 1000-write session emits exactly ONE INFO line at the default
threshold, not 1000.

Level model
-----------
* ``DEBUG`` (invisible at the default INFO threshold) for every per-call
  steady-state event: ``clean``, ``feature off``, ``extension not
  mapped``, ``backend not local``, repeated ``no project root`` for an
  already-announced file, repeated ``server unavailable`` for an
  already-announced binary.
* ``INFO`` for state transitions worth surfacing once: ``active for
  <root>`` the first time a (language, project_root) client starts,
  ``no project root for <path>`` the first time we see that orphan
  file. Plus every ``N diags`` event — diagnostics are inherently rare
  per-edit and are exactly the failure signal users want to grep for.
* ``WARNING`` for action-required failures the first time per
  (language, binary): ``server unavailable`` (binary not on PATH),
  ``no server configured``. Per-call ``WARNING`` for timeouts, server
  errors, and unexpected bridge exceptions — these are inherently
  novel events, not steady state, and each one is its own signal.

Dedup is in-process module-level sets guarded by a lock. Sets grow at
most by the number of distinct (language, project_root) and (language,
binary) pairs touched in one Python process — a few hundred entries in
the most aggressive monorepo session, which is bytes of memory. A
bounded LRU was rejected because evicting an entry would risk
re-firing the WARNING/INFO line we explicitly want to suppress.

Why this matters
----------------
The previous draft logged every per-call event at INFO. ``agent.log``
caps at 5 MB × 3 backups (= 20 MB) via ``RotatingFileHandler``, so
nothing would crash, but a normal coding session would dwarf the actual
signal under hundreds of ``lsp[typescript] clean (...)`` lines. The
new model preserves the verification answer ("LSP active for <root>")
and the action-required signals while keeping clean steady state out
of the user's face.

Tests
-----
* ``TestLogLevelsSteadyState`` — feature off, unmapped extension, non-
  local backend, and repeated clean writes all stay at DEBUG. Exactly
  one INFO ("active for ...") survives across N calls.
* ``TestLogLevelsNovelEvents`` — diagnostics are INFO per call;
  ``active for`` fires once per (language, root).
* ``TestLogLevelsActionRequired`` — server unavailable warns once per
  binary; orphan files INFO once per path; timeouts WARN every time.
2026-05-12 00:28:30 -04:00
Brooklyn Nicholson
9a546d6b08 feat(lint): opt-in LSP-backed lint path in _check_lint
The post-edit lint hook in `tools/file_operations.py::_check_lint` has
historically resolved `.ts`/`.tsx` to `npx tsc --noEmit <single-file>`,
which has no view of the surrounding project and floods the agent with
phantom "Cannot find module" errors that disappear the moment
`tsconfig.json` is in scope. Same shape for `.go` (`go vet` orphan) and
`.rs` (`rustfmt --check` is style, not types).

This change adds an opt-in LSP path that runs *before* the in-process /
shell linter table:

- `tools/lsp_client.py` — stdio JSON-RPC client with Content-Length
  framing, per-(language, project_root, server_cmd) cache, idle reaper.
  Sync API (`diagnostics(path, content)`) returns one snapshot via
  `textDocument/didOpen` + `publishDiagnostics` with a configurable
  settle window for servers that emit "indexing" diagnostics first.
  Hand-rolled rather than pulled from `multilspy` / `sansio-lsp-client`
  / `pygls` — see the module docstring for the comparison.
- `tools/lsp_lint.py` — bridge between `_check_lint` and the client.
  Returns `None` (caller falls through to the existing shell linter)
  whenever LSP cannot or should not handle the file: feature off,
  language unmapped, env not local, no project root marker, server
  binary missing, request timed out. Never raises.
- `tools/file_operations.py::_check_lint` — calls the bridge first,
  falls through to the in-process and shell linters unchanged. With
  the flag off (the default) this is one extra `import` on the lint
  path and zero behaviour change.
- `hermes_cli/config.py` — new `lint.lsp.{enabled,servers,...}` block,
  off by default. New top-level key, so the deep-merge handles
  upgrades and no `_config_version` bump is required.

Out of scope for this PR (explicitly noted in the module docstrings):

- Container / SSH / Modal / Daytona backends. The server has to run
  inside the sandbox, which means baking it into the image or
  installing on first use; that's the gnarly half and gets its own PR.
- didChange / live editing. We re-open with the freshly written bytes
  on every call.
- Pull diagnostics (LSP 3.17). Push works on every server we care
  about today.

Tests:

- `test_lsp_client.py` — pure helpers (project root walk, URI, framing,
  Diagnostic conversion) + end-to-end coverage against an in-tree fake
  LSP server (Python script over stdio) for clean / dirty / settle /
  timeout / registry caching / shutdown.
- `test_lsp_lint_bridge.py` — feature flag, backend gating, project
  root gating, language-map gating, error containment.
- `test_check_lint_lsp_routing.py` — regression guard that the default
  (LSP disabled) path still hits in-process / shell linters, plus
  proof that an enabled bridge short-circuits the shell linter.
2026-05-12 00:13:12 -04:00
7 changed files with 1860 additions and 7 deletions

View File

@@ -1193,6 +1193,45 @@ DEFAULT_CONFIG = {
},
},
# Post-edit lint behaviour. Hermes runs a syntax check after every
# write/patch and surfaces only the errors that edit introduced (see
# ``ShellFileOperations._check_lint_delta`` in tools/file_operations.py).
# The defaults below leave the legacy shell linters (``npx tsc --noEmit``,
# ``go vet``, ``rustfmt --check``, ``py_compile``) in charge — the LSP
# path is opt-in until container/SSH backends grow a way to host the
# language server inside the sandbox.
"lint": {
"lsp": {
# Master switch. When false, ``_check_lint`` skips LSP entirely
# and the in-process / shell linter table runs as before.
"enabled": False,
# Per-language server launch command. Each value is either a
# whitespace-separated string or a list. Missing languages
# fall back to ``tools.lsp_lint._DEFAULT_SERVERS``.
"servers": {
"typescript": "typescript-language-server --stdio",
"typescriptreact": "typescript-language-server --stdio",
"javascript": "typescript-language-server --stdio",
"javascriptreact": "typescript-language-server --stdio",
"rust": "rust-analyzer",
"go": "gopls",
},
# Wall-clock timeout for a single ``didOpen`` → diagnostics
# round-trip. Servers that take longer than this on cold start
# (notably rust-analyzer indexing a fresh checkout) cause the
# caller to fall through to the shell linter.
"diagnostic_timeout": 10,
# Settle window (milliseconds). Typescript-language-server
# publishes an empty diagnostics batch while the program graph
# loads, then re-publishes once the file is fully analysed.
# Re-snapshotting after this delay catches the real verdict.
"settle_ms": 400,
# Shut down a long-lived server process after this many seconds
# of inactivity. The reaper runs in-process; no daemons.
"idle_shutdown": 600,
},
},
# Honcho AI-native memory -- reads ~/.honcho/config.json as single source of truth.
# This section is only needed for hermes-specific overrides; everything else
# (apiKey, workspace, peerName, sessions, enabled) comes from the global config.

View File

@@ -0,0 +1,106 @@
"""Tests for ``ShellFileOperations._check_lint`` LSP routing.
Two regression guarantees:
1. With LSP disabled (the default), behaviour is unchanged for every
extension — ``_check_lint`` still hits the in-process linter for
.py/.json/.yaml/.toml and the shell linter table for .ts/.go/.rs.
2. With LSP enabled and the bridge returning a verdict, ``_check_lint``
surfaces that verdict directly and skips the shell linter.
Both sides matter: a regression in (1) would break every existing user
who has not opted in; a regression in (2) would silently mean the
opt-in flag does nothing.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from tools import file_operations
from tools.file_operations import LintResult, ShellFileOperations
from tools.environments.local import LocalEnvironment
@pytest.fixture
def ops(tmp_path: Path) -> ShellFileOperations:
env = LocalEnvironment(cwd=str(tmp_path), timeout=5)
return ShellFileOperations(env, cwd=str(tmp_path))
# ---------------------------------------------------------------------------
# Default-off path
# ---------------------------------------------------------------------------
class TestDefaultOff:
def test_python_still_uses_in_process_linter(self, ops, tmp_path: Path) -> None:
path = tmp_path / "a.py"
# Don't actually call the LSP bridge; the in-process linter should
# short-circuit before it is even consulted.
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=None) as bridge:
result = ops._check_lint(str(path), content="x = 1\n")
assert result.success is True
# Bridge is consulted (returning None) — that's the contract — but
# the in-process linter still runs and produces the verdict.
bridge.assert_called_once()
def test_typescript_falls_through_to_shell_when_bridge_returns_none(
self, ops, tmp_path: Path
) -> None:
path = tmp_path / "x.ts"
path.write_text("export {}\n")
# Bridge says "not handling this" → the existing shell-linter path
# must run. We don't actually exec tsc; we mock _has_command to
# report it absent so the shell branch produces a deterministic
# "skipped" result we can assert on.
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=None), \
patch.object(ops, "_has_command", return_value=False):
result = ops._check_lint(str(path), content="export {}\n")
assert result.skipped is True
assert "not available" in result.message
# ---------------------------------------------------------------------------
# LSP-on path
# ---------------------------------------------------------------------------
class TestLspRouting:
def test_lsp_clean_short_circuits_shell_linter(self, ops, tmp_path: Path) -> None:
path = tmp_path / "x.ts"
path.write_text("export {}\n")
clean = LintResult(success=True, output="")
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=clean), \
patch.object(ops, "_has_command") as has_command:
result = ops._check_lint(str(path), content="export {}\n")
assert result is clean
# Shell linter must not even probe for `tsc` when LSP returned a
# verdict; otherwise we'd be paying for both subprocess startups.
has_command.assert_not_called()
def test_lsp_errors_short_circuit_shell_linter(self, ops, tmp_path: Path) -> None:
path = tmp_path / "x.ts"
path.write_text("export {}\n")
dirty = LintResult(success=False, output=str(path) + ":1:1: error: boom")
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=dirty), \
patch.object(ops, "_has_command") as has_command:
result = ops._check_lint(str(path), content="export {}\n")
assert result is dirty
has_command.assert_not_called()
def test_lsp_bridge_exception_falls_through_safely(self, ops, tmp_path: Path) -> None:
# Even if the bridge module itself throws (broken import, config
# parse error, etc.) the lint hook must never propagate — the
# write_file caller would otherwise see a synthetic error for an
# otherwise-successful write.
path = tmp_path / "x.ts"
path.write_text("export {}\n")
with patch("tools.lsp_lint.maybe_lint_via_lsp", side_effect=RuntimeError("oops")), \
patch.object(ops, "_has_command", return_value=False):
result = ops._check_lint(str(path), content="export {}\n")
assert result.skipped is True

View File

@@ -0,0 +1,332 @@
"""Tests for ``tools.lsp_client``.
The bulk of this module is covered by exercising the real JSON-RPC stack
against a *fake* language server we ship in-tree as a tiny Python script.
That keeps the tests hermetic — no system typescript-language-server / gopls
required — while still proving the framing, request/response demux, and
publishDiagnostics wait loop behave end-to-end.
Pure helper functions (path → URI, project-root walk, message framing) are
covered in plain unit tests above.
"""
from __future__ import annotations
import io
import json
import os
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Any
import pytest
from tools import lsp_client
from tools.lsp_client import (
Diagnostic,
LspClient,
_encode_message,
_read_message,
find_project_root,
format_diagnostics,
get_or_start_client,
path_to_uri,
shutdown_all_clients,
)
# ---------------------------------------------------------------------------
# Pure helpers
# ---------------------------------------------------------------------------
class TestProjectRoot:
def test_walks_up_to_tsconfig(self, tmp_path: Path) -> None:
root = tmp_path / "proj"
(root / "src" / "components").mkdir(parents=True)
(root / "tsconfig.json").write_text("{}")
deep = root / "src" / "components" / "Button.tsx"
deep.write_text("export {}\n")
assert find_project_root(str(deep), "typescript") == str(root.resolve())
def test_subproject_marker_wins_over_parent(self, tmp_path: Path) -> None:
outer = tmp_path / "outer"
inner = outer / "packages" / "ui"
(inner / "src").mkdir(parents=True)
(outer / "package.json").write_text("{}")
(inner / "tsconfig.json").write_text("{}")
leaf = inner / "src" / "App.tsx"
leaf.write_text("export {}\n")
# tsconfig appears first in the marker tuple, so the inner package
# wins — even though package.json exists higher up.
assert find_project_root(str(leaf), "typescriptreact") == str(inner.resolve())
def test_returns_none_when_no_marker(self, tmp_path: Path) -> None:
orphan = tmp_path / "orphan.ts"
orphan.write_text("export {}\n")
assert find_project_root(str(orphan), "typescript") is None
def test_returns_none_for_unknown_language(self, tmp_path: Path) -> None:
f = tmp_path / "foo.lol"
f.write_text("")
assert find_project_root(str(f), "klingon") is None
class TestUri:
def test_round_trip_absolute_path(self, tmp_path: Path) -> None:
path = tmp_path / "x.ts"
path.write_text("")
uri = path_to_uri(str(path))
assert uri.startswith("file://")
assert uri.endswith("x.ts")
def test_relative_path_does_not_raise(self, tmp_path: Path, monkeypatch) -> None:
monkeypatch.chdir(tmp_path)
# Should not raise even before the file exists.
uri = path_to_uri("not-yet-created.ts")
assert uri.startswith("file://")
class TestFraming:
def test_encode_then_decode_roundtrip(self) -> None:
payload = {"jsonrpc": "2.0", "id": 7, "method": "x", "params": {"a": 1}}
wire = _encode_message(payload)
assert wire.startswith(b"Content-Length: ")
# Read it back through _read_message using a BytesIO.
decoded = _read_message(io.BytesIO(wire))
assert decoded == payload
def test_eof_returns_none(self) -> None:
assert _read_message(io.BytesIO(b"")) is None
def test_garbage_header_raises(self) -> None:
# Header without Content-Length (8KB cap doesn't kick in here, but
# parser should still reject after seeing the terminator).
bad = b"X-Garbage: yes\r\n\r\n"
with pytest.raises(RuntimeError):
_read_message(io.BytesIO(bad))
class TestDiagnostic:
def test_from_lsp_converts_zero_based_to_one_based(self) -> None:
raw: dict[str, Any] = {
"range": {"start": {"line": 9, "character": 4}, "end": {"line": 9, "character": 8}},
"severity": 1,
"message": "Cannot find name 'foo'.",
"source": "ts",
"code": 2304,
}
d = Diagnostic.from_lsp(raw)
assert d.line == 10
assert d.column == 5
assert d.severity == 1
assert d.code == "2304"
def test_format_includes_path_and_source(self) -> None:
d = Diagnostic(line=3, column=2, severity=2, message="unused", source="eslint", code="no-unused")
text = d.format("/tmp/x.ts")
assert text.startswith("/tmp/x.ts:3:2:")
assert "warning" in text
assert "unused" in text
assert "eslint" in text
def test_format_diagnostics_joins_with_newlines(self) -> None:
out = format_diagnostics(
"/tmp/x.ts",
[
Diagnostic(line=1, column=1, message="a"),
Diagnostic(line=2, column=1, message="b"),
],
)
assert out.split("\n") == [
"/tmp/x.ts:1:1: error: a",
"/tmp/x.ts:2:1: error: b",
]
# ---------------------------------------------------------------------------
# Fake LSP server (a short Python script we exec via subprocess)
# ---------------------------------------------------------------------------
_FAKE_SERVER_TEMPLATE = r"""
import json
import sys
import struct
# Modes:
# "ok" -> publish empty diagnostics for any didOpen URI.
# "errors" -> publish two diagnostics for any didOpen URI.
# "settle" -> publish empty diagnostics first, then publish two
# diagnostics 50ms later. Exercises the settle window.
MODE = {mode!r}
def _read():
header = b''
while b'\r\n\r\n' not in header:
chunk = sys.stdin.buffer.read(1)
if not chunk:
return None
header += chunk
length = int(header.split(b'Content-Length:')[1].split(b'\r\n')[0].strip())
body = b''
while len(body) < length:
chunk = sys.stdin.buffer.read(length - len(body))
if not chunk:
return None
body += chunk
return json.loads(body.decode('utf-8'))
def _write(payload):
body = json.dumps(payload).encode('utf-8')
sys.stdout.buffer.write(b'Content-Length: ' + str(len(body)).encode() + b'\r\n\r\n' + body)
sys.stdout.buffer.flush()
def _diags(uri):
return [
{{"range": {{"start": {{"line": 0, "character": 0}}, "end": {{"line": 0, "character": 1}}}},
"severity": 1, "message": "boom", "source": "fake", "code": "E1"}},
{{"range": {{"start": {{"line": 1, "character": 4}}, "end": {{"line": 1, "character": 6}}}},
"severity": 2, "message": "warn", "source": "fake"}},
]
while True:
msg = _read()
if msg is None:
break
if 'id' in msg and msg.get('method') == 'initialize':
_write({{"jsonrpc": "2.0", "id": msg['id'], "result": {{"capabilities": {{}}}}}})
elif 'id' in msg and msg.get('method') == 'shutdown':
_write({{"jsonrpc": "2.0", "id": msg['id'], "result": None}})
elif msg.get('method') == 'exit':
break
elif msg.get('method') == 'textDocument/didOpen':
uri = msg['params']['textDocument']['uri']
if MODE == 'ok':
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
"params": {{"uri": uri, "diagnostics": []}}}})
elif MODE == 'errors':
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
"params": {{"uri": uri, "diagnostics": _diags(uri)}}}})
elif MODE == 'settle':
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
"params": {{"uri": uri, "diagnostics": []}}}})
import time as _t; _t.sleep(0.05)
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
"params": {{"uri": uri, "diagnostics": _diags(uri)}}}})
"""
@pytest.fixture
def fake_server_factory(tmp_path: Path):
"""Return a callable that materialises a fake-LSP launch command.
We materialise per-mode so tests can choose between the empty and
error-emitting servers without juggling globals.
"""
def _make(mode: str) -> list[str]:
script = tmp_path / f"fake_lsp_{mode}.py"
script.write_text(_FAKE_SERVER_TEMPLATE.format(mode=mode))
# Use the same interpreter that's running pytest so the fake
# server inherits the same Python ABI on every CI image.
return [sys.executable, str(script)]
return _make
@pytest.fixture(autouse=True)
def _shutdown_registry_between_tests():
"""Make sure the module-global client cache cannot leak servers
across tests — every cached subprocess is real and would survive."""
yield
shutdown_all_clients()
class TestClientHandshakeAndDiagnostics:
def test_clean_file_returns_no_diagnostics(self, fake_server_factory, tmp_path: Path) -> None:
cmd = fake_server_factory("ok")
client = LspClient(cmd, str(tmp_path), "typescript")
client.start()
try:
diags = client.diagnostics(str(tmp_path / "a.ts"), "export {}\n", timeout=3.0, settle_ms=0)
finally:
client.shutdown()
assert diags == []
def test_dirty_file_surfaces_error_and_warning(self, fake_server_factory, tmp_path: Path) -> None:
cmd = fake_server_factory("errors")
client = LspClient(cmd, str(tmp_path), "typescript")
client.start()
try:
diags = client.diagnostics(str(tmp_path / "a.ts"), "x;\n", timeout=3.0, settle_ms=0)
finally:
client.shutdown()
# Two diagnostics with severities 1 (error) and 2 (warning).
assert sorted(d.severity for d in diags) == [1, 2]
assert any("boom" in d.message for d in diags)
def test_settle_window_picks_up_late_diagnostics(self, fake_server_factory, tmp_path: Path) -> None:
cmd = fake_server_factory("settle")
client = LspClient(cmd, str(tmp_path), "typescript")
client.start()
try:
# Without a settle window we'd only see the empty first batch;
# the 200ms window must be long enough to capture the second
# publishDiagnostics that lands ~50ms after didOpen.
diags = client.diagnostics(str(tmp_path / "a.ts"), "x;\n", timeout=3.0, settle_ms=200)
finally:
client.shutdown()
assert len(diags) == 2
def test_request_timeout_does_not_corrupt_state(self, tmp_path: Path) -> None:
# A "server" that reads but never writes anything back. Initialize
# must time out cleanly without deadlocking the writer lock.
script = tmp_path / "silent_lsp.py"
script.write_text("import sys\nwhile sys.stdin.buffer.read(1):\n pass\n")
client = LspClient([sys.executable, str(script)], str(tmp_path), "typescript", startup_timeout=0.5)
with pytest.raises(TimeoutError):
client.start()
class TestRegistry:
def test_missing_binary_returns_none(self, tmp_path: Path) -> None:
result = get_or_start_client(
"typescript",
str(tmp_path),
["definitely-not-a-real-binary-xyz123"],
)
assert result is None
def test_caches_per_root(self, fake_server_factory, tmp_path: Path) -> None:
cmd = fake_server_factory("ok")
a = get_or_start_client("typescript", str(tmp_path), cmd)
b = get_or_start_client("typescript", str(tmp_path), cmd)
assert a is not None and b is not None
assert a is b
def test_separate_roots_get_separate_clients(self, fake_server_factory, tmp_path: Path) -> None:
cmd = fake_server_factory("ok")
root_a = tmp_path / "a"; root_a.mkdir()
root_b = tmp_path / "b"; root_b.mkdir()
a = get_or_start_client("typescript", str(root_a), cmd)
b = get_or_start_client("typescript", str(root_b), cmd)
assert a is not None and b is not None
assert a is not b
class TestShutdown:
def test_shutdown_terminates_server(self, fake_server_factory, tmp_path: Path) -> None:
cmd = fake_server_factory("ok")
client = LspClient(cmd, str(tmp_path), "typescript")
client.start()
proc = client._proc
assert proc is not None
client.shutdown()
# The server must exit promptly; otherwise we'd be leaking processes
# in long-lived agent sessions every time idle_shutdown reaps a client.
assert proc.wait(timeout=3.0) is not None

View File

@@ -0,0 +1,345 @@
"""Tests for ``tools.lsp_lint`` — the bridge between ``_check_lint`` and the
LSP client.
We do not spin up a real language server here. Instead we monkeypatch
``get_or_start_client`` with a fake that returns whatever diagnostics the
test wants. This keeps the bridge logic — feature flag, language map,
project-root gating, error containment, observability — under tight
unit-test control while leaving the real LSP wire-protocol coverage to
test_lsp_client.py.
"""
from __future__ import annotations
import logging
from pathlib import Path
from unittest.mock import patch
import pytest
from tools import lsp_lint
from tools.lsp_client import Diagnostic
from tools.environments.local import LocalEnvironment
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
class _FakeClient:
def __init__(self, diags: list[Diagnostic] | Exception | None = None) -> None:
self._diags = diags
def diagnostics(self, path: str, content: str, *, timeout: float, settle_ms: int) -> list[Diagnostic]:
if isinstance(self._diags, Exception):
raise self._diags
return list(self._diags or [])
@pytest.fixture
def ts_project(tmp_path: Path) -> tuple[Path, Path]:
"""A tsconfig-rooted project with one source file."""
(tmp_path / "tsconfig.json").write_text("{}")
src = tmp_path / "src" / "app.ts"
src.parent.mkdir(parents=True)
src.write_text("export {}\n")
return tmp_path, src
@pytest.fixture
def local_env(tmp_path: Path) -> LocalEnvironment:
return LocalEnvironment(cwd=str(tmp_path), timeout=5)
# ---------------------------------------------------------------------------
# Feature-flag gating
# ---------------------------------------------------------------------------
class TestFeatureFlag:
def test_returns_none_when_disabled(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": False}):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
assert result is None
def test_returns_none_when_config_missing(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={}):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
assert result is None
def test_returns_none_when_content_missing(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
result = lsp_lint.maybe_lint_via_lsp(str(src), None, local_env)
# We never lint without content — there is no point sending an empty
# didOpen text payload.
assert result is None
# ---------------------------------------------------------------------------
# Backend gating
# ---------------------------------------------------------------------------
class TestBackendGating:
def test_returns_none_for_non_local_env(self, ts_project) -> None:
_, src = ts_project
class _FakeRemoteEnv:
cwd = "/remote/workspace"
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", _FakeRemoteEnv())
# Container/SSH/Modal backends are deliberately excluded in this PR.
assert result is None
# ---------------------------------------------------------------------------
# Project-root gating
# ---------------------------------------------------------------------------
class TestProjectRootGating:
def test_orphan_file_returns_none(self, tmp_path: Path, local_env) -> None:
# No tsconfig anywhere on the way up — the bridge must abort
# rather than reproduce the phantom-error problem.
orphan_dir = tmp_path / "orphan"
orphan_dir.mkdir()
orphan = orphan_dir / "loose.ts"
orphan.write_text("export {}\n")
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
result = lsp_lint.maybe_lint_via_lsp(str(orphan), "export {}\n", local_env)
assert result is None
# ---------------------------------------------------------------------------
# Language map
# ---------------------------------------------------------------------------
class TestLanguageMap:
def test_unsupported_extension_returns_none(self, tmp_path: Path, local_env) -> None:
path = tmp_path / "file.txt"
path.write_text("hello")
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
result = lsp_lint.maybe_lint_via_lsp(str(path), "hello", local_env)
assert result is None
# ---------------------------------------------------------------------------
# Happy path + error containment
# ---------------------------------------------------------------------------
class TestRouting:
def test_clean_run_returns_success_lint_result(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient([])):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
assert result is not None
assert result.success is True
assert result.output == ""
def test_errors_become_failure_with_formatted_output(self, ts_project, local_env) -> None:
_, src = ts_project
diags = [
Diagnostic(line=12, column=4, severity=1, message="Cannot find name 'foo'", source="ts", code="2304"),
Diagnostic(line=15, column=1, severity=2, message="Unused import", source="ts"),
]
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient(diags)):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
assert result is not None
assert result.success is False
assert ":12:4: error: Cannot find name 'foo'" in result.output
assert ":15:1: warning: Unused import" in result.output
def test_hint_severity_is_filtered_out(self, ts_project, local_env) -> None:
_, src = ts_project
diags = [
Diagnostic(line=1, column=1, severity=4, message="hint", source="ts"),
Diagnostic(line=2, column=1, severity=3, message="info", source="ts"),
]
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient(diags)):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
# No errors or warnings → treated as clean. Otherwise we'd be
# changing the agent's verdict mid-edit when shell linters never
# surfaced these severities.
assert result is not None
assert result.success is True
assert result.output == ""
def test_client_unavailable_returns_none(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=None):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
# Server binary missing or failed to spawn: caller must fall through
# to the legacy shell linter rather than report a phantom-clean.
assert result is None
def test_diagnostics_timeout_returns_none(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client",
return_value=_FakeClient(TimeoutError("slow"))):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
assert result is None
def test_unexpected_exception_returns_none(self, ts_project, local_env) -> None:
_, src = ts_project
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client",
return_value=_FakeClient(RuntimeError("server crash"))):
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
# The lint hook is on the hot edit path — bridge must never raise.
assert result is None
# ---------------------------------------------------------------------------
# Observability — log levels and once-per-X dedup
# ---------------------------------------------------------------------------
#
# The default agent.log threshold is INFO. These tests pin down that
# *steady state* (1000 clean writes, 1000 feature-off writes, etc.) emits
# zero records visible to the agent.log handler, and that *novel* events
# get exactly one WARNING/INFO line that survives.
@pytest.fixture(autouse=True)
def _reset_dedup_caches():
"""Each test sees a fresh dedup state so prior runs cannot mask
a missing announcement (or hide a duplicate one)."""
lsp_lint._reset_announce_caches()
yield
lsp_lint._reset_announce_caches()
def _records_at(caplog, level: int) -> list[logging.LogRecord]:
return [r for r in caplog.records if r.levelno >= level]
class TestLogLevelsSteadyState:
"""Anything that fires every write must stay at DEBUG so 1000 writes
cannot flood agent.log at the default INFO threshold."""
def test_feature_off_is_debug(self, ts_project, local_env, caplog) -> None:
_, src = ts_project
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": False}):
for _ in range(5):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
assert _records_at(caplog, logging.INFO) == []
def test_unmapped_extension_is_debug(self, tmp_path: Path, local_env, caplog) -> None:
path = tmp_path / "x.txt"
path.write_text("hi")
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
for _ in range(5):
lsp_lint.maybe_lint_via_lsp(str(path), "hi", local_env)
assert _records_at(caplog, logging.INFO) == []
def test_non_local_backend_is_debug(self, ts_project, caplog) -> None:
_, src = ts_project
class _FakeRemote:
cwd = "/remote"
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
for _ in range(5):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", _FakeRemote())
assert _records_at(caplog, logging.INFO) == []
def test_clean_write_is_debug(self, ts_project, local_env, caplog) -> None:
_, src = ts_project
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient([])):
# First write produces ONE INFO ("active for ...") for the
# state transition. Subsequent clean writes must be DEBUG so
# 1000 clean writes don't carpet agent.log.
for _ in range(5):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
infos = [r for r in caplog.records if r.levelno == logging.INFO]
assert len(infos) == 1, f"expected exactly one 'active' INFO, got {[r.getMessage() for r in infos]}"
assert "active for" in infos[0].getMessage()
class TestLogLevelsNovelEvents:
"""Things that change state or require action must escape DEBUG."""
def test_diagnostics_are_info_per_call(self, ts_project, local_env, caplog) -> None:
_, src = ts_project
diags = [Diagnostic(line=1, column=1, severity=1, message="boom")]
with caplog.at_level(logging.INFO, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient(diags)):
for _ in range(3):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
# Anchor on the diagnostic message shape — temp paths can contain
# "diag" as a substring (e.g. test_diagnostics_are_info_per_0/), so
# a bare ``"diag" in msg`` filter would over-match the active line.
diag_lines = [r.getMessage() for r in caplog.records if "] 1 diag (" in r.getMessage()]
assert len(diag_lines) == 3
def test_active_announcement_fires_once_per_root(self, ts_project, local_env, caplog) -> None:
_, src = ts_project
with caplog.at_level(logging.INFO, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient([])):
for _ in range(10):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
actives = [r for r in caplog.records if "active for" in r.getMessage()]
assert len(actives) == 1
class TestLogLevelsActionRequired:
"""First occurrence WARNING, subsequent same-event DEBUG."""
def test_server_unavailable_warns_once(self, ts_project, local_env, caplog) -> None:
_, src = ts_project
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client", return_value=None):
for _ in range(5):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
# Exactly one WARNING — the other 4 calls demoted to DEBUG so
# errors.log (WARNING+) doesn't get the same line 1000 times.
assert len(warnings) == 1
assert "server unavailable" in warnings[0].getMessage()
def test_no_project_root_info_once_per_path(self, tmp_path: Path, local_env, caplog) -> None:
# Two distinct orphan files → two INFO lines. Same orphan twice → one.
orphan_a = tmp_path / "a.ts"
orphan_a.write_text("")
orphan_b = tmp_path / "b.ts"
orphan_b.write_text("")
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
lsp_lint.maybe_lint_via_lsp(str(orphan_a), "", local_env)
lsp_lint.maybe_lint_via_lsp(str(orphan_a), "", local_env)
lsp_lint.maybe_lint_via_lsp(str(orphan_b), "", local_env)
lsp_lint.maybe_lint_via_lsp(str(orphan_b), "", local_env)
infos = [r for r in caplog.records if r.levelno == logging.INFO]
assert len(infos) == 2, [r.getMessage() for r in infos]
def test_timeout_warns_every_time(self, ts_project, local_env, caplog) -> None:
# Timeouts are inherently novel events (a hang now isn't the same
# signal as a hang yesterday), so every one must escape DEBUG.
_, src = ts_project
with caplog.at_level(logging.WARNING, logger="hermes.lint.lsp"), \
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
patch.object(lsp_lint, "get_or_start_client",
return_value=_FakeClient(TimeoutError("slow"))):
for _ in range(3):
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
timeouts = [r for r in caplog.records if "timeout" in r.getMessage()]
assert len(timeouts) == 3

View File

@@ -1033,11 +1033,19 @@ class ShellFileOperations(FileOperations):
"""
Run syntax check on a file after editing.
Prefers the in-process linter for structured formats (JSON, YAML,
TOML) when possible — those parse via the Python stdlib in
microseconds and don't require a subprocess. Falls back to the
shell linter table for compiled/type-checked languages
(py_compile, node --check, tsc, go vet, rustfmt).
Resolution order:
1. LSP-backed lint via ``tools.lsp_lint.maybe_lint_via_lsp`` —
only fires when ``lint.lsp.enabled`` is true, the backend is
local, and a project root marker (tsconfig.json, Cargo.toml,
go.mod, …) is present. Returns None when any of those
prerequisites fail, so the cheaper paths below run untouched
for everyone who hasn't opted in.
2. In-process linter for structured formats (JSON/YAML/TOML)
plus Python ``ast.parse`` — microseconds per call, no
subprocess.
3. Shell linter table (py_compile, node --check, tsc, go vet,
rustfmt) for compiled/type-checked languages without an
in-process option.
Args:
path: File path (used to select the linter + for shell invocation).
@@ -1051,7 +1059,20 @@ class ShellFileOperations(FileOperations):
"""
ext = os.path.splitext(path)[1].lower()
# Prefer in-process linter when available.
# 1) LSP — opt-in, local-only today. Lazily imported so the rest of
# the file_operations surface keeps working in environments where
# the LSP module's transitive imports (config loader, etc.) are
# unavailable, e.g. minimal sandboxes used by some unit tests.
if content is not None:
try:
from tools.lsp_lint import maybe_lint_via_lsp
lsp_result = maybe_lint_via_lsp(path, content, self.env)
except Exception:
lsp_result = None
if lsp_result is not None:
return lsp_result
# 2) In-process linter when available.
inproc = LINTERS_INPROC.get(ext)
if inproc is not None:
# Need content — either passed in or read from disk.
@@ -1066,7 +1087,9 @@ class ShellFileOperations(FileOperations):
return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)")
return LintResult(success=ok, output="" if ok else err)
# Fall back to shell linter.
# 3) Shell linter table — last-resort fallback for compiled / type-
# checked languages without an in-process option (and the path the
# disabled LSP feature deliberately falls through to).
if ext not in LINTERS:
return LintResult(skipped=True, message=f"No linter for {ext} files")

662
tools/lsp_client.py Normal file
View File

@@ -0,0 +1,662 @@
"""Stdio Language Server Protocol client for post-edit linting.
Goal
----
Replace ``npx tsc --noEmit <single-file>`` (and the ``go vet`` / ``rustfmt
--check`` siblings in ``tools/file_operations.LINTERS``) with project-aware
diagnostics from a real language server, so the agent stops chasing phantom
"Cannot find module" errors that vanish the moment ``tsconfig.json`` is in
scope.
Why a hand-rolled client (vs. an existing OSS library)
-------------------------------------------------------
Three options were evaluated before writing this:
* **microsoft/multilspy** (v0.0.15, "pre-alpha" per its own README) is the
closest fit on paper, but it is async-first, drags in a pinned
``jedi-language-server`` server as a hard dependency just to wire up
Python, and auto-downloads server binaries to ``~/.multilspy/`` —
which collides with HERMES_HOME profile isolation and the user's own
toolchain. Its public API is built around navigation
(``request_definition`` / ``request_references`` / ``request_hover``)
rather than the diagnostic flow we need, so we'd be working against
the grain of the documented surface.
* **sansio-lsp-client** (~3K monthly downloads, single primary
maintainer) is sans-I/O — it gives us ~50 lines of framing in exchange
for a new dependency, but every other moving part below (subprocess
pump, reader thread, request demux, per-``(env, root)`` cache, idle
reaper, error containment for the lint hot path) would still be ours.
* **pygls** is server-side only; client support is "Coming Soon" in the
v2 docs and isn't usable yet.
So the cost/benefit of a library is "import a pre-alpha or thin
dependency, still write 80% of this module, and inherit binary-download
magic we don't want." Better to own the ~500 lines, document the
contract, and pull in ``lsprotocol`` for typed messages later if/when we
need pull diagnostics or progress tokens.
What this module owns
---------------------
* Stdio JSON-RPC transport with Content-Length framing.
* Per ``(language, project_root, server_cmd)`` cached client, kept alive
across edits so the agent only pays cold-start once per project.
* ``diagnostics(path, content)`` returns one shot of ``Diagnostic`` items
via ``textDocument/didOpen`` + ``publishDiagnostics`` (push) with a
short settle window. Pull diagnostics (LSP 3.17) is not used here — the
push flow works on every server we care about, including
``typescript-language-server`` and ``rust-analyzer``.
* Idle reaper thread shuts down servers after a configurable quiet
period so long-lived sessions do not accumulate processes.
Out of scope (deliberately deferred)
-------------------------------------
* Container / SSH / Modal / Daytona backends — the server has to live
where the files live, which means baking it into the sandbox image or
installing on first use. That is the gnarly half and gets its own PR.
* didChange / live editing — every Hermes call here re-opens the file
with the freshly-written contents, so we never have an in-flight buffer
to keep in sync.
* workspace/configuration, pull diagnostics, file watching — none of
those are needed for the "one diagnostic snapshot per write" use case
we are solving.
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Optional, Sequence
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Project-root discovery
# ---------------------------------------------------------------------------
# Filenames whose presence in a directory marks it as a project root for the
# given language id. Order matters within each tuple: the first marker found
# walking upward wins, so users can opt into a sub-project root by dropping a
# more-specific marker (``tsconfig.json``) inside a parent that also has a
# generic one (``package.json``).
PROJECT_ROOT_MARKERS: dict[str, tuple[str, ...]] = {
"typescript": ("tsconfig.json", "jsconfig.json", "package.json"),
"typescriptreact": ("tsconfig.json", "jsconfig.json", "package.json"),
"javascript": ("jsconfig.json", "package.json"),
"javascriptreact": ("jsconfig.json", "package.json"),
"rust": ("Cargo.toml",),
"go": ("go.mod",),
}
def find_project_root(file_path: str, language_id: str) -> Optional[str]:
"""Walk upward from *file_path* looking for the first project marker.
Returns the absolute directory path or ``None`` if no marker is found
before hitting the filesystem root. ``None`` is the signal to skip
LSP and fall back to the shell linter — an orphan file with no
project context will produce the same phantom errors LSP was meant to
eliminate.
"""
markers = PROJECT_ROOT_MARKERS.get(language_id)
if not markers:
return None
try:
start = Path(file_path).expanduser().resolve()
except (OSError, RuntimeError):
return None
cursor = start.parent if start.is_file() or not start.exists() else start
visited = 0
while True:
for marker in markers:
if (cursor / marker).exists():
return str(cursor)
parent = cursor.parent
if parent == cursor:
return None
cursor = parent
visited += 1
# Defensive cap: the deepest reasonable monorepo is well under this.
if visited > 64:
return None
# ---------------------------------------------------------------------------
# LSP framing
# ---------------------------------------------------------------------------
_HEADER_TERMINATOR = b"\r\n\r\n"
def _encode_message(payload: dict[str, Any]) -> bytes:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
return header + body
def _read_message(stream) -> Optional[dict[str, Any]]:
"""Read a single LSP message from *stream*. Returns ``None`` at EOF."""
header = b""
while _HEADER_TERMINATOR not in header:
chunk = stream.read(1)
if not chunk:
return None
header += chunk
# Pathological server: header floods without terminator. Cap so we
# do not spin forever consuming bytes.
if len(header) > 8192:
raise RuntimeError("LSP header exceeded 8 KiB without terminator")
header_text, _, _ = header.partition(_HEADER_TERMINATOR)
content_length: Optional[int] = None
for line in header_text.decode("ascii", errors="replace").splitlines():
name, _, value = line.partition(":")
if name.strip().lower() == "content-length":
try:
content_length = int(value.strip())
except ValueError:
content_length = None
if content_length is None or content_length < 0:
raise RuntimeError(f"LSP header missing Content-Length: {header_text!r}")
body = b""
while len(body) < content_length:
chunk = stream.read(content_length - len(body))
if not chunk:
return None
body += chunk
try:
return json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
raise RuntimeError(f"LSP body was not valid JSON: {exc}") from exc
# ---------------------------------------------------------------------------
# Diagnostic shape (subset of the LSP type)
# ---------------------------------------------------------------------------
@dataclass
class Diagnostic:
"""Subset of the LSP ``Diagnostic`` we surface to the lint layer."""
line: int # 1-based; LSP is 0-based and we convert at parse time.
column: int # 1-based for the same reason.
severity: int = 1 # 1=error, 2=warning, 3=info, 4=hint
message: str = ""
source: str = ""
code: str = ""
@classmethod
def from_lsp(cls, raw: dict[str, Any]) -> "Diagnostic":
rng = raw.get("range") or {}
start = rng.get("start") or {}
line = int(start.get("line", 0)) + 1
col = int(start.get("character", 0)) + 1
code = raw.get("code", "")
return cls(
line=line,
column=col,
severity=int(raw.get("severity", 1)),
message=str(raw.get("message", "")).strip(),
source=str(raw.get("source", "")),
code=str(code) if code != "" else "",
)
def format(self, path: str) -> str:
sev = {1: "error", 2: "warning", 3: "info", 4: "hint"}.get(self.severity, "error")
prefix = f"{path}:{self.line}:{self.column}"
tail = f" [{self.source}{':' + self.code if self.code else ''}]" if self.source else ""
return f"{prefix}: {sev}: {self.message}{tail}"
def format_diagnostics(path: str, diags: Iterable[Diagnostic]) -> str:
"""Render a diagnostic list to the same `path:line:col: msg` shape that
the existing shell linters produce, so ``_check_lint_delta`` can keep
diffing pre/post by line equality."""
return "\n".join(d.format(path) for d in diags)
# ---------------------------------------------------------------------------
# Pending-request bookkeeping
# ---------------------------------------------------------------------------
@dataclass
class _Pending:
event: threading.Event = field(default_factory=threading.Event)
result: Any = None
error: Any = None
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class LspClient:
"""Long-lived stdio JSON-RPC client for one (server, project root)."""
def __init__(
self,
server_cmd: Sequence[str],
project_root: str,
language_id: str,
*,
startup_timeout: float = 15.0,
) -> None:
self.server_cmd = list(server_cmd)
self.project_root = project_root
self.language_id = language_id
self.startup_timeout = startup_timeout
self._proc: Optional[subprocess.Popen] = None
self._writer_lock = threading.Lock()
self._reader_thread: Optional[threading.Thread] = None
self._next_id = 1
self._next_id_lock = threading.Lock()
self._pending: dict[int, _Pending] = {}
self._pending_lock = threading.Lock()
self._diagnostics: dict[str, list[Diagnostic]] = {}
self._diag_events: dict[str, threading.Event] = {}
self._diag_lock = threading.Lock()
self._closed = False
self._last_used = time.monotonic()
# -- lifecycle ----------------------------------------------------------
def start(self) -> None:
if self._proc is not None:
return
try:
self._proc = subprocess.Popen(
self.server_cmd,
cwd=self.project_root,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
# Capture stderr separately so server-side noise (rust-analyzer
# progress, ts-server compile warnings) never corrupts the
# JSON-RPC stream on stdout.
stderr=subprocess.DEVNULL,
bufsize=0,
)
except (FileNotFoundError, PermissionError, OSError) as exc:
raise RuntimeError(f"failed to spawn LSP server {self.server_cmd[0]!r}: {exc}") from exc
self._reader_thread = threading.Thread(
target=self._reader_loop,
name=f"lsp-reader[{self.language_id}]",
daemon=True,
)
self._reader_thread.start()
try:
self._initialize()
except Exception:
self.shutdown()
raise
def shutdown(self) -> None:
if self._closed:
return
self._closed = True
# Best-effort graceful shutdown: send `shutdown` + `exit`. If the
# server is unresponsive we just kill the process — the reader
# thread is daemonized and will die with stdout EOF.
proc = self._proc
if proc is not None and proc.poll() is None:
try:
self._request("shutdown", None, timeout=2.0)
except Exception:
pass
try:
self._notify("exit", None)
except Exception:
pass
try:
proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
try:
proc.kill()
except Exception:
pass
self._proc = None
# -- JSON-RPC ----------------------------------------------------------
def _allocate_id(self) -> int:
with self._next_id_lock:
value = self._next_id
self._next_id += 1
return value
def _send(self, payload: dict[str, Any]) -> None:
proc = self._proc
if proc is None or proc.stdin is None:
raise RuntimeError("LSP client not started")
data = _encode_message(payload)
with self._writer_lock:
try:
proc.stdin.write(data)
proc.stdin.flush()
except (BrokenPipeError, OSError) as exc:
raise RuntimeError(f"LSP server stdin closed: {exc}") from exc
def _notify(self, method: str, params: Any) -> None:
self._send({"jsonrpc": "2.0", "method": method, "params": params})
def _request(self, method: str, params: Any, *, timeout: float) -> Any:
request_id = self._allocate_id()
pending = _Pending()
with self._pending_lock:
self._pending[request_id] = pending
try:
self._send({
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params,
})
if not pending.event.wait(timeout):
raise TimeoutError(f"LSP {method!r} timed out after {timeout:.1f}s")
if pending.error is not None:
raise RuntimeError(f"LSP {method!r} error: {pending.error}")
return pending.result
finally:
with self._pending_lock:
self._pending.pop(request_id, None)
# -- reader -------------------------------------------------------------
def _reader_loop(self) -> None:
proc = self._proc
if proc is None or proc.stdout is None:
return
try:
while not self._closed:
try:
msg = _read_message(proc.stdout)
except RuntimeError as exc:
logger.debug("LSP reader bailing on %s: %s", self.server_cmd[0], exc)
break
if msg is None:
break
self._handle_message(msg)
finally:
# Wake up every waiter so callers do not hang forever when the
# server dies mid-request.
with self._pending_lock:
for pending in self._pending.values():
if pending.error is None and pending.result is None:
pending.error = "server stream closed"
pending.event.set()
with self._diag_lock:
for event in self._diag_events.values():
event.set()
def _handle_message(self, msg: dict[str, Any]) -> None:
if "id" in msg and ("result" in msg or "error" in msg):
try:
request_id = int(msg["id"])
except (TypeError, ValueError):
return
with self._pending_lock:
pending = self._pending.get(request_id)
if pending is None:
return
pending.result = msg.get("result")
pending.error = msg.get("error")
pending.event.set()
return
method = msg.get("method")
if method == "textDocument/publishDiagnostics":
params = msg.get("params") or {}
uri = params.get("uri")
raw_list = params.get("diagnostics") or []
if not isinstance(uri, str):
return
parsed = [Diagnostic.from_lsp(d) for d in raw_list if isinstance(d, dict)]
with self._diag_lock:
self._diagnostics[uri] = parsed
event = self._diag_events.get(uri)
if event is not None:
event.set()
return
# Server-issued requests (e.g. workspace/configuration). Reply with
# a generic null so the server stops waiting. We do not implement
# any server features beyond what's needed to coax diagnostics out.
if "id" in msg and "method" in msg:
self._send({"jsonrpc": "2.0", "id": msg["id"], "result": None})
# -- handshake ----------------------------------------------------------
def _initialize(self) -> None:
root_uri = path_to_uri(self.project_root)
params = {
"processId": os.getpid(),
"clientInfo": {"name": "hermes-agent", "version": "1"},
"rootUri": root_uri,
"rootPath": self.project_root,
"workspaceFolders": [{"uri": root_uri, "name": Path(self.project_root).name}],
"capabilities": {
"textDocument": {
"synchronization": {"didSave": True, "willSave": False, "dynamicRegistration": False},
"publishDiagnostics": {"relatedInformation": False, "versionSupport": False},
},
"workspace": {
"workspaceFolders": True,
"configuration": True,
},
},
"initializationOptions": {},
}
self._request("initialize", params, timeout=self.startup_timeout)
self._notify("initialized", {})
# -- public API ---------------------------------------------------------
def diagnostics(
self,
file_path: str,
content: str,
*,
timeout: float = 10.0,
settle_ms: int = 400,
) -> list[Diagnostic]:
"""Open *file_path* with *content* and collect one diagnostic snapshot.
The flow:
1. Register a fresh wait-event for the URI.
2. ``didOpen`` with the new content (LSP version 1).
3. Block until the first ``publishDiagnostics`` for the URI or
*timeout* seconds elapse.
4. Sleep *settle_ms* milliseconds and re-snapshot, since some
servers (notably ``typescript-language-server``) emit a
"no diagnostics yet" notification while the program graph is
still loading and only the second batch carries real errors.
5. ``didClose`` so the server frees the buffer.
"""
if self._closed or self._proc is None:
raise RuntimeError("LSP client is not running")
uri = path_to_uri(file_path)
event = threading.Event()
with self._diag_lock:
self._diagnostics.pop(uri, None)
self._diag_events[uri] = event
try:
self._notify("textDocument/didOpen", {
"textDocument": {
"uri": uri,
"languageId": self.language_id,
"version": 1,
"text": content,
},
})
self._last_used = time.monotonic()
event.wait(timeout)
if settle_ms > 0:
time.sleep(settle_ms / 1000.0)
with self._diag_lock:
diags = list(self._diagnostics.get(uri, []))
finally:
with self._diag_lock:
self._diag_events.pop(uri, None)
try:
self._notify("textDocument/didClose", {"textDocument": {"uri": uri}})
except RuntimeError:
pass
return diags
@property
def last_used(self) -> float:
return self._last_used
@property
def is_alive(self) -> bool:
proc = self._proc
return not self._closed and proc is not None and proc.poll() is None
# ---------------------------------------------------------------------------
# URI helpers
# ---------------------------------------------------------------------------
def path_to_uri(path: str) -> str:
"""Convert a filesystem path to a ``file://`` URI.
Uses ``Path.as_uri`` after resolving so symlinks and ``..`` components
do not produce two URIs for the same file (which would cause the
diagnostics dict to miss the response).
"""
try:
resolved = Path(path).expanduser().resolve()
except (OSError, RuntimeError):
resolved = Path(path).expanduser()
try:
return resolved.as_uri()
except ValueError:
# ``as_uri`` rejects relative paths; the resolve() above almost
# always prevents this, but fall back to a manual file:// prefix
# so we never raise from inside the lint hot path.
return "file://" + str(resolved).replace(os.sep, "/")
# ---------------------------------------------------------------------------
# Process-wide registry + idle reaper
# ---------------------------------------------------------------------------
_clients_lock = threading.Lock()
_clients: dict[tuple[str, str, tuple[str, ...]], LspClient] = {}
_reaper_thread: Optional[threading.Thread] = None
_reaper_stop = threading.Event()
_reaper_idle_seconds = 600.0
_reaper_interval = 30.0
def get_or_start_client(
language_id: str,
project_root: str,
server_cmd: Sequence[str],
*,
idle_seconds: float = 600.0,
) -> Optional[LspClient]:
"""Return a started client for the (language, root, cmd) triple.
Returns ``None`` if the server binary is missing or fails to start —
callers are expected to treat that as "fall back to shell linter"
rather than a hard error.
"""
if not server_cmd:
return None
binary = server_cmd[0]
if not _which(binary):
return None
key = (language_id, project_root, tuple(server_cmd))
with _clients_lock:
client = _clients.get(key)
if client is not None and client.is_alive:
return client
if client is not None:
# Stale entry — server crashed or was reaped. Drop it before
# respawning so we never hand a dead client back to a caller.
_clients.pop(key, None)
client = LspClient(server_cmd, project_root, language_id)
try:
client.start()
except Exception as exc:
logger.warning("LSP server %s failed to start: %s", server_cmd[0], exc)
return None
_clients[key] = client
global _reaper_idle_seconds
_reaper_idle_seconds = max(_reaper_idle_seconds, idle_seconds)
_ensure_reaper()
return client
def shutdown_all_clients() -> None:
"""Stop every cached server. Intended for tests and process exit."""
with _clients_lock:
clients = list(_clients.values())
_clients.clear()
for client in clients:
try:
client.shutdown()
except Exception:
pass
_reaper_stop.set()
def _which(binary: str) -> Optional[str]:
# Honour absolute / relative paths verbatim — ``shutil.which`` only
# consults PATH, and a user pointing at ``./bin/typescript-language-server``
# in their config should still work.
if os.sep in binary or (os.altsep and os.altsep in binary):
return binary if os.path.isfile(binary) and os.access(binary, os.X_OK) else None
return shutil.which(binary)
def _ensure_reaper() -> None:
global _reaper_thread
if _reaper_thread is not None and _reaper_thread.is_alive():
return
_reaper_stop.clear()
_reaper_thread = threading.Thread(
target=_reaper_loop,
name="lsp-reaper",
daemon=True,
)
_reaper_thread.start()
def _reaper_loop() -> None:
while not _reaper_stop.wait(_reaper_interval):
now = time.monotonic()
idle_cutoff = now - _reaper_idle_seconds
to_close: list[LspClient] = []
with _clients_lock:
for key, client in list(_clients.items()):
if not client.is_alive or client.last_used < idle_cutoff:
_clients.pop(key, None)
to_close.append(client)
for client in to_close:
try:
client.shutdown()
except Exception:
pass

346
tools/lsp_lint.py Normal file
View File

@@ -0,0 +1,346 @@
"""Bridge between ``ShellFileOperations._check_lint`` and ``tools.lsp_client``.
Kept in its own module so the change to ``file_operations.py`` is a single
import + one branch in ``_check_lint`` — easy to audit, easy to revert.
Behavioural contract for ``maybe_lint_via_lsp``:
* Returns ``None`` whenever LSP cannot or should not handle this file
(feature off, language not configured, env not local, no project
root, server missing, request timed out). The caller falls back to
the existing in-process / shell linter exactly as before.
* Returns a ``LintResult`` only when the LSP path produced a real
verdict — clean or with diagnostics.
We never raise. The lint hook is on the hot edit path and any exception
here would surface as a write failure rather than a degraded warning.
Observability
-------------
Calls emit structured log lines on the dedicated ``hermes.lint.lsp``
logger so live agent sessions can answer "did the LSP path fire on
that edit?" with a single ``rg``. Levels are tuned for a 1000-write
session — steady state is silent at the default INFO threshold, only
state changes and action-required failures escape:
* ``DEBUG`` (default agent.log threshold = INFO, so invisible) for
every per-call event that has no novel signal: ``clean``, ``feature
off``, ``extension not mapped``, ``backend not local``, repeated
``no project root`` for an already-announced file, repeated ``server
unavailable`` for an already-announced binary.
* ``INFO`` for state transitions worth surfacing exactly once:
``active for <root>`` the first time a (language, project_root)
client starts, ``no project root for <path>`` the first time we see
that file. Plus every ``N diags`` event (the failure signal users
actually want — these are inherently rare and per-edit).
* ``WARNING`` for action-required failures the first time they happen
per (language, binary): ``server unavailable`` (binary not on PATH),
``no server configured``. Per-call ``WARNING`` for timeouts, server
errors, and unexpected bridge exceptions — these are inherently
novel events, not steady state.
Net result: an agent that writes 1000 clean ``.ts`` files in one
project session emits ONE INFO line (``active for <root>``) and 1000
DEBUG lines that never reach ``agent.log`` at default level.
Grep recipe::
tail -f ~/.hermes/logs/agent.log | rg 'lsp\\['
"""
from __future__ import annotations
import logging
import os
import threading
from typing import TYPE_CHECKING, Any, Optional
from tools.lsp_client import (
Diagnostic,
find_project_root,
format_diagnostics,
get_or_start_client,
)
if TYPE_CHECKING:
from tools.file_operations import LintResult
logger = logging.getLogger(__name__)
# Dedicated logger name so it survives a ``logging.getLogger(__name__)``
# rename of the module without breaking the documented grep recipe.
_event_log = logging.getLogger("hermes.lint.lsp")
def _short_path(file_path: str) -> str:
"""Render *file_path* relative to the cwd when sensible, else absolute.
Keeps the log line readable for the common case (the user is inside
the project they're editing) without emitting brittle ``../../..``
chains for the cross-tree case.
"""
try:
rel = os.path.relpath(file_path)
except ValueError:
return file_path
if rel.startswith(".." + os.sep) or rel == "..":
return file_path
return rel
def _log(language_id: str, level: int, message: str) -> None:
_event_log.log(level, "lsp[%s] %s", language_id, message)
# ---------------------------------------------------------------------------
# Once-per-X dedup for steady-state events
# ---------------------------------------------------------------------------
#
# Why these aren't bounded LRU caches: each set grows at most by the number
# of distinct (language, project_root) and (language, file_path) pairs the
# user touches in a single Python process. Even an aggressive monorepo
# session is well under a few hundred entries — bytes of memory, not MB.
# Bounded eviction would risk re-firing INFO/WARNING lines for the same
# event, which is exactly what these caches exist to prevent.
_announce_lock = threading.Lock()
_announced_active: set[tuple[str, str]] = set()
_announced_unavailable: set[tuple[str, str]] = set()
_announced_no_root: set[tuple[str, str]] = set()
def _reset_announce_caches() -> None:
"""Clear the dedup caches. Test-only — production code never calls this."""
with _announce_lock:
_announced_active.clear()
_announced_unavailable.clear()
_announced_no_root.clear()
def _announce_once(bucket: set[tuple[str, str]], key: tuple[str, str]) -> bool:
"""Return True if *key* has not been announced for *bucket* yet.
Atomically marks the key as announced as a side effect, so concurrent
callers cannot both win the race and double-log.
"""
with _announce_lock:
if key in bucket:
return False
bucket.add(key)
return True
# Map file extension → LSP language id. Mirrors the ``LINTERS`` table in
# file_operations.py but only covers languages where we have an opinion
# about which server to use. New extensions added here MUST also appear
# in the ``servers`` config block below or in the user override.
_EXT_TO_LANGUAGE: dict[str, str] = {
".ts": "typescript",
".tsx": "typescriptreact",
".mts": "typescript",
".cts": "typescript",
".js": "javascript",
".jsx": "javascriptreact",
".mjs": "javascript",
".cjs": "javascript",
".rs": "rust",
".go": "go",
}
# Default server command per language. Every entry can be overridden via
# ``lint.lsp.servers.<language>`` in config.yaml. Keep these to the
# upstream-recommended invocation; users who want overlay configurations
# (extra args, alternate binaries) live in their own config.
_DEFAULT_SERVERS: dict[str, list[str]] = {
"typescript": ["typescript-language-server", "--stdio"],
"typescriptreact": ["typescript-language-server", "--stdio"],
"javascript": ["typescript-language-server", "--stdio"],
"javascriptreact": ["typescript-language-server", "--stdio"],
"rust": ["rust-analyzer"],
"go": ["gopls"],
}
def _load_lsp_config() -> dict[str, Any]:
"""Read ``lint.lsp`` from config.yaml, with deep defaults baked in.
Cached imports are deliberately avoided here: the lint hook runs
inside long-lived agent loops and a stale cache would prevent users
from toggling the flag mid-session via ``/config reload``. The yaml
parse cost is negligible compared to the actual lint subprocess.
"""
try:
from hermes_cli.config import load_config
except Exception:
return {}
try:
cfg = load_config() or {}
except Exception:
return {}
lint_cfg = cfg.get("lint")
if not isinstance(lint_cfg, dict):
return {}
lsp_cfg = lint_cfg.get("lsp")
return lsp_cfg if isinstance(lsp_cfg, dict) else {}
def _resolve_server_cmd(language_id: str, lsp_cfg: dict[str, Any]) -> Optional[list[str]]:
overrides = lsp_cfg.get("servers")
if isinstance(overrides, dict):
candidate = overrides.get(language_id)
if isinstance(candidate, str) and candidate.strip():
return candidate.split()
if isinstance(candidate, list) and candidate:
return [str(part) for part in candidate]
return list(_DEFAULT_SERVERS.get(language_id, []))
def _is_local_env(env: object) -> bool:
"""LSP only routes to local files in this PR.
Rationale lives in lsp_client.py: container/SSH backends need the
server inside the sandbox, which is a separate engineering effort.
Until that lands, anything non-local falls through to the shell
linter so we don't pretend to lint a file the host process can't
even open.
"""
try:
from tools.environments.local import LocalEnvironment
except Exception:
return False
return isinstance(env, LocalEnvironment)
def maybe_lint_via_lsp(
file_path: str,
content: Optional[str],
env: object,
) -> Optional["LintResult"]:
"""Try to produce a LintResult via LSP. Returns ``None`` when caller
should fall through to the in-process / shell linter.
Args:
file_path: Absolute path of the just-written file.
content: Post-write file content. Required — we send it as the
``didOpen`` text payload so the server lints the bytes the
agent intended to land on disk, not whatever was there
before the write completed.
env: Terminal environment from ``ShellFileOperations.env``.
We currently only handle ``LocalEnvironment``.
"""
if content is None:
# No content means the caller (delta refinement against an absent
# pre-write state) has nothing for didOpen — silent skip.
return None
ext = os.path.splitext(file_path)[1].lower()
language_id = _EXT_TO_LANGUAGE.get(ext)
if language_id is None:
# Most writes are .py / .json / .yaml — DEBUG so steady state stays
# invisible at the default INFO threshold.
_event_log.debug("lsp[?] skipped: extension %s not mapped (%s)", ext or "<none>", _short_path(file_path))
return None
if not _is_local_env(env):
_log(language_id, logging.DEBUG, f"skipped: backend not local ({_short_path(file_path)})")
return None
lsp_cfg = _load_lsp_config()
if not lsp_cfg.get("enabled"):
_log(language_id, logging.DEBUG, f"skipped: feature off ({_short_path(file_path)})")
return None
short = _short_path(file_path)
project_root = find_project_root(file_path, language_id)
if project_root is None:
# First time we see this orphan file → INFO so an opt-in user
# learns about the coverage gap. Repeated writes to the same
# orphan stay silent at DEBUG.
if _announce_once(_announced_no_root, (language_id, file_path)):
_log(language_id, logging.INFO, f"skipped: no project root for {short}")
else:
_log(language_id, logging.DEBUG, f"skipped: no project root for {short}")
return None
server_cmd = _resolve_server_cmd(language_id, lsp_cfg)
if not server_cmd:
# Configuration error, not a runtime fluke. Once-per-language is
# plenty — the missing block won't fix itself on the next write.
if _announce_once(_announced_unavailable, (language_id, "<no-cmd>")):
_log(language_id, logging.WARNING, f"skipped: no server configured (set lint.lsp.servers.{language_id})")
return None
binary_name = server_cmd[0]
idle_seconds = float(lsp_cfg.get("idle_shutdown") or 600.0)
try:
client = get_or_start_client(
language_id,
project_root,
server_cmd,
idle_seconds=idle_seconds,
)
except Exception as exc:
if _announce_once(_announced_unavailable, (language_id, binary_name)):
_log(language_id, logging.WARNING, f"skipped: spawn failed for {binary_name!r}: {exc}")
else:
_log(language_id, logging.DEBUG, f"skipped: spawn failed again for {binary_name!r}: {exc}")
return None
if client is None:
# Most likely cause: binary not on PATH. WARNING the first time so
# the opt-in user notices and installs it; DEBUG after so a session
# of 1000 edits doesn't carpet ``errors.log`` with the same line.
if _announce_once(_announced_unavailable, (language_id, binary_name)):
_log(language_id, logging.WARNING, f"skipped: server unavailable ({binary_name!r} not on PATH)")
else:
_log(language_id, logging.DEBUG, f"skipped: server still unavailable ({binary_name!r})")
return None
# First successful client for this (language, root) → one INFO line.
# That's the opt-in user's "yes, LSP is wired up" confirmation.
if _announce_once(_announced_active, (language_id, project_root)):
_log(language_id, logging.INFO, f"active for {_short_path(project_root)} (server: {binary_name})")
timeout = float(lsp_cfg.get("diagnostic_timeout") or 10.0)
settle_ms = int(lsp_cfg.get("settle_ms") or 400)
try:
diagnostics: list[Diagnostic] = client.diagnostics(
file_path,
content,
timeout=timeout,
settle_ms=settle_ms,
)
except TimeoutError as exc:
# Timeouts are inherently distinct events (a hung server today is
# not the same signal as a hung server yesterday), so each one
# gets its own WARNING.
_log(language_id, logging.WARNING, f"skipped: timeout after {timeout:.1f}s ({short}): {exc}")
return None
except RuntimeError as exc:
_log(language_id, logging.WARNING, f"skipped: server error for {short}: {exc}")
return None
except Exception as exc: # noqa: BLE001 — never raise from the lint hot path
_log(language_id, logging.WARNING, f"skipped: unexpected bridge error for {short}: {exc}")
return None
# Filter to errors + warnings. Hints/info are skipped because the
# existing shell linters never surface them, and surfacing them here
# would change the agent's "lint clean / lint dirty" verdict mid-PR.
blocking = [d for d in diagnostics if d.severity in (1, 2)]
# Lazy import to avoid a circular import at module load time.
from tools.file_operations import LintResult
if not blocking:
# Clean writes are the steady state on a healthy project — DEBUG
# so a 1000-write session stays silent at the default threshold.
_log(language_id, logging.DEBUG, f"clean ({short})")
return LintResult(success=True, output="")
# Diagnostics are inherently rare and per-edit, so each one is its
# own INFO event — this is what the user actually wants to grep for.
_log(language_id, logging.INFO, f"{len(blocking)} diag{'s' if len(blocking) != 1 else ''} ({short})")
return LintResult(success=False, output=format_diagnostics(file_path, blocking))
__all__ = [
"maybe_lint_via_lsp",
]