mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 04:08:28 +08:00
Compare commits
2 Commits
merge-comm
...
bb/lsp-lin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d8be3e84d | ||
|
|
9a546d6b08 |
@@ -1193,6 +1193,45 @@ DEFAULT_CONFIG = {
|
||||
},
|
||||
},
|
||||
|
||||
# Post-edit lint behaviour. Hermes runs a syntax check after every
|
||||
# write/patch and surfaces only the errors that edit introduced (see
|
||||
# ``ShellFileOperations._check_lint_delta`` in tools/file_operations.py).
|
||||
# The defaults below leave the legacy shell linters (``npx tsc --noEmit``,
|
||||
# ``go vet``, ``rustfmt --check``, ``py_compile``) in charge — the LSP
|
||||
# path is opt-in until container/SSH backends grow a way to host the
|
||||
# language server inside the sandbox.
|
||||
"lint": {
|
||||
"lsp": {
|
||||
# Master switch. When false, ``_check_lint`` skips LSP entirely
|
||||
# and the in-process / shell linter table runs as before.
|
||||
"enabled": False,
|
||||
# Per-language server launch command. Each value is either a
|
||||
# whitespace-separated string or a list. Missing languages
|
||||
# fall back to ``tools.lsp_lint._DEFAULT_SERVERS``.
|
||||
"servers": {
|
||||
"typescript": "typescript-language-server --stdio",
|
||||
"typescriptreact": "typescript-language-server --stdio",
|
||||
"javascript": "typescript-language-server --stdio",
|
||||
"javascriptreact": "typescript-language-server --stdio",
|
||||
"rust": "rust-analyzer",
|
||||
"go": "gopls",
|
||||
},
|
||||
# Wall-clock timeout for a single ``didOpen`` → diagnostics
|
||||
# round-trip. Servers that take longer than this on cold start
|
||||
# (notably rust-analyzer indexing a fresh checkout) cause the
|
||||
# caller to fall through to the shell linter.
|
||||
"diagnostic_timeout": 10,
|
||||
# Settle window (milliseconds). Typescript-language-server
|
||||
# publishes an empty diagnostics batch while the program graph
|
||||
# loads, then re-publishes once the file is fully analysed.
|
||||
# Re-snapshotting after this delay catches the real verdict.
|
||||
"settle_ms": 400,
|
||||
# Shut down a long-lived server process after this many seconds
|
||||
# of inactivity. The reaper runs in-process; no daemons.
|
||||
"idle_shutdown": 600,
|
||||
},
|
||||
},
|
||||
|
||||
# Honcho AI-native memory -- reads ~/.honcho/config.json as single source of truth.
|
||||
# This section is only needed for hermes-specific overrides; everything else
|
||||
# (apiKey, workspace, peerName, sessions, enabled) comes from the global config.
|
||||
|
||||
106
tests/tools/test_check_lint_lsp_routing.py
Normal file
106
tests/tools/test_check_lint_lsp_routing.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Tests for ``ShellFileOperations._check_lint`` LSP routing.
|
||||
|
||||
Two regression guarantees:
|
||||
|
||||
1. With LSP disabled (the default), behaviour is unchanged for every
|
||||
extension — ``_check_lint`` still hits the in-process linter for
|
||||
.py/.json/.yaml/.toml and the shell linter table for .ts/.go/.rs.
|
||||
|
||||
2. With LSP enabled and the bridge returning a verdict, ``_check_lint``
|
||||
surfaces that verdict directly and skips the shell linter.
|
||||
|
||||
Both sides matter: a regression in (1) would break every existing user
|
||||
who has not opted in; a regression in (2) would silently mean the
|
||||
opt-in flag does nothing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools import file_operations
|
||||
from tools.file_operations import LintResult, ShellFileOperations
|
||||
from tools.environments.local import LocalEnvironment
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ops(tmp_path: Path) -> ShellFileOperations:
|
||||
env = LocalEnvironment(cwd=str(tmp_path), timeout=5)
|
||||
return ShellFileOperations(env, cwd=str(tmp_path))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Default-off path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDefaultOff:
|
||||
def test_python_still_uses_in_process_linter(self, ops, tmp_path: Path) -> None:
|
||||
path = tmp_path / "a.py"
|
||||
# Don't actually call the LSP bridge; the in-process linter should
|
||||
# short-circuit before it is even consulted.
|
||||
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=None) as bridge:
|
||||
result = ops._check_lint(str(path), content="x = 1\n")
|
||||
assert result.success is True
|
||||
# Bridge is consulted (returning None) — that's the contract — but
|
||||
# the in-process linter still runs and produces the verdict.
|
||||
bridge.assert_called_once()
|
||||
|
||||
def test_typescript_falls_through_to_shell_when_bridge_returns_none(
|
||||
self, ops, tmp_path: Path
|
||||
) -> None:
|
||||
path = tmp_path / "x.ts"
|
||||
path.write_text("export {}\n")
|
||||
# Bridge says "not handling this" → the existing shell-linter path
|
||||
# must run. We don't actually exec tsc; we mock _has_command to
|
||||
# report it absent so the shell branch produces a deterministic
|
||||
# "skipped" result we can assert on.
|
||||
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=None), \
|
||||
patch.object(ops, "_has_command", return_value=False):
|
||||
result = ops._check_lint(str(path), content="export {}\n")
|
||||
assert result.skipped is True
|
||||
assert "not available" in result.message
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LSP-on path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLspRouting:
|
||||
def test_lsp_clean_short_circuits_shell_linter(self, ops, tmp_path: Path) -> None:
|
||||
path = tmp_path / "x.ts"
|
||||
path.write_text("export {}\n")
|
||||
clean = LintResult(success=True, output="")
|
||||
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=clean), \
|
||||
patch.object(ops, "_has_command") as has_command:
|
||||
result = ops._check_lint(str(path), content="export {}\n")
|
||||
assert result is clean
|
||||
# Shell linter must not even probe for `tsc` when LSP returned a
|
||||
# verdict; otherwise we'd be paying for both subprocess startups.
|
||||
has_command.assert_not_called()
|
||||
|
||||
def test_lsp_errors_short_circuit_shell_linter(self, ops, tmp_path: Path) -> None:
|
||||
path = tmp_path / "x.ts"
|
||||
path.write_text("export {}\n")
|
||||
dirty = LintResult(success=False, output=str(path) + ":1:1: error: boom")
|
||||
with patch("tools.lsp_lint.maybe_lint_via_lsp", return_value=dirty), \
|
||||
patch.object(ops, "_has_command") as has_command:
|
||||
result = ops._check_lint(str(path), content="export {}\n")
|
||||
assert result is dirty
|
||||
has_command.assert_not_called()
|
||||
|
||||
def test_lsp_bridge_exception_falls_through_safely(self, ops, tmp_path: Path) -> None:
|
||||
# Even if the bridge module itself throws (broken import, config
|
||||
# parse error, etc.) the lint hook must never propagate — the
|
||||
# write_file caller would otherwise see a synthetic error for an
|
||||
# otherwise-successful write.
|
||||
path = tmp_path / "x.ts"
|
||||
path.write_text("export {}\n")
|
||||
with patch("tools.lsp_lint.maybe_lint_via_lsp", side_effect=RuntimeError("oops")), \
|
||||
patch.object(ops, "_has_command", return_value=False):
|
||||
result = ops._check_lint(str(path), content="export {}\n")
|
||||
assert result.skipped is True
|
||||
332
tests/tools/test_lsp_client.py
Normal file
332
tests/tools/test_lsp_client.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""Tests for ``tools.lsp_client``.
|
||||
|
||||
The bulk of this module is covered by exercising the real JSON-RPC stack
|
||||
against a *fake* language server we ship in-tree as a tiny Python script.
|
||||
That keeps the tests hermetic — no system typescript-language-server / gopls
|
||||
required — while still proving the framing, request/response demux, and
|
||||
publishDiagnostics wait loop behave end-to-end.
|
||||
|
||||
Pure helper functions (path → URI, project-root walk, message framing) are
|
||||
covered in plain unit tests above.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from tools import lsp_client
|
||||
from tools.lsp_client import (
|
||||
Diagnostic,
|
||||
LspClient,
|
||||
_encode_message,
|
||||
_read_message,
|
||||
find_project_root,
|
||||
format_diagnostics,
|
||||
get_or_start_client,
|
||||
path_to_uri,
|
||||
shutdown_all_clients,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pure helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProjectRoot:
|
||||
def test_walks_up_to_tsconfig(self, tmp_path: Path) -> None:
|
||||
root = tmp_path / "proj"
|
||||
(root / "src" / "components").mkdir(parents=True)
|
||||
(root / "tsconfig.json").write_text("{}")
|
||||
deep = root / "src" / "components" / "Button.tsx"
|
||||
deep.write_text("export {}\n")
|
||||
assert find_project_root(str(deep), "typescript") == str(root.resolve())
|
||||
|
||||
def test_subproject_marker_wins_over_parent(self, tmp_path: Path) -> None:
|
||||
outer = tmp_path / "outer"
|
||||
inner = outer / "packages" / "ui"
|
||||
(inner / "src").mkdir(parents=True)
|
||||
(outer / "package.json").write_text("{}")
|
||||
(inner / "tsconfig.json").write_text("{}")
|
||||
leaf = inner / "src" / "App.tsx"
|
||||
leaf.write_text("export {}\n")
|
||||
# tsconfig appears first in the marker tuple, so the inner package
|
||||
# wins — even though package.json exists higher up.
|
||||
assert find_project_root(str(leaf), "typescriptreact") == str(inner.resolve())
|
||||
|
||||
def test_returns_none_when_no_marker(self, tmp_path: Path) -> None:
|
||||
orphan = tmp_path / "orphan.ts"
|
||||
orphan.write_text("export {}\n")
|
||||
assert find_project_root(str(orphan), "typescript") is None
|
||||
|
||||
def test_returns_none_for_unknown_language(self, tmp_path: Path) -> None:
|
||||
f = tmp_path / "foo.lol"
|
||||
f.write_text("")
|
||||
assert find_project_root(str(f), "klingon") is None
|
||||
|
||||
|
||||
class TestUri:
|
||||
def test_round_trip_absolute_path(self, tmp_path: Path) -> None:
|
||||
path = tmp_path / "x.ts"
|
||||
path.write_text("")
|
||||
uri = path_to_uri(str(path))
|
||||
assert uri.startswith("file://")
|
||||
assert uri.endswith("x.ts")
|
||||
|
||||
def test_relative_path_does_not_raise(self, tmp_path: Path, monkeypatch) -> None:
|
||||
monkeypatch.chdir(tmp_path)
|
||||
# Should not raise even before the file exists.
|
||||
uri = path_to_uri("not-yet-created.ts")
|
||||
assert uri.startswith("file://")
|
||||
|
||||
|
||||
class TestFraming:
|
||||
def test_encode_then_decode_roundtrip(self) -> None:
|
||||
payload = {"jsonrpc": "2.0", "id": 7, "method": "x", "params": {"a": 1}}
|
||||
wire = _encode_message(payload)
|
||||
assert wire.startswith(b"Content-Length: ")
|
||||
# Read it back through _read_message using a BytesIO.
|
||||
decoded = _read_message(io.BytesIO(wire))
|
||||
assert decoded == payload
|
||||
|
||||
def test_eof_returns_none(self) -> None:
|
||||
assert _read_message(io.BytesIO(b"")) is None
|
||||
|
||||
def test_garbage_header_raises(self) -> None:
|
||||
# Header without Content-Length (8KB cap doesn't kick in here, but
|
||||
# parser should still reject after seeing the terminator).
|
||||
bad = b"X-Garbage: yes\r\n\r\n"
|
||||
with pytest.raises(RuntimeError):
|
||||
_read_message(io.BytesIO(bad))
|
||||
|
||||
|
||||
class TestDiagnostic:
|
||||
def test_from_lsp_converts_zero_based_to_one_based(self) -> None:
|
||||
raw: dict[str, Any] = {
|
||||
"range": {"start": {"line": 9, "character": 4}, "end": {"line": 9, "character": 8}},
|
||||
"severity": 1,
|
||||
"message": "Cannot find name 'foo'.",
|
||||
"source": "ts",
|
||||
"code": 2304,
|
||||
}
|
||||
d = Diagnostic.from_lsp(raw)
|
||||
assert d.line == 10
|
||||
assert d.column == 5
|
||||
assert d.severity == 1
|
||||
assert d.code == "2304"
|
||||
|
||||
def test_format_includes_path_and_source(self) -> None:
|
||||
d = Diagnostic(line=3, column=2, severity=2, message="unused", source="eslint", code="no-unused")
|
||||
text = d.format("/tmp/x.ts")
|
||||
assert text.startswith("/tmp/x.ts:3:2:")
|
||||
assert "warning" in text
|
||||
assert "unused" in text
|
||||
assert "eslint" in text
|
||||
|
||||
def test_format_diagnostics_joins_with_newlines(self) -> None:
|
||||
out = format_diagnostics(
|
||||
"/tmp/x.ts",
|
||||
[
|
||||
Diagnostic(line=1, column=1, message="a"),
|
||||
Diagnostic(line=2, column=1, message="b"),
|
||||
],
|
||||
)
|
||||
assert out.split("\n") == [
|
||||
"/tmp/x.ts:1:1: error: a",
|
||||
"/tmp/x.ts:2:1: error: b",
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fake LSP server (a short Python script we exec via subprocess)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_FAKE_SERVER_TEMPLATE = r"""
|
||||
import json
|
||||
import sys
|
||||
import struct
|
||||
|
||||
# Modes:
|
||||
# "ok" -> publish empty diagnostics for any didOpen URI.
|
||||
# "errors" -> publish two diagnostics for any didOpen URI.
|
||||
# "settle" -> publish empty diagnostics first, then publish two
|
||||
# diagnostics 50ms later. Exercises the settle window.
|
||||
MODE = {mode!r}
|
||||
|
||||
def _read():
|
||||
header = b''
|
||||
while b'\r\n\r\n' not in header:
|
||||
chunk = sys.stdin.buffer.read(1)
|
||||
if not chunk:
|
||||
return None
|
||||
header += chunk
|
||||
length = int(header.split(b'Content-Length:')[1].split(b'\r\n')[0].strip())
|
||||
body = b''
|
||||
while len(body) < length:
|
||||
chunk = sys.stdin.buffer.read(length - len(body))
|
||||
if not chunk:
|
||||
return None
|
||||
body += chunk
|
||||
return json.loads(body.decode('utf-8'))
|
||||
|
||||
def _write(payload):
|
||||
body = json.dumps(payload).encode('utf-8')
|
||||
sys.stdout.buffer.write(b'Content-Length: ' + str(len(body)).encode() + b'\r\n\r\n' + body)
|
||||
sys.stdout.buffer.flush()
|
||||
|
||||
def _diags(uri):
|
||||
return [
|
||||
{{"range": {{"start": {{"line": 0, "character": 0}}, "end": {{"line": 0, "character": 1}}}},
|
||||
"severity": 1, "message": "boom", "source": "fake", "code": "E1"}},
|
||||
{{"range": {{"start": {{"line": 1, "character": 4}}, "end": {{"line": 1, "character": 6}}}},
|
||||
"severity": 2, "message": "warn", "source": "fake"}},
|
||||
]
|
||||
|
||||
while True:
|
||||
msg = _read()
|
||||
if msg is None:
|
||||
break
|
||||
if 'id' in msg and msg.get('method') == 'initialize':
|
||||
_write({{"jsonrpc": "2.0", "id": msg['id'], "result": {{"capabilities": {{}}}}}})
|
||||
elif 'id' in msg and msg.get('method') == 'shutdown':
|
||||
_write({{"jsonrpc": "2.0", "id": msg['id'], "result": None}})
|
||||
elif msg.get('method') == 'exit':
|
||||
break
|
||||
elif msg.get('method') == 'textDocument/didOpen':
|
||||
uri = msg['params']['textDocument']['uri']
|
||||
if MODE == 'ok':
|
||||
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
|
||||
"params": {{"uri": uri, "diagnostics": []}}}})
|
||||
elif MODE == 'errors':
|
||||
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
|
||||
"params": {{"uri": uri, "diagnostics": _diags(uri)}}}})
|
||||
elif MODE == 'settle':
|
||||
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
|
||||
"params": {{"uri": uri, "diagnostics": []}}}})
|
||||
import time as _t; _t.sleep(0.05)
|
||||
_write({{"jsonrpc": "2.0", "method": "textDocument/publishDiagnostics",
|
||||
"params": {{"uri": uri, "diagnostics": _diags(uri)}}}})
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_server_factory(tmp_path: Path):
|
||||
"""Return a callable that materialises a fake-LSP launch command.
|
||||
|
||||
We materialise per-mode so tests can choose between the empty and
|
||||
error-emitting servers without juggling globals.
|
||||
"""
|
||||
|
||||
def _make(mode: str) -> list[str]:
|
||||
script = tmp_path / f"fake_lsp_{mode}.py"
|
||||
script.write_text(_FAKE_SERVER_TEMPLATE.format(mode=mode))
|
||||
# Use the same interpreter that's running pytest so the fake
|
||||
# server inherits the same Python ABI on every CI image.
|
||||
return [sys.executable, str(script)]
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _shutdown_registry_between_tests():
|
||||
"""Make sure the module-global client cache cannot leak servers
|
||||
across tests — every cached subprocess is real and would survive."""
|
||||
yield
|
||||
shutdown_all_clients()
|
||||
|
||||
|
||||
class TestClientHandshakeAndDiagnostics:
|
||||
def test_clean_file_returns_no_diagnostics(self, fake_server_factory, tmp_path: Path) -> None:
|
||||
cmd = fake_server_factory("ok")
|
||||
client = LspClient(cmd, str(tmp_path), "typescript")
|
||||
client.start()
|
||||
try:
|
||||
diags = client.diagnostics(str(tmp_path / "a.ts"), "export {}\n", timeout=3.0, settle_ms=0)
|
||||
finally:
|
||||
client.shutdown()
|
||||
assert diags == []
|
||||
|
||||
def test_dirty_file_surfaces_error_and_warning(self, fake_server_factory, tmp_path: Path) -> None:
|
||||
cmd = fake_server_factory("errors")
|
||||
client = LspClient(cmd, str(tmp_path), "typescript")
|
||||
client.start()
|
||||
try:
|
||||
diags = client.diagnostics(str(tmp_path / "a.ts"), "x;\n", timeout=3.0, settle_ms=0)
|
||||
finally:
|
||||
client.shutdown()
|
||||
# Two diagnostics with severities 1 (error) and 2 (warning).
|
||||
assert sorted(d.severity for d in diags) == [1, 2]
|
||||
assert any("boom" in d.message for d in diags)
|
||||
|
||||
def test_settle_window_picks_up_late_diagnostics(self, fake_server_factory, tmp_path: Path) -> None:
|
||||
cmd = fake_server_factory("settle")
|
||||
client = LspClient(cmd, str(tmp_path), "typescript")
|
||||
client.start()
|
||||
try:
|
||||
# Without a settle window we'd only see the empty first batch;
|
||||
# the 200ms window must be long enough to capture the second
|
||||
# publishDiagnostics that lands ~50ms after didOpen.
|
||||
diags = client.diagnostics(str(tmp_path / "a.ts"), "x;\n", timeout=3.0, settle_ms=200)
|
||||
finally:
|
||||
client.shutdown()
|
||||
assert len(diags) == 2
|
||||
|
||||
def test_request_timeout_does_not_corrupt_state(self, tmp_path: Path) -> None:
|
||||
# A "server" that reads but never writes anything back. Initialize
|
||||
# must time out cleanly without deadlocking the writer lock.
|
||||
script = tmp_path / "silent_lsp.py"
|
||||
script.write_text("import sys\nwhile sys.stdin.buffer.read(1):\n pass\n")
|
||||
client = LspClient([sys.executable, str(script)], str(tmp_path), "typescript", startup_timeout=0.5)
|
||||
with pytest.raises(TimeoutError):
|
||||
client.start()
|
||||
|
||||
|
||||
class TestRegistry:
|
||||
def test_missing_binary_returns_none(self, tmp_path: Path) -> None:
|
||||
result = get_or_start_client(
|
||||
"typescript",
|
||||
str(tmp_path),
|
||||
["definitely-not-a-real-binary-xyz123"],
|
||||
)
|
||||
assert result is None
|
||||
|
||||
def test_caches_per_root(self, fake_server_factory, tmp_path: Path) -> None:
|
||||
cmd = fake_server_factory("ok")
|
||||
a = get_or_start_client("typescript", str(tmp_path), cmd)
|
||||
b = get_or_start_client("typescript", str(tmp_path), cmd)
|
||||
assert a is not None and b is not None
|
||||
assert a is b
|
||||
|
||||
def test_separate_roots_get_separate_clients(self, fake_server_factory, tmp_path: Path) -> None:
|
||||
cmd = fake_server_factory("ok")
|
||||
root_a = tmp_path / "a"; root_a.mkdir()
|
||||
root_b = tmp_path / "b"; root_b.mkdir()
|
||||
a = get_or_start_client("typescript", str(root_a), cmd)
|
||||
b = get_or_start_client("typescript", str(root_b), cmd)
|
||||
assert a is not None and b is not None
|
||||
assert a is not b
|
||||
|
||||
|
||||
class TestShutdown:
|
||||
def test_shutdown_terminates_server(self, fake_server_factory, tmp_path: Path) -> None:
|
||||
cmd = fake_server_factory("ok")
|
||||
client = LspClient(cmd, str(tmp_path), "typescript")
|
||||
client.start()
|
||||
proc = client._proc
|
||||
assert proc is not None
|
||||
client.shutdown()
|
||||
# The server must exit promptly; otherwise we'd be leaking processes
|
||||
# in long-lived agent sessions every time idle_shutdown reaps a client.
|
||||
assert proc.wait(timeout=3.0) is not None
|
||||
345
tests/tools/test_lsp_lint_bridge.py
Normal file
345
tests/tools/test_lsp_lint_bridge.py
Normal file
@@ -0,0 +1,345 @@
|
||||
"""Tests for ``tools.lsp_lint`` — the bridge between ``_check_lint`` and the
|
||||
LSP client.
|
||||
|
||||
We do not spin up a real language server here. Instead we monkeypatch
|
||||
``get_or_start_client`` with a fake that returns whatever diagnostics the
|
||||
test wants. This keeps the bridge logic — feature flag, language map,
|
||||
project-root gating, error containment, observability — under tight
|
||||
unit-test control while leaving the real LSP wire-protocol coverage to
|
||||
test_lsp_client.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools import lsp_lint
|
||||
from tools.lsp_client import Diagnostic
|
||||
from tools.environments.local import LocalEnvironment
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeClient:
|
||||
def __init__(self, diags: list[Diagnostic] | Exception | None = None) -> None:
|
||||
self._diags = diags
|
||||
|
||||
def diagnostics(self, path: str, content: str, *, timeout: float, settle_ms: int) -> list[Diagnostic]:
|
||||
if isinstance(self._diags, Exception):
|
||||
raise self._diags
|
||||
return list(self._diags or [])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ts_project(tmp_path: Path) -> tuple[Path, Path]:
|
||||
"""A tsconfig-rooted project with one source file."""
|
||||
(tmp_path / "tsconfig.json").write_text("{}")
|
||||
src = tmp_path / "src" / "app.ts"
|
||||
src.parent.mkdir(parents=True)
|
||||
src.write_text("export {}\n")
|
||||
return tmp_path, src
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def local_env(tmp_path: Path) -> LocalEnvironment:
|
||||
return LocalEnvironment(cwd=str(tmp_path), timeout=5)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Feature-flag gating
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFeatureFlag:
|
||||
def test_returns_none_when_disabled(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": False}):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
assert result is None
|
||||
|
||||
def test_returns_none_when_config_missing(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={}):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
assert result is None
|
||||
|
||||
def test_returns_none_when_content_missing(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), None, local_env)
|
||||
# We never lint without content — there is no point sending an empty
|
||||
# didOpen text payload.
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backend gating
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBackendGating:
|
||||
def test_returns_none_for_non_local_env(self, ts_project) -> None:
|
||||
_, src = ts_project
|
||||
|
||||
class _FakeRemoteEnv:
|
||||
cwd = "/remote/workspace"
|
||||
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", _FakeRemoteEnv())
|
||||
# Container/SSH/Modal backends are deliberately excluded in this PR.
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Project-root gating
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProjectRootGating:
|
||||
def test_orphan_file_returns_none(self, tmp_path: Path, local_env) -> None:
|
||||
# No tsconfig anywhere on the way up — the bridge must abort
|
||||
# rather than reproduce the phantom-error problem.
|
||||
orphan_dir = tmp_path / "orphan"
|
||||
orphan_dir.mkdir()
|
||||
orphan = orphan_dir / "loose.ts"
|
||||
orphan.write_text("export {}\n")
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(orphan), "export {}\n", local_env)
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Language map
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLanguageMap:
|
||||
def test_unsupported_extension_returns_none(self, tmp_path: Path, local_env) -> None:
|
||||
path = tmp_path / "file.txt"
|
||||
path.write_text("hello")
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(path), "hello", local_env)
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Happy path + error containment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRouting:
|
||||
def test_clean_run_returns_success_lint_result(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient([])):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
assert result is not None
|
||||
assert result.success is True
|
||||
assert result.output == ""
|
||||
|
||||
def test_errors_become_failure_with_formatted_output(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
diags = [
|
||||
Diagnostic(line=12, column=4, severity=1, message="Cannot find name 'foo'", source="ts", code="2304"),
|
||||
Diagnostic(line=15, column=1, severity=2, message="Unused import", source="ts"),
|
||||
]
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient(diags)):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
assert result is not None
|
||||
assert result.success is False
|
||||
assert ":12:4: error: Cannot find name 'foo'" in result.output
|
||||
assert ":15:1: warning: Unused import" in result.output
|
||||
|
||||
def test_hint_severity_is_filtered_out(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
diags = [
|
||||
Diagnostic(line=1, column=1, severity=4, message="hint", source="ts"),
|
||||
Diagnostic(line=2, column=1, severity=3, message="info", source="ts"),
|
||||
]
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient(diags)):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
# No errors or warnings → treated as clean. Otherwise we'd be
|
||||
# changing the agent's verdict mid-edit when shell linters never
|
||||
# surfaced these severities.
|
||||
assert result is not None
|
||||
assert result.success is True
|
||||
assert result.output == ""
|
||||
|
||||
def test_client_unavailable_returns_none(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=None):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
# Server binary missing or failed to spawn: caller must fall through
|
||||
# to the legacy shell linter rather than report a phantom-clean.
|
||||
assert result is None
|
||||
|
||||
def test_diagnostics_timeout_returns_none(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client",
|
||||
return_value=_FakeClient(TimeoutError("slow"))):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
assert result is None
|
||||
|
||||
def test_unexpected_exception_returns_none(self, ts_project, local_env) -> None:
|
||||
_, src = ts_project
|
||||
with patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client",
|
||||
return_value=_FakeClient(RuntimeError("server crash"))):
|
||||
result = lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
# The lint hook is on the hot edit path — bridge must never raise.
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Observability — log levels and once-per-X dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# The default agent.log threshold is INFO. These tests pin down that
|
||||
# *steady state* (1000 clean writes, 1000 feature-off writes, etc.) emits
|
||||
# zero records visible to the agent.log handler, and that *novel* events
|
||||
# get exactly one WARNING/INFO line that survives.
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_dedup_caches():
|
||||
"""Each test sees a fresh dedup state so prior runs cannot mask
|
||||
a missing announcement (or hide a duplicate one)."""
|
||||
lsp_lint._reset_announce_caches()
|
||||
yield
|
||||
lsp_lint._reset_announce_caches()
|
||||
|
||||
|
||||
def _records_at(caplog, level: int) -> list[logging.LogRecord]:
|
||||
return [r for r in caplog.records if r.levelno >= level]
|
||||
|
||||
|
||||
class TestLogLevelsSteadyState:
|
||||
"""Anything that fires every write must stay at DEBUG so 1000 writes
|
||||
cannot flood agent.log at the default INFO threshold."""
|
||||
|
||||
def test_feature_off_is_debug(self, ts_project, local_env, caplog) -> None:
|
||||
_, src = ts_project
|
||||
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": False}):
|
||||
for _ in range(5):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
assert _records_at(caplog, logging.INFO) == []
|
||||
|
||||
def test_unmapped_extension_is_debug(self, tmp_path: Path, local_env, caplog) -> None:
|
||||
path = tmp_path / "x.txt"
|
||||
path.write_text("hi")
|
||||
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
for _ in range(5):
|
||||
lsp_lint.maybe_lint_via_lsp(str(path), "hi", local_env)
|
||||
assert _records_at(caplog, logging.INFO) == []
|
||||
|
||||
def test_non_local_backend_is_debug(self, ts_project, caplog) -> None:
|
||||
_, src = ts_project
|
||||
|
||||
class _FakeRemote:
|
||||
cwd = "/remote"
|
||||
|
||||
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
for _ in range(5):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", _FakeRemote())
|
||||
assert _records_at(caplog, logging.INFO) == []
|
||||
|
||||
def test_clean_write_is_debug(self, ts_project, local_env, caplog) -> None:
|
||||
_, src = ts_project
|
||||
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient([])):
|
||||
# First write produces ONE INFO ("active for ...") for the
|
||||
# state transition. Subsequent clean writes must be DEBUG so
|
||||
# 1000 clean writes don't carpet agent.log.
|
||||
for _ in range(5):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
infos = [r for r in caplog.records if r.levelno == logging.INFO]
|
||||
assert len(infos) == 1, f"expected exactly one 'active' INFO, got {[r.getMessage() for r in infos]}"
|
||||
assert "active for" in infos[0].getMessage()
|
||||
|
||||
|
||||
class TestLogLevelsNovelEvents:
|
||||
"""Things that change state or require action must escape DEBUG."""
|
||||
|
||||
def test_diagnostics_are_info_per_call(self, ts_project, local_env, caplog) -> None:
|
||||
_, src = ts_project
|
||||
diags = [Diagnostic(line=1, column=1, severity=1, message="boom")]
|
||||
with caplog.at_level(logging.INFO, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient(diags)):
|
||||
for _ in range(3):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
# Anchor on the diagnostic message shape — temp paths can contain
|
||||
# "diag" as a substring (e.g. test_diagnostics_are_info_per_0/), so
|
||||
# a bare ``"diag" in msg`` filter would over-match the active line.
|
||||
diag_lines = [r.getMessage() for r in caplog.records if "] 1 diag (" in r.getMessage()]
|
||||
assert len(diag_lines) == 3
|
||||
|
||||
def test_active_announcement_fires_once_per_root(self, ts_project, local_env, caplog) -> None:
|
||||
_, src = ts_project
|
||||
with caplog.at_level(logging.INFO, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=_FakeClient([])):
|
||||
for _ in range(10):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
actives = [r for r in caplog.records if "active for" in r.getMessage()]
|
||||
assert len(actives) == 1
|
||||
|
||||
|
||||
class TestLogLevelsActionRequired:
|
||||
"""First occurrence WARNING, subsequent same-event DEBUG."""
|
||||
|
||||
def test_server_unavailable_warns_once(self, ts_project, local_env, caplog) -> None:
|
||||
_, src = ts_project
|
||||
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client", return_value=None):
|
||||
for _ in range(5):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
|
||||
# Exactly one WARNING — the other 4 calls demoted to DEBUG so
|
||||
# errors.log (WARNING+) doesn't get the same line 1000 times.
|
||||
assert len(warnings) == 1
|
||||
assert "server unavailable" in warnings[0].getMessage()
|
||||
|
||||
def test_no_project_root_info_once_per_path(self, tmp_path: Path, local_env, caplog) -> None:
|
||||
# Two distinct orphan files → two INFO lines. Same orphan twice → one.
|
||||
orphan_a = tmp_path / "a.ts"
|
||||
orphan_a.write_text("")
|
||||
orphan_b = tmp_path / "b.ts"
|
||||
orphan_b.write_text("")
|
||||
with caplog.at_level(logging.DEBUG, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}):
|
||||
lsp_lint.maybe_lint_via_lsp(str(orphan_a), "", local_env)
|
||||
lsp_lint.maybe_lint_via_lsp(str(orphan_a), "", local_env)
|
||||
lsp_lint.maybe_lint_via_lsp(str(orphan_b), "", local_env)
|
||||
lsp_lint.maybe_lint_via_lsp(str(orphan_b), "", local_env)
|
||||
infos = [r for r in caplog.records if r.levelno == logging.INFO]
|
||||
assert len(infos) == 2, [r.getMessage() for r in infos]
|
||||
|
||||
def test_timeout_warns_every_time(self, ts_project, local_env, caplog) -> None:
|
||||
# Timeouts are inherently novel events (a hang now isn't the same
|
||||
# signal as a hang yesterday), so every one must escape DEBUG.
|
||||
_, src = ts_project
|
||||
with caplog.at_level(logging.WARNING, logger="hermes.lint.lsp"), \
|
||||
patch.object(lsp_lint, "_load_lsp_config", return_value={"enabled": True}), \
|
||||
patch.object(lsp_lint, "get_or_start_client",
|
||||
return_value=_FakeClient(TimeoutError("slow"))):
|
||||
for _ in range(3):
|
||||
lsp_lint.maybe_lint_via_lsp(str(src), "export {}\n", local_env)
|
||||
timeouts = [r for r in caplog.records if "timeout" in r.getMessage()]
|
||||
assert len(timeouts) == 3
|
||||
@@ -1033,11 +1033,19 @@ class ShellFileOperations(FileOperations):
|
||||
"""
|
||||
Run syntax check on a file after editing.
|
||||
|
||||
Prefers the in-process linter for structured formats (JSON, YAML,
|
||||
TOML) when possible — those parse via the Python stdlib in
|
||||
microseconds and don't require a subprocess. Falls back to the
|
||||
shell linter table for compiled/type-checked languages
|
||||
(py_compile, node --check, tsc, go vet, rustfmt).
|
||||
Resolution order:
|
||||
1. LSP-backed lint via ``tools.lsp_lint.maybe_lint_via_lsp`` —
|
||||
only fires when ``lint.lsp.enabled`` is true, the backend is
|
||||
local, and a project root marker (tsconfig.json, Cargo.toml,
|
||||
go.mod, …) is present. Returns None when any of those
|
||||
prerequisites fail, so the cheaper paths below run untouched
|
||||
for everyone who hasn't opted in.
|
||||
2. In-process linter for structured formats (JSON/YAML/TOML)
|
||||
plus Python ``ast.parse`` — microseconds per call, no
|
||||
subprocess.
|
||||
3. Shell linter table (py_compile, node --check, tsc, go vet,
|
||||
rustfmt) for compiled/type-checked languages without an
|
||||
in-process option.
|
||||
|
||||
Args:
|
||||
path: File path (used to select the linter + for shell invocation).
|
||||
@@ -1051,7 +1059,20 @@ class ShellFileOperations(FileOperations):
|
||||
"""
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
|
||||
# Prefer in-process linter when available.
|
||||
# 1) LSP — opt-in, local-only today. Lazily imported so the rest of
|
||||
# the file_operations surface keeps working in environments where
|
||||
# the LSP module's transitive imports (config loader, etc.) are
|
||||
# unavailable, e.g. minimal sandboxes used by some unit tests.
|
||||
if content is not None:
|
||||
try:
|
||||
from tools.lsp_lint import maybe_lint_via_lsp
|
||||
lsp_result = maybe_lint_via_lsp(path, content, self.env)
|
||||
except Exception:
|
||||
lsp_result = None
|
||||
if lsp_result is not None:
|
||||
return lsp_result
|
||||
|
||||
# 2) In-process linter when available.
|
||||
inproc = LINTERS_INPROC.get(ext)
|
||||
if inproc is not None:
|
||||
# Need content — either passed in or read from disk.
|
||||
@@ -1066,7 +1087,9 @@ class ShellFileOperations(FileOperations):
|
||||
return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)")
|
||||
return LintResult(success=ok, output="" if ok else err)
|
||||
|
||||
# Fall back to shell linter.
|
||||
# 3) Shell linter table — last-resort fallback for compiled / type-
|
||||
# checked languages without an in-process option (and the path the
|
||||
# disabled LSP feature deliberately falls through to).
|
||||
if ext not in LINTERS:
|
||||
return LintResult(skipped=True, message=f"No linter for {ext} files")
|
||||
|
||||
|
||||
662
tools/lsp_client.py
Normal file
662
tools/lsp_client.py
Normal file
@@ -0,0 +1,662 @@
|
||||
"""Stdio Language Server Protocol client for post-edit linting.
|
||||
|
||||
Goal
|
||||
----
|
||||
Replace ``npx tsc --noEmit <single-file>`` (and the ``go vet`` / ``rustfmt
|
||||
--check`` siblings in ``tools/file_operations.LINTERS``) with project-aware
|
||||
diagnostics from a real language server, so the agent stops chasing phantom
|
||||
"Cannot find module" errors that vanish the moment ``tsconfig.json`` is in
|
||||
scope.
|
||||
|
||||
Why a hand-rolled client (vs. an existing OSS library)
|
||||
-------------------------------------------------------
|
||||
Three options were evaluated before writing this:
|
||||
|
||||
* **microsoft/multilspy** (v0.0.15, "pre-alpha" per its own README) is the
|
||||
closest fit on paper, but it is async-first, drags in a pinned
|
||||
``jedi-language-server`` server as a hard dependency just to wire up
|
||||
Python, and auto-downloads server binaries to ``~/.multilspy/`` —
|
||||
which collides with HERMES_HOME profile isolation and the user's own
|
||||
toolchain. Its public API is built around navigation
|
||||
(``request_definition`` / ``request_references`` / ``request_hover``)
|
||||
rather than the diagnostic flow we need, so we'd be working against
|
||||
the grain of the documented surface.
|
||||
* **sansio-lsp-client** (~3K monthly downloads, single primary
|
||||
maintainer) is sans-I/O — it gives us ~50 lines of framing in exchange
|
||||
for a new dependency, but every other moving part below (subprocess
|
||||
pump, reader thread, request demux, per-``(env, root)`` cache, idle
|
||||
reaper, error containment for the lint hot path) would still be ours.
|
||||
* **pygls** is server-side only; client support is "Coming Soon" in the
|
||||
v2 docs and isn't usable yet.
|
||||
|
||||
So the cost/benefit of a library is "import a pre-alpha or thin
|
||||
dependency, still write 80% of this module, and inherit binary-download
|
||||
magic we don't want." Better to own the ~500 lines, document the
|
||||
contract, and pull in ``lsprotocol`` for typed messages later if/when we
|
||||
need pull diagnostics or progress tokens.
|
||||
|
||||
What this module owns
|
||||
---------------------
|
||||
* Stdio JSON-RPC transport with Content-Length framing.
|
||||
* Per ``(language, project_root, server_cmd)`` cached client, kept alive
|
||||
across edits so the agent only pays cold-start once per project.
|
||||
* ``diagnostics(path, content)`` returns one shot of ``Diagnostic`` items
|
||||
via ``textDocument/didOpen`` + ``publishDiagnostics`` (push) with a
|
||||
short settle window. Pull diagnostics (LSP 3.17) is not used here — the
|
||||
push flow works on every server we care about, including
|
||||
``typescript-language-server`` and ``rust-analyzer``.
|
||||
* Idle reaper thread shuts down servers after a configurable quiet
|
||||
period so long-lived sessions do not accumulate processes.
|
||||
|
||||
Out of scope (deliberately deferred)
|
||||
-------------------------------------
|
||||
* Container / SSH / Modal / Daytona backends — the server has to live
|
||||
where the files live, which means baking it into the sandbox image or
|
||||
installing on first use. That is the gnarly half and gets its own PR.
|
||||
* didChange / live editing — every Hermes call here re-opens the file
|
||||
with the freshly-written contents, so we never have an in-flight buffer
|
||||
to keep in sync.
|
||||
* workspace/configuration, pull diagnostics, file watching — none of
|
||||
those are needed for the "one diagnostic snapshot per write" use case
|
||||
we are solving.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable, Optional, Sequence
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Project-root discovery
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Filenames whose presence in a directory marks it as a project root for the
|
||||
# given language id. Order matters within each tuple: the first marker found
|
||||
# walking upward wins, so users can opt into a sub-project root by dropping a
|
||||
# more-specific marker (``tsconfig.json``) inside a parent that also has a
|
||||
# generic one (``package.json``).
|
||||
PROJECT_ROOT_MARKERS: dict[str, tuple[str, ...]] = {
|
||||
"typescript": ("tsconfig.json", "jsconfig.json", "package.json"),
|
||||
"typescriptreact": ("tsconfig.json", "jsconfig.json", "package.json"),
|
||||
"javascript": ("jsconfig.json", "package.json"),
|
||||
"javascriptreact": ("jsconfig.json", "package.json"),
|
||||
"rust": ("Cargo.toml",),
|
||||
"go": ("go.mod",),
|
||||
}
|
||||
|
||||
|
||||
def find_project_root(file_path: str, language_id: str) -> Optional[str]:
|
||||
"""Walk upward from *file_path* looking for the first project marker.
|
||||
|
||||
Returns the absolute directory path or ``None`` if no marker is found
|
||||
before hitting the filesystem root. ``None`` is the signal to skip
|
||||
LSP and fall back to the shell linter — an orphan file with no
|
||||
project context will produce the same phantom errors LSP was meant to
|
||||
eliminate.
|
||||
"""
|
||||
markers = PROJECT_ROOT_MARKERS.get(language_id)
|
||||
if not markers:
|
||||
return None
|
||||
try:
|
||||
start = Path(file_path).expanduser().resolve()
|
||||
except (OSError, RuntimeError):
|
||||
return None
|
||||
cursor = start.parent if start.is_file() or not start.exists() else start
|
||||
visited = 0
|
||||
while True:
|
||||
for marker in markers:
|
||||
if (cursor / marker).exists():
|
||||
return str(cursor)
|
||||
parent = cursor.parent
|
||||
if parent == cursor:
|
||||
return None
|
||||
cursor = parent
|
||||
visited += 1
|
||||
# Defensive cap: the deepest reasonable monorepo is well under this.
|
||||
if visited > 64:
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LSP framing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_HEADER_TERMINATOR = b"\r\n\r\n"
|
||||
|
||||
|
||||
def _encode_message(payload: dict[str, Any]) -> bytes:
|
||||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
|
||||
return header + body
|
||||
|
||||
|
||||
def _read_message(stream) -> Optional[dict[str, Any]]:
|
||||
"""Read a single LSP message from *stream*. Returns ``None`` at EOF."""
|
||||
header = b""
|
||||
while _HEADER_TERMINATOR not in header:
|
||||
chunk = stream.read(1)
|
||||
if not chunk:
|
||||
return None
|
||||
header += chunk
|
||||
# Pathological server: header floods without terminator. Cap so we
|
||||
# do not spin forever consuming bytes.
|
||||
if len(header) > 8192:
|
||||
raise RuntimeError("LSP header exceeded 8 KiB without terminator")
|
||||
header_text, _, _ = header.partition(_HEADER_TERMINATOR)
|
||||
content_length: Optional[int] = None
|
||||
for line in header_text.decode("ascii", errors="replace").splitlines():
|
||||
name, _, value = line.partition(":")
|
||||
if name.strip().lower() == "content-length":
|
||||
try:
|
||||
content_length = int(value.strip())
|
||||
except ValueError:
|
||||
content_length = None
|
||||
if content_length is None or content_length < 0:
|
||||
raise RuntimeError(f"LSP header missing Content-Length: {header_text!r}")
|
||||
body = b""
|
||||
while len(body) < content_length:
|
||||
chunk = stream.read(content_length - len(body))
|
||||
if not chunk:
|
||||
return None
|
||||
body += chunk
|
||||
try:
|
||||
return json.loads(body.decode("utf-8"))
|
||||
except json.JSONDecodeError as exc:
|
||||
raise RuntimeError(f"LSP body was not valid JSON: {exc}") from exc
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Diagnostic shape (subset of the LSP type)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class Diagnostic:
|
||||
"""Subset of the LSP ``Diagnostic`` we surface to the lint layer."""
|
||||
|
||||
line: int # 1-based; LSP is 0-based and we convert at parse time.
|
||||
column: int # 1-based for the same reason.
|
||||
severity: int = 1 # 1=error, 2=warning, 3=info, 4=hint
|
||||
message: str = ""
|
||||
source: str = ""
|
||||
code: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_lsp(cls, raw: dict[str, Any]) -> "Diagnostic":
|
||||
rng = raw.get("range") or {}
|
||||
start = rng.get("start") or {}
|
||||
line = int(start.get("line", 0)) + 1
|
||||
col = int(start.get("character", 0)) + 1
|
||||
code = raw.get("code", "")
|
||||
return cls(
|
||||
line=line,
|
||||
column=col,
|
||||
severity=int(raw.get("severity", 1)),
|
||||
message=str(raw.get("message", "")).strip(),
|
||||
source=str(raw.get("source", "")),
|
||||
code=str(code) if code != "" else "",
|
||||
)
|
||||
|
||||
def format(self, path: str) -> str:
|
||||
sev = {1: "error", 2: "warning", 3: "info", 4: "hint"}.get(self.severity, "error")
|
||||
prefix = f"{path}:{self.line}:{self.column}"
|
||||
tail = f" [{self.source}{':' + self.code if self.code else ''}]" if self.source else ""
|
||||
return f"{prefix}: {sev}: {self.message}{tail}"
|
||||
|
||||
|
||||
def format_diagnostics(path: str, diags: Iterable[Diagnostic]) -> str:
|
||||
"""Render a diagnostic list to the same `path:line:col: msg` shape that
|
||||
the existing shell linters produce, so ``_check_lint_delta`` can keep
|
||||
diffing pre/post by line equality."""
|
||||
return "\n".join(d.format(path) for d in diags)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pending-request bookkeeping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class _Pending:
|
||||
event: threading.Event = field(default_factory=threading.Event)
|
||||
result: Any = None
|
||||
error: Any = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Client
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class LspClient:
|
||||
"""Long-lived stdio JSON-RPC client for one (server, project root)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_cmd: Sequence[str],
|
||||
project_root: str,
|
||||
language_id: str,
|
||||
*,
|
||||
startup_timeout: float = 15.0,
|
||||
) -> None:
|
||||
self.server_cmd = list(server_cmd)
|
||||
self.project_root = project_root
|
||||
self.language_id = language_id
|
||||
self.startup_timeout = startup_timeout
|
||||
|
||||
self._proc: Optional[subprocess.Popen] = None
|
||||
self._writer_lock = threading.Lock()
|
||||
self._reader_thread: Optional[threading.Thread] = None
|
||||
self._next_id = 1
|
||||
self._next_id_lock = threading.Lock()
|
||||
self._pending: dict[int, _Pending] = {}
|
||||
self._pending_lock = threading.Lock()
|
||||
self._diagnostics: dict[str, list[Diagnostic]] = {}
|
||||
self._diag_events: dict[str, threading.Event] = {}
|
||||
self._diag_lock = threading.Lock()
|
||||
self._closed = False
|
||||
self._last_used = time.monotonic()
|
||||
|
||||
# -- lifecycle ----------------------------------------------------------
|
||||
|
||||
def start(self) -> None:
|
||||
if self._proc is not None:
|
||||
return
|
||||
try:
|
||||
self._proc = subprocess.Popen(
|
||||
self.server_cmd,
|
||||
cwd=self.project_root,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
# Capture stderr separately so server-side noise (rust-analyzer
|
||||
# progress, ts-server compile warnings) never corrupts the
|
||||
# JSON-RPC stream on stdout.
|
||||
stderr=subprocess.DEVNULL,
|
||||
bufsize=0,
|
||||
)
|
||||
except (FileNotFoundError, PermissionError, OSError) as exc:
|
||||
raise RuntimeError(f"failed to spawn LSP server {self.server_cmd[0]!r}: {exc}") from exc
|
||||
|
||||
self._reader_thread = threading.Thread(
|
||||
target=self._reader_loop,
|
||||
name=f"lsp-reader[{self.language_id}]",
|
||||
daemon=True,
|
||||
)
|
||||
self._reader_thread.start()
|
||||
|
||||
try:
|
||||
self._initialize()
|
||||
except Exception:
|
||||
self.shutdown()
|
||||
raise
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if self._closed:
|
||||
return
|
||||
self._closed = True
|
||||
# Best-effort graceful shutdown: send `shutdown` + `exit`. If the
|
||||
# server is unresponsive we just kill the process — the reader
|
||||
# thread is daemonized and will die with stdout EOF.
|
||||
proc = self._proc
|
||||
if proc is not None and proc.poll() is None:
|
||||
try:
|
||||
self._request("shutdown", None, timeout=2.0)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self._notify("exit", None)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
proc.wait(timeout=2.0)
|
||||
except subprocess.TimeoutExpired:
|
||||
try:
|
||||
proc.kill()
|
||||
except Exception:
|
||||
pass
|
||||
self._proc = None
|
||||
|
||||
# -- JSON-RPC ----------------------------------------------------------
|
||||
|
||||
def _allocate_id(self) -> int:
|
||||
with self._next_id_lock:
|
||||
value = self._next_id
|
||||
self._next_id += 1
|
||||
return value
|
||||
|
||||
def _send(self, payload: dict[str, Any]) -> None:
|
||||
proc = self._proc
|
||||
if proc is None or proc.stdin is None:
|
||||
raise RuntimeError("LSP client not started")
|
||||
data = _encode_message(payload)
|
||||
with self._writer_lock:
|
||||
try:
|
||||
proc.stdin.write(data)
|
||||
proc.stdin.flush()
|
||||
except (BrokenPipeError, OSError) as exc:
|
||||
raise RuntimeError(f"LSP server stdin closed: {exc}") from exc
|
||||
|
||||
def _notify(self, method: str, params: Any) -> None:
|
||||
self._send({"jsonrpc": "2.0", "method": method, "params": params})
|
||||
|
||||
def _request(self, method: str, params: Any, *, timeout: float) -> Any:
|
||||
request_id = self._allocate_id()
|
||||
pending = _Pending()
|
||||
with self._pending_lock:
|
||||
self._pending[request_id] = pending
|
||||
try:
|
||||
self._send({
|
||||
"jsonrpc": "2.0",
|
||||
"id": request_id,
|
||||
"method": method,
|
||||
"params": params,
|
||||
})
|
||||
if not pending.event.wait(timeout):
|
||||
raise TimeoutError(f"LSP {method!r} timed out after {timeout:.1f}s")
|
||||
if pending.error is not None:
|
||||
raise RuntimeError(f"LSP {method!r} error: {pending.error}")
|
||||
return pending.result
|
||||
finally:
|
||||
with self._pending_lock:
|
||||
self._pending.pop(request_id, None)
|
||||
|
||||
# -- reader -------------------------------------------------------------
|
||||
|
||||
def _reader_loop(self) -> None:
|
||||
proc = self._proc
|
||||
if proc is None or proc.stdout is None:
|
||||
return
|
||||
try:
|
||||
while not self._closed:
|
||||
try:
|
||||
msg = _read_message(proc.stdout)
|
||||
except RuntimeError as exc:
|
||||
logger.debug("LSP reader bailing on %s: %s", self.server_cmd[0], exc)
|
||||
break
|
||||
if msg is None:
|
||||
break
|
||||
self._handle_message(msg)
|
||||
finally:
|
||||
# Wake up every waiter so callers do not hang forever when the
|
||||
# server dies mid-request.
|
||||
with self._pending_lock:
|
||||
for pending in self._pending.values():
|
||||
if pending.error is None and pending.result is None:
|
||||
pending.error = "server stream closed"
|
||||
pending.event.set()
|
||||
with self._diag_lock:
|
||||
for event in self._diag_events.values():
|
||||
event.set()
|
||||
|
||||
def _handle_message(self, msg: dict[str, Any]) -> None:
|
||||
if "id" in msg and ("result" in msg or "error" in msg):
|
||||
try:
|
||||
request_id = int(msg["id"])
|
||||
except (TypeError, ValueError):
|
||||
return
|
||||
with self._pending_lock:
|
||||
pending = self._pending.get(request_id)
|
||||
if pending is None:
|
||||
return
|
||||
pending.result = msg.get("result")
|
||||
pending.error = msg.get("error")
|
||||
pending.event.set()
|
||||
return
|
||||
|
||||
method = msg.get("method")
|
||||
if method == "textDocument/publishDiagnostics":
|
||||
params = msg.get("params") or {}
|
||||
uri = params.get("uri")
|
||||
raw_list = params.get("diagnostics") or []
|
||||
if not isinstance(uri, str):
|
||||
return
|
||||
parsed = [Diagnostic.from_lsp(d) for d in raw_list if isinstance(d, dict)]
|
||||
with self._diag_lock:
|
||||
self._diagnostics[uri] = parsed
|
||||
event = self._diag_events.get(uri)
|
||||
if event is not None:
|
||||
event.set()
|
||||
return
|
||||
|
||||
# Server-issued requests (e.g. workspace/configuration). Reply with
|
||||
# a generic null so the server stops waiting. We do not implement
|
||||
# any server features beyond what's needed to coax diagnostics out.
|
||||
if "id" in msg and "method" in msg:
|
||||
self._send({"jsonrpc": "2.0", "id": msg["id"], "result": None})
|
||||
|
||||
# -- handshake ----------------------------------------------------------
|
||||
|
||||
def _initialize(self) -> None:
|
||||
root_uri = path_to_uri(self.project_root)
|
||||
params = {
|
||||
"processId": os.getpid(),
|
||||
"clientInfo": {"name": "hermes-agent", "version": "1"},
|
||||
"rootUri": root_uri,
|
||||
"rootPath": self.project_root,
|
||||
"workspaceFolders": [{"uri": root_uri, "name": Path(self.project_root).name}],
|
||||
"capabilities": {
|
||||
"textDocument": {
|
||||
"synchronization": {"didSave": True, "willSave": False, "dynamicRegistration": False},
|
||||
"publishDiagnostics": {"relatedInformation": False, "versionSupport": False},
|
||||
},
|
||||
"workspace": {
|
||||
"workspaceFolders": True,
|
||||
"configuration": True,
|
||||
},
|
||||
},
|
||||
"initializationOptions": {},
|
||||
}
|
||||
self._request("initialize", params, timeout=self.startup_timeout)
|
||||
self._notify("initialized", {})
|
||||
|
||||
# -- public API ---------------------------------------------------------
|
||||
|
||||
def diagnostics(
|
||||
self,
|
||||
file_path: str,
|
||||
content: str,
|
||||
*,
|
||||
timeout: float = 10.0,
|
||||
settle_ms: int = 400,
|
||||
) -> list[Diagnostic]:
|
||||
"""Open *file_path* with *content* and collect one diagnostic snapshot.
|
||||
|
||||
The flow:
|
||||
1. Register a fresh wait-event for the URI.
|
||||
2. ``didOpen`` with the new content (LSP version 1).
|
||||
3. Block until the first ``publishDiagnostics`` for the URI or
|
||||
*timeout* seconds elapse.
|
||||
4. Sleep *settle_ms* milliseconds and re-snapshot, since some
|
||||
servers (notably ``typescript-language-server``) emit a
|
||||
"no diagnostics yet" notification while the program graph is
|
||||
still loading and only the second batch carries real errors.
|
||||
5. ``didClose`` so the server frees the buffer.
|
||||
"""
|
||||
if self._closed or self._proc is None:
|
||||
raise RuntimeError("LSP client is not running")
|
||||
|
||||
uri = path_to_uri(file_path)
|
||||
event = threading.Event()
|
||||
with self._diag_lock:
|
||||
self._diagnostics.pop(uri, None)
|
||||
self._diag_events[uri] = event
|
||||
|
||||
try:
|
||||
self._notify("textDocument/didOpen", {
|
||||
"textDocument": {
|
||||
"uri": uri,
|
||||
"languageId": self.language_id,
|
||||
"version": 1,
|
||||
"text": content,
|
||||
},
|
||||
})
|
||||
self._last_used = time.monotonic()
|
||||
|
||||
event.wait(timeout)
|
||||
|
||||
if settle_ms > 0:
|
||||
time.sleep(settle_ms / 1000.0)
|
||||
|
||||
with self._diag_lock:
|
||||
diags = list(self._diagnostics.get(uri, []))
|
||||
finally:
|
||||
with self._diag_lock:
|
||||
self._diag_events.pop(uri, None)
|
||||
try:
|
||||
self._notify("textDocument/didClose", {"textDocument": {"uri": uri}})
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
return diags
|
||||
|
||||
@property
|
||||
def last_used(self) -> float:
|
||||
return self._last_used
|
||||
|
||||
@property
|
||||
def is_alive(self) -> bool:
|
||||
proc = self._proc
|
||||
return not self._closed and proc is not None and proc.poll() is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# URI helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def path_to_uri(path: str) -> str:
|
||||
"""Convert a filesystem path to a ``file://`` URI.
|
||||
|
||||
Uses ``Path.as_uri`` after resolving so symlinks and ``..`` components
|
||||
do not produce two URIs for the same file (which would cause the
|
||||
diagnostics dict to miss the response).
|
||||
"""
|
||||
try:
|
||||
resolved = Path(path).expanduser().resolve()
|
||||
except (OSError, RuntimeError):
|
||||
resolved = Path(path).expanduser()
|
||||
try:
|
||||
return resolved.as_uri()
|
||||
except ValueError:
|
||||
# ``as_uri`` rejects relative paths; the resolve() above almost
|
||||
# always prevents this, but fall back to a manual file:// prefix
|
||||
# so we never raise from inside the lint hot path.
|
||||
return "file://" + str(resolved).replace(os.sep, "/")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Process-wide registry + idle reaper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_clients_lock = threading.Lock()
|
||||
_clients: dict[tuple[str, str, tuple[str, ...]], LspClient] = {}
|
||||
_reaper_thread: Optional[threading.Thread] = None
|
||||
_reaper_stop = threading.Event()
|
||||
_reaper_idle_seconds = 600.0
|
||||
_reaper_interval = 30.0
|
||||
|
||||
|
||||
def get_or_start_client(
|
||||
language_id: str,
|
||||
project_root: str,
|
||||
server_cmd: Sequence[str],
|
||||
*,
|
||||
idle_seconds: float = 600.0,
|
||||
) -> Optional[LspClient]:
|
||||
"""Return a started client for the (language, root, cmd) triple.
|
||||
|
||||
Returns ``None`` if the server binary is missing or fails to start —
|
||||
callers are expected to treat that as "fall back to shell linter"
|
||||
rather than a hard error.
|
||||
"""
|
||||
if not server_cmd:
|
||||
return None
|
||||
binary = server_cmd[0]
|
||||
if not _which(binary):
|
||||
return None
|
||||
|
||||
key = (language_id, project_root, tuple(server_cmd))
|
||||
with _clients_lock:
|
||||
client = _clients.get(key)
|
||||
if client is not None and client.is_alive:
|
||||
return client
|
||||
if client is not None:
|
||||
# Stale entry — server crashed or was reaped. Drop it before
|
||||
# respawning so we never hand a dead client back to a caller.
|
||||
_clients.pop(key, None)
|
||||
|
||||
client = LspClient(server_cmd, project_root, language_id)
|
||||
try:
|
||||
client.start()
|
||||
except Exception as exc:
|
||||
logger.warning("LSP server %s failed to start: %s", server_cmd[0], exc)
|
||||
return None
|
||||
_clients[key] = client
|
||||
global _reaper_idle_seconds
|
||||
_reaper_idle_seconds = max(_reaper_idle_seconds, idle_seconds)
|
||||
_ensure_reaper()
|
||||
return client
|
||||
|
||||
|
||||
def shutdown_all_clients() -> None:
|
||||
"""Stop every cached server. Intended for tests and process exit."""
|
||||
with _clients_lock:
|
||||
clients = list(_clients.values())
|
||||
_clients.clear()
|
||||
for client in clients:
|
||||
try:
|
||||
client.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
_reaper_stop.set()
|
||||
|
||||
|
||||
def _which(binary: str) -> Optional[str]:
|
||||
# Honour absolute / relative paths verbatim — ``shutil.which`` only
|
||||
# consults PATH, and a user pointing at ``./bin/typescript-language-server``
|
||||
# in their config should still work.
|
||||
if os.sep in binary or (os.altsep and os.altsep in binary):
|
||||
return binary if os.path.isfile(binary) and os.access(binary, os.X_OK) else None
|
||||
return shutil.which(binary)
|
||||
|
||||
|
||||
def _ensure_reaper() -> None:
|
||||
global _reaper_thread
|
||||
if _reaper_thread is not None and _reaper_thread.is_alive():
|
||||
return
|
||||
_reaper_stop.clear()
|
||||
_reaper_thread = threading.Thread(
|
||||
target=_reaper_loop,
|
||||
name="lsp-reaper",
|
||||
daemon=True,
|
||||
)
|
||||
_reaper_thread.start()
|
||||
|
||||
|
||||
def _reaper_loop() -> None:
|
||||
while not _reaper_stop.wait(_reaper_interval):
|
||||
now = time.monotonic()
|
||||
idle_cutoff = now - _reaper_idle_seconds
|
||||
to_close: list[LspClient] = []
|
||||
with _clients_lock:
|
||||
for key, client in list(_clients.items()):
|
||||
if not client.is_alive or client.last_used < idle_cutoff:
|
||||
_clients.pop(key, None)
|
||||
to_close.append(client)
|
||||
for client in to_close:
|
||||
try:
|
||||
client.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
346
tools/lsp_lint.py
Normal file
346
tools/lsp_lint.py
Normal file
@@ -0,0 +1,346 @@
|
||||
"""Bridge between ``ShellFileOperations._check_lint`` and ``tools.lsp_client``.
|
||||
|
||||
Kept in its own module so the change to ``file_operations.py`` is a single
|
||||
import + one branch in ``_check_lint`` — easy to audit, easy to revert.
|
||||
|
||||
Behavioural contract for ``maybe_lint_via_lsp``:
|
||||
* Returns ``None`` whenever LSP cannot or should not handle this file
|
||||
(feature off, language not configured, env not local, no project
|
||||
root, server missing, request timed out). The caller falls back to
|
||||
the existing in-process / shell linter exactly as before.
|
||||
* Returns a ``LintResult`` only when the LSP path produced a real
|
||||
verdict — clean or with diagnostics.
|
||||
|
||||
We never raise. The lint hook is on the hot edit path and any exception
|
||||
here would surface as a write failure rather than a degraded warning.
|
||||
|
||||
Observability
|
||||
-------------
|
||||
Calls emit structured log lines on the dedicated ``hermes.lint.lsp``
|
||||
logger so live agent sessions can answer "did the LSP path fire on
|
||||
that edit?" with a single ``rg``. Levels are tuned for a 1000-write
|
||||
session — steady state is silent at the default INFO threshold, only
|
||||
state changes and action-required failures escape:
|
||||
|
||||
* ``DEBUG`` (default agent.log threshold = INFO, so invisible) for
|
||||
every per-call event that has no novel signal: ``clean``, ``feature
|
||||
off``, ``extension not mapped``, ``backend not local``, repeated
|
||||
``no project root`` for an already-announced file, repeated ``server
|
||||
unavailable`` for an already-announced binary.
|
||||
* ``INFO`` for state transitions worth surfacing exactly once:
|
||||
``active for <root>`` the first time a (language, project_root)
|
||||
client starts, ``no project root for <path>`` the first time we see
|
||||
that file. Plus every ``N diags`` event (the failure signal users
|
||||
actually want — these are inherently rare and per-edit).
|
||||
* ``WARNING`` for action-required failures the first time they happen
|
||||
per (language, binary): ``server unavailable`` (binary not on PATH),
|
||||
``no server configured``. Per-call ``WARNING`` for timeouts, server
|
||||
errors, and unexpected bridge exceptions — these are inherently
|
||||
novel events, not steady state.
|
||||
|
||||
Net result: an agent that writes 1000 clean ``.ts`` files in one
|
||||
project session emits ONE INFO line (``active for <root>``) and 1000
|
||||
DEBUG lines that never reach ``agent.log`` at default level.
|
||||
|
||||
Grep recipe::
|
||||
|
||||
tail -f ~/.hermes/logs/agent.log | rg 'lsp\\['
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
from tools.lsp_client import (
|
||||
Diagnostic,
|
||||
find_project_root,
|
||||
format_diagnostics,
|
||||
get_or_start_client,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tools.file_operations import LintResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
# Dedicated logger name so it survives a ``logging.getLogger(__name__)``
|
||||
# rename of the module without breaking the documented grep recipe.
|
||||
_event_log = logging.getLogger("hermes.lint.lsp")
|
||||
|
||||
|
||||
def _short_path(file_path: str) -> str:
|
||||
"""Render *file_path* relative to the cwd when sensible, else absolute.
|
||||
|
||||
Keeps the log line readable for the common case (the user is inside
|
||||
the project they're editing) without emitting brittle ``../../..``
|
||||
chains for the cross-tree case.
|
||||
"""
|
||||
try:
|
||||
rel = os.path.relpath(file_path)
|
||||
except ValueError:
|
||||
return file_path
|
||||
if rel.startswith(".." + os.sep) or rel == "..":
|
||||
return file_path
|
||||
return rel
|
||||
|
||||
|
||||
def _log(language_id: str, level: int, message: str) -> None:
|
||||
_event_log.log(level, "lsp[%s] %s", language_id, message)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Once-per-X dedup for steady-state events
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# Why these aren't bounded LRU caches: each set grows at most by the number
|
||||
# of distinct (language, project_root) and (language, file_path) pairs the
|
||||
# user touches in a single Python process. Even an aggressive monorepo
|
||||
# session is well under a few hundred entries — bytes of memory, not MB.
|
||||
# Bounded eviction would risk re-firing INFO/WARNING lines for the same
|
||||
# event, which is exactly what these caches exist to prevent.
|
||||
|
||||
_announce_lock = threading.Lock()
|
||||
_announced_active: set[tuple[str, str]] = set()
|
||||
_announced_unavailable: set[tuple[str, str]] = set()
|
||||
_announced_no_root: set[tuple[str, str]] = set()
|
||||
|
||||
|
||||
def _reset_announce_caches() -> None:
|
||||
"""Clear the dedup caches. Test-only — production code never calls this."""
|
||||
with _announce_lock:
|
||||
_announced_active.clear()
|
||||
_announced_unavailable.clear()
|
||||
_announced_no_root.clear()
|
||||
|
||||
|
||||
def _announce_once(bucket: set[tuple[str, str]], key: tuple[str, str]) -> bool:
|
||||
"""Return True if *key* has not been announced for *bucket* yet.
|
||||
|
||||
Atomically marks the key as announced as a side effect, so concurrent
|
||||
callers cannot both win the race and double-log.
|
||||
"""
|
||||
with _announce_lock:
|
||||
if key in bucket:
|
||||
return False
|
||||
bucket.add(key)
|
||||
return True
|
||||
|
||||
|
||||
# Map file extension → LSP language id. Mirrors the ``LINTERS`` table in
|
||||
# file_operations.py but only covers languages where we have an opinion
|
||||
# about which server to use. New extensions added here MUST also appear
|
||||
# in the ``servers`` config block below or in the user override.
|
||||
_EXT_TO_LANGUAGE: dict[str, str] = {
|
||||
".ts": "typescript",
|
||||
".tsx": "typescriptreact",
|
||||
".mts": "typescript",
|
||||
".cts": "typescript",
|
||||
".js": "javascript",
|
||||
".jsx": "javascriptreact",
|
||||
".mjs": "javascript",
|
||||
".cjs": "javascript",
|
||||
".rs": "rust",
|
||||
".go": "go",
|
||||
}
|
||||
|
||||
|
||||
# Default server command per language. Every entry can be overridden via
|
||||
# ``lint.lsp.servers.<language>`` in config.yaml. Keep these to the
|
||||
# upstream-recommended invocation; users who want overlay configurations
|
||||
# (extra args, alternate binaries) live in their own config.
|
||||
_DEFAULT_SERVERS: dict[str, list[str]] = {
|
||||
"typescript": ["typescript-language-server", "--stdio"],
|
||||
"typescriptreact": ["typescript-language-server", "--stdio"],
|
||||
"javascript": ["typescript-language-server", "--stdio"],
|
||||
"javascriptreact": ["typescript-language-server", "--stdio"],
|
||||
"rust": ["rust-analyzer"],
|
||||
"go": ["gopls"],
|
||||
}
|
||||
|
||||
|
||||
def _load_lsp_config() -> dict[str, Any]:
|
||||
"""Read ``lint.lsp`` from config.yaml, with deep defaults baked in.
|
||||
|
||||
Cached imports are deliberately avoided here: the lint hook runs
|
||||
inside long-lived agent loops and a stale cache would prevent users
|
||||
from toggling the flag mid-session via ``/config reload``. The yaml
|
||||
parse cost is negligible compared to the actual lint subprocess.
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
except Exception:
|
||||
return {}
|
||||
try:
|
||||
cfg = load_config() or {}
|
||||
except Exception:
|
||||
return {}
|
||||
lint_cfg = cfg.get("lint")
|
||||
if not isinstance(lint_cfg, dict):
|
||||
return {}
|
||||
lsp_cfg = lint_cfg.get("lsp")
|
||||
return lsp_cfg if isinstance(lsp_cfg, dict) else {}
|
||||
|
||||
|
||||
def _resolve_server_cmd(language_id: str, lsp_cfg: dict[str, Any]) -> Optional[list[str]]:
|
||||
overrides = lsp_cfg.get("servers")
|
||||
if isinstance(overrides, dict):
|
||||
candidate = overrides.get(language_id)
|
||||
if isinstance(candidate, str) and candidate.strip():
|
||||
return candidate.split()
|
||||
if isinstance(candidate, list) and candidate:
|
||||
return [str(part) for part in candidate]
|
||||
return list(_DEFAULT_SERVERS.get(language_id, []))
|
||||
|
||||
|
||||
def _is_local_env(env: object) -> bool:
|
||||
"""LSP only routes to local files in this PR.
|
||||
|
||||
Rationale lives in lsp_client.py: container/SSH backends need the
|
||||
server inside the sandbox, which is a separate engineering effort.
|
||||
Until that lands, anything non-local falls through to the shell
|
||||
linter so we don't pretend to lint a file the host process can't
|
||||
even open.
|
||||
"""
|
||||
try:
|
||||
from tools.environments.local import LocalEnvironment
|
||||
except Exception:
|
||||
return False
|
||||
return isinstance(env, LocalEnvironment)
|
||||
|
||||
|
||||
def maybe_lint_via_lsp(
|
||||
file_path: str,
|
||||
content: Optional[str],
|
||||
env: object,
|
||||
) -> Optional["LintResult"]:
|
||||
"""Try to produce a LintResult via LSP. Returns ``None`` when caller
|
||||
should fall through to the in-process / shell linter.
|
||||
|
||||
Args:
|
||||
file_path: Absolute path of the just-written file.
|
||||
content: Post-write file content. Required — we send it as the
|
||||
``didOpen`` text payload so the server lints the bytes the
|
||||
agent intended to land on disk, not whatever was there
|
||||
before the write completed.
|
||||
env: Terminal environment from ``ShellFileOperations.env``.
|
||||
We currently only handle ``LocalEnvironment``.
|
||||
"""
|
||||
if content is None:
|
||||
# No content means the caller (delta refinement against an absent
|
||||
# pre-write state) has nothing for didOpen — silent skip.
|
||||
return None
|
||||
|
||||
ext = os.path.splitext(file_path)[1].lower()
|
||||
language_id = _EXT_TO_LANGUAGE.get(ext)
|
||||
if language_id is None:
|
||||
# Most writes are .py / .json / .yaml — DEBUG so steady state stays
|
||||
# invisible at the default INFO threshold.
|
||||
_event_log.debug("lsp[?] skipped: extension %s not mapped (%s)", ext or "<none>", _short_path(file_path))
|
||||
return None
|
||||
|
||||
if not _is_local_env(env):
|
||||
_log(language_id, logging.DEBUG, f"skipped: backend not local ({_short_path(file_path)})")
|
||||
return None
|
||||
|
||||
lsp_cfg = _load_lsp_config()
|
||||
if not lsp_cfg.get("enabled"):
|
||||
_log(language_id, logging.DEBUG, f"skipped: feature off ({_short_path(file_path)})")
|
||||
return None
|
||||
|
||||
short = _short_path(file_path)
|
||||
|
||||
project_root = find_project_root(file_path, language_id)
|
||||
if project_root is None:
|
||||
# First time we see this orphan file → INFO so an opt-in user
|
||||
# learns about the coverage gap. Repeated writes to the same
|
||||
# orphan stay silent at DEBUG.
|
||||
if _announce_once(_announced_no_root, (language_id, file_path)):
|
||||
_log(language_id, logging.INFO, f"skipped: no project root for {short}")
|
||||
else:
|
||||
_log(language_id, logging.DEBUG, f"skipped: no project root for {short}")
|
||||
return None
|
||||
|
||||
server_cmd = _resolve_server_cmd(language_id, lsp_cfg)
|
||||
if not server_cmd:
|
||||
# Configuration error, not a runtime fluke. Once-per-language is
|
||||
# plenty — the missing block won't fix itself on the next write.
|
||||
if _announce_once(_announced_unavailable, (language_id, "<no-cmd>")):
|
||||
_log(language_id, logging.WARNING, f"skipped: no server configured (set lint.lsp.servers.{language_id})")
|
||||
return None
|
||||
binary_name = server_cmd[0]
|
||||
|
||||
idle_seconds = float(lsp_cfg.get("idle_shutdown") or 600.0)
|
||||
try:
|
||||
client = get_or_start_client(
|
||||
language_id,
|
||||
project_root,
|
||||
server_cmd,
|
||||
idle_seconds=idle_seconds,
|
||||
)
|
||||
except Exception as exc:
|
||||
if _announce_once(_announced_unavailable, (language_id, binary_name)):
|
||||
_log(language_id, logging.WARNING, f"skipped: spawn failed for {binary_name!r}: {exc}")
|
||||
else:
|
||||
_log(language_id, logging.DEBUG, f"skipped: spawn failed again for {binary_name!r}: {exc}")
|
||||
return None
|
||||
if client is None:
|
||||
# Most likely cause: binary not on PATH. WARNING the first time so
|
||||
# the opt-in user notices and installs it; DEBUG after so a session
|
||||
# of 1000 edits doesn't carpet ``errors.log`` with the same line.
|
||||
if _announce_once(_announced_unavailable, (language_id, binary_name)):
|
||||
_log(language_id, logging.WARNING, f"skipped: server unavailable ({binary_name!r} not on PATH)")
|
||||
else:
|
||||
_log(language_id, logging.DEBUG, f"skipped: server still unavailable ({binary_name!r})")
|
||||
return None
|
||||
|
||||
# First successful client for this (language, root) → one INFO line.
|
||||
# That's the opt-in user's "yes, LSP is wired up" confirmation.
|
||||
if _announce_once(_announced_active, (language_id, project_root)):
|
||||
_log(language_id, logging.INFO, f"active for {_short_path(project_root)} (server: {binary_name})")
|
||||
|
||||
timeout = float(lsp_cfg.get("diagnostic_timeout") or 10.0)
|
||||
settle_ms = int(lsp_cfg.get("settle_ms") or 400)
|
||||
try:
|
||||
diagnostics: list[Diagnostic] = client.diagnostics(
|
||||
file_path,
|
||||
content,
|
||||
timeout=timeout,
|
||||
settle_ms=settle_ms,
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
# Timeouts are inherently distinct events (a hung server today is
|
||||
# not the same signal as a hung server yesterday), so each one
|
||||
# gets its own WARNING.
|
||||
_log(language_id, logging.WARNING, f"skipped: timeout after {timeout:.1f}s ({short}): {exc}")
|
||||
return None
|
||||
except RuntimeError as exc:
|
||||
_log(language_id, logging.WARNING, f"skipped: server error for {short}: {exc}")
|
||||
return None
|
||||
except Exception as exc: # noqa: BLE001 — never raise from the lint hot path
|
||||
_log(language_id, logging.WARNING, f"skipped: unexpected bridge error for {short}: {exc}")
|
||||
return None
|
||||
|
||||
# Filter to errors + warnings. Hints/info are skipped because the
|
||||
# existing shell linters never surface them, and surfacing them here
|
||||
# would change the agent's "lint clean / lint dirty" verdict mid-PR.
|
||||
blocking = [d for d in diagnostics if d.severity in (1, 2)]
|
||||
|
||||
# Lazy import to avoid a circular import at module load time.
|
||||
from tools.file_operations import LintResult
|
||||
|
||||
if not blocking:
|
||||
# Clean writes are the steady state on a healthy project — DEBUG
|
||||
# so a 1000-write session stays silent at the default threshold.
|
||||
_log(language_id, logging.DEBUG, f"clean ({short})")
|
||||
return LintResult(success=True, output="")
|
||||
# Diagnostics are inherently rare and per-edit, so each one is its
|
||||
# own INFO event — this is what the user actually wants to grep for.
|
||||
_log(language_id, logging.INFO, f"{len(blocking)} diag{'s' if len(blocking) != 1 else ''} ({short})")
|
||||
return LintResult(success=False, output=format_diagnostics(file_path, blocking))
|
||||
|
||||
|
||||
__all__ = [
|
||||
"maybe_lint_via_lsp",
|
||||
]
|
||||
Reference in New Issue
Block a user