Compare commits

6 Commits

Author SHA1 Message Date
alt-glitch
2b47b40c10 docs(lsp): add feature page — setup, CLI, supported languages, troubleshooting
Covers: enable flow, server installation (detect-only default vs
hermes lsp install), how diagnostics reach the model, config knobs,
all 26 supported languages, and troubleshooting common issues.
2026-05-12 15:23:47 +00:00
alt-glitch
b1a609fba3 chore: remove plan from PR (working document, not shipped) 2026-05-12 15:18:20 +00:00
alt-glitch
6d80aa80eb refactor(lsp): simplify __init__.py per /simplify review
- Remove dead _post_tool_call (body was only comments)
- Remove _on_session_start (redundant — _ensure_service lazy-inits)
- Remove _atexit_cleanup (duplicate of _on_session_end)
- Switch _baselines from dict to set (presence sentinel only)
- Remove redundant enabled_for recheck in transform_tool_result
- Remove V4A guard (path-empty check already covers it)
- Use modern type syntax (X | None, dict[], set[])
- Reduce from 322 → 217 lines, same behavior

77/77 tests pass.
2026-05-12 15:01:44 +00:00
alt-glitch
e0a1778028 fix(lsp): address review findings — TOCTOU, None guard, JSON safety
Fixes from Claude Code adversarial review:
- Snapshot _service to local var before .is_active() (TOCTOU fix)
- Guard session_id against None with 'or ""'
- Remove text-append fallback — only inject when result is dict JSON
- Add ValueError to json.dumps except clause
- Guard result=None with 'or ""' and isinstance check

Non-dict JSON results and non-JSON results are now left unmodified
(return None = no injection) rather than risking format corruption.
2026-05-12 13:13:53 +00:00
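Review note: the injection rule this commit describes (modify the result only when it parses as a JSON object, otherwise return None so the caller leaves it untouched) can be sketched as below. `inject_field` is a hypothetical helper name used for illustration; it is not a function in the plugin.

```python
import json


def inject_field(result, key, value):
    """Return ``result`` with ``key`` added, or None to leave it unmodified.

    Hypothetical helper: only JSON objects (dicts) are ever modified.
    """
    if not isinstance(result, str):
        return None  # covers result=None and non-string results
    try:
        data = json.loads(result)
    except (json.JSONDecodeError, TypeError, ValueError):
        return None  # non-JSON text is left alone
    if not isinstance(data, dict):
        return None  # JSON lists, strings, numbers are left alone
    data[key] = value
    try:
        return json.dumps(data, ensure_ascii=False)
    except (TypeError, ValueError):
        return None  # value was not serializable
```

Returning None for every non-dict case keeps the tool result's shape intact, at the cost of silently dropping diagnostics for tools that do not return a JSON object.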
alt-glitch
40a9327248 fix(lsp): wire CLI subcommands via setup_lsp_parser for plugin registration
register_cli_command's setup_fn receives an already-created parser,
not the parent's SubParsersAction. Added setup_lsp_parser() that adds
subcommands (status, list, install, restart, which) to the provided
parser.

Verified: 'hermes lsp status' works from cold shell when plugin is
enabled in plugins.enabled config.
2026-05-12 13:08:49 +00:00
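Review note: a minimal argparse sketch of the situation this commit fixes, where the host creates the command's parser and passes only that parser to the plugin. The names `setup_demo_parser`, `host`, and `demo` are assumptions for illustration; only the `argparse` calls are real API.

```python
import argparse


def setup_demo_parser(parser: argparse.ArgumentParser) -> None:
    """Add subcommands to a parser that the host already created."""
    sub = parser.add_subparsers(dest="demo_command")
    status = sub.add_parser("status", help="Show status")
    status.add_argument("--json", action="store_true")
    which = sub.add_parser("which", help="Print binary path")
    which.add_argument("server")


# The host owns the top-level parser and its SubParsersAction ...
root = argparse.ArgumentParser(prog="host")
commands = root.add_subparsers(dest="command")
demo = commands.add_parser("demo")
# ... and hands only the already-created "demo" parser to the plugin.
setup_demo_parser(demo)

args = root.parse_args(["demo", "which", "pyright"])
```

A setup function written against the parent's SubParsersAction would call `add_parser("demo")` itself and fail when given an ArgumentParser instead, which is the mismatch the commit message describes.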
alt-glitch
23344a9a3c feat(lsp): plugin-based LSP diagnostics with zero core changes
Ship LSP semantic diagnostics as a bundled plugin (plugins/lsp/) using
existing hook system.  Zero lines of core code modified.

Plugin wiring:
- pre_tool_call: capture LSP baseline before write_file/patch
- transform_tool_result: inject diagnostics into tool result JSON
- on_session_start/on_session_end + atexit: lifecycle management

Key design:
- Baselines keyed by (session_id, abs_path) for concurrent safety
- Diagnostics added as 'lsp_diagnostics' JSON field (preserves shape)
- Per-file workspace detection (no static session-start gate)
- V4A multi-file patch skipped for MVP
- Short timeout (3s) — cold start degrades gracefully
- os.path.exists heuristic for Docker/SSH backend skip
- First relevant write with no server → INFO log with install hint

Tests: 77/77 pass including:
- Protocol framing, reporter formatting, workspace resolution
- Client E2E against mock LSP server (live_system_guard_bypass)
- Eventlog steady-state silence contract
- Backend-gate heuristic (local vs non-local paths)
- Full hook flow integration (pre→write→transform with diagnostics)

Source: PR #24168 by @teknium1, PR #24155 by @OutThisLife
Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
2026-05-12 13:01:13 +00:00
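Review note: the pre_tool_call / transform_tool_result flow with baselines keyed by (session_id, abs_path) can be sketched as follows. `FakeService` and its `delta` method are hypothetical stand-ins; the real plugin delegates baseline storage and delta computation to its LSP service.

```python
class FakeService:
    """Hypothetical stand-in for the LSP service."""

    def __init__(self):
        self.current = {}   # path -> list of diagnostic messages
        self.baseline = {}  # path -> set of messages seen before the write

    def snapshot_baseline(self, path):
        self.baseline[path] = set(self.current.get(path, []))

    def delta(self, path):
        seen = self.baseline.get(path, set())
        return [d for d in self.current.get(path, []) if d not in seen]


baselines = set()  # presence set of (session_id, abs_path)
svc = FakeService()


def pre_tool_call(session_id, path):
    svc.snapshot_baseline(path)
    baselines.add((session_id, path))


def transform_tool_result(session_id, path):
    key = (session_id, path)
    if key not in baselines:
        return None  # no baseline captured: leave the result alone
    baselines.discard(key)
    return svc.delta(path) or None


svc.current["/w/a.py"] = ["unused import os"]
pre_tool_call("s1", "/w/a.py")
# Simulate the write introducing one new problem.
svc.current["/w/a.py"] = ["unused import os", "undefined name foo"]
new_issues = transform_tool_result("s1", "/w/a.py")
other_session = transform_tool_result("s2", "/w/a.py")
```

Only the issue introduced by the write is reported, and a second session touching the same path gets nothing because it never captured a baseline.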
23 changed files with 5727 additions and 0 deletions

230
plugins/lsp/__init__.py Normal file

@@ -0,0 +1,230 @@
"""LSP Plugin — semantic diagnostics from real language servers.
Hooks into write_file/patch via the Hermes plugin system to surface
type errors, undefined names, missing imports, and other semantic
issues detected by pyright, gopls, rust-analyzer, typescript-language-server,
and ~20 more.
Opt-in: add ``lsp`` to ``plugins.enabled`` in config.yaml.
"""
from __future__ import annotations
import atexit
import json
import logging
import os
import threading
from typing import Any
logger = logging.getLogger("plugins.lsp")
# Module-level state
_service: Any = None # LSPService | None
_service_lock = threading.Lock()
# Presence set: (session_id, abs_path) entries where a baseline was captured.
_baselines: set[tuple[str, str]] = set()
def register(ctx) -> None:
    """Plugin registration — wire hooks and CLI commands."""
    ctx.register_hook("on_session_end", _on_session_end)
    ctx.register_hook("pre_tool_call", _pre_tool_call)
    ctx.register_hook("transform_tool_result", _transform_tool_result)
    try:
        from plugins.lsp.cli import setup_lsp_parser, run_lsp_command
        ctx.register_cli_command(
            name="lsp",
            help="Language Server Protocol management",
            setup_fn=setup_lsp_parser,
            handler_fn=run_lsp_command,
        )
    except Exception as e:
        logger.debug("LSP CLI registration failed: %s", e)
    atexit.register(_on_session_end)
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def _on_session_end(**kwargs) -> None:
    """Tear down all language servers and clear baselines."""
    global _service
    with _service_lock:
        if _service is not None:
            try:
                _service.shutdown()
            except Exception as e:
                logger.debug("LSP shutdown error: %s", e)
            _service = None
    _baselines.clear()
# ---------------------------------------------------------------------------
# Tool hooks
# ---------------------------------------------------------------------------
def _pre_tool_call(**kwargs) -> None:
    """Snapshot LSP baseline before a file write."""
    tool_name = kwargs.get("tool_name", "")
    if tool_name not in ("write_file", "patch"):
        return
    svc = _ensure_service()
    if svc is None:
        return
    args = _parse_args(kwargs.get("args"))
    if args is None:
        return
    path = args.get("path", "")
    if not path:
        return
    abs_path = _resolve_path(path)
    # Best-effort local-only check: skip if parent dir doesn't exist on host
    if not os.path.exists(os.path.dirname(abs_path) or "."):
        return
    if not svc.enabled_for(abs_path):
        return
    session_id = kwargs.get("session_id") or ""
    key = (session_id, abs_path)
    try:
        svc.snapshot_baseline(abs_path)
        _baselines.add(key)
    except Exception as e:
        logger.debug("LSP baseline snapshot failed for %s: %s", abs_path, e)


def _transform_tool_result(**kwargs) -> str | None:
    """Inject LSP diagnostics into the tool result JSON.

    Returns modified result string with ``lsp_diagnostics`` field,
    or None to leave unchanged.
    """
    tool_name = kwargs.get("tool_name", "")
    if tool_name not in ("write_file", "patch"):
        return None
    svc = _service
    if svc is None or not svc.is_active():
        return None
    args = _parse_args(kwargs.get("args"))
    if args is None:
        return None
    path = args.get("path", "")
    if not path:
        return None
    abs_path = _resolve_path(path)
    session_id = kwargs.get("session_id") or ""
    key = (session_id, abs_path)
    if key not in _baselines:
        return None
    _baselines.discard(key)
    # Fetch diagnostics with short timeout
    try:
        diagnostics = svc.get_diagnostics_sync(abs_path, delta=True, timeout=3.0)
    except Exception as e:
        logger.debug("LSP diagnostics fetch failed for %s: %s", abs_path, e)
        return None
    if not diagnostics:
        return None
    # Format
    try:
        from plugins.lsp.reporter import report_for_file, truncate
        block = report_for_file(abs_path, diagnostics)
        if not block:
            return None
        lsp_output = truncate(block)
    except Exception:
        return None
    # Inject into result JSON (only when result is a JSON dict)
    result = kwargs.get("result")
    if not isinstance(result, str):
        return None
    try:
        result_data = json.loads(result)
        if not isinstance(result_data, dict):
            return None
        result_data["lsp_diagnostics"] = lsp_output
        return json.dumps(result_data, ensure_ascii=False)
    except (json.JSONDecodeError, TypeError, ValueError):
        return None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _ensure_service():
    """Lazy-initialize the LSP service singleton."""
    global _service
    svc = _service
    if svc is not None:
        return svc if svc.is_active() else None
    with _service_lock:
        if _service is not None:
            return _service if _service.is_active() else None
        try:
            from plugins.lsp.manager import LSPService
            _service = LSPService.create_from_config()
        except Exception as e:
            logger.debug("LSP service creation failed: %s", e)
            return None
        return _service if (_service and _service.is_active()) else None


def _parse_args(args) -> dict[str, Any] | None:
    """Normalize args (may be dict or JSON string)."""
    if isinstance(args, dict):
        return args
    if isinstance(args, str):
        try:
            parsed = json.loads(args)
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, TypeError):
            pass
    return None


def _resolve_path(path: str) -> str:
    """Expand and absolutify a path."""
    expanded = os.path.expanduser(path)
    if not os.path.isabs(expanded):
        expanded = os.path.join(os.getcwd(), expanded)
    return os.path.normpath(expanded)
# ---------------------------------------------------------------------------
# Public API (used by plugins/lsp/cli.py)
# ---------------------------------------------------------------------------
def get_service():
    """Return the active LSP service or None."""
    svc = _service
    return svc if (svc is not None and svc.is_active()) else None


def shutdown_service() -> None:
    """Tear down the LSP service (idempotent)."""
    _on_session_end()

313
plugins/lsp/cli.py Normal file

@@ -0,0 +1,313 @@
"""``hermes lsp`` CLI subcommand.
Subcommands:
- ``status`` — show service state, configured servers, install status.
- ``install <server_id>`` — eagerly install one server's binary.
- ``install-all`` — try to install every server with a known recipe.
- ``restart`` — tear down running clients so the next edit re-spawns.
- ``which <server_id>`` — print the resolved binary path for one server.
- ``list`` — print the registry of supported servers.
The handlers are kept here (rather than in
``hermes_cli/main.py``) so the LSP module ships self-contained.
"""
from __future__ import annotations
import argparse
import sys
from typing import Optional
def register_subparser(subparsers: argparse._SubParsersAction) -> None:
"""Wire the ``hermes lsp`` subcommand tree into the main argparse."""
parser = subparsers.add_parser(
"lsp",
help="Language Server Protocol management",
description=(
"Manage the LSP layer that powers post-write semantic "
"diagnostics in write_file/patch."
),
)
# Subcommand wiring is shared with the plugin pathway (setup_lsp_parser).
setup_lsp_parser(parser)
parser.set_defaults(func=run_lsp_command)
def setup_lsp_parser(parser: argparse.ArgumentParser) -> None:
"""Set up subcommands on an already-created 'lsp' parser.
Called by the plugin system's register_cli_command pathway, where
main.py creates the top-level ``hermes lsp`` parser and passes it
to us for subcommand wiring.
"""
sub = parser.add_subparsers(dest="lsp_command")
sub_status = sub.add_parser("status", help="Show LSP service status")
sub_status.add_argument(
"--json", action="store_true", help="Emit machine-readable JSON"
)
sub_list = sub.add_parser("list", help="List supported language servers")
sub_list.add_argument(
"--installed-only",
action="store_true",
help="Only show servers whose binary is currently available",
)
sub_install = sub.add_parser("install", help="Install a server binary")
sub_install.add_argument("server", help="Server id (e.g. pyright, gopls)")
sub_install_all = sub.add_parser(
"install-all",
help="Install every server with a known auto-install recipe",
)
sub_install_all.add_argument(
"--include-manual",
action="store_true",
help="Even attempt servers marked manual-install (best effort)",
)
sub_restart = sub.add_parser(
"restart",
help="Tear down running LSP clients (next edit re-spawns)",
)
sub_which = sub.add_parser("which", help="Print binary path for a server")
sub_which.add_argument("server", help="Server id")
def run_lsp_command(args: argparse.Namespace) -> int:
"""Top-level dispatcher for ``hermes lsp <subcommand>``."""
sub = getattr(args, "lsp_command", None) or "status"
try:
if sub == "status":
return _cmd_status(getattr(args, "json", False))
if sub == "list":
return _cmd_list(getattr(args, "installed_only", False))
if sub == "install":
return _cmd_install(args.server)
if sub == "install-all":
return _cmd_install_all(getattr(args, "include_manual", False))
if sub == "restart":
return _cmd_restart()
if sub == "which":
return _cmd_which(args.server)
sys.stderr.write(f"unknown lsp subcommand: {sub}\n")
return 2
except KeyboardInterrupt:
return 130
def _cmd_status(emit_json: bool) -> int:
from plugins.lsp import get_service
from plugins.lsp.servers import SERVERS
from plugins.lsp.install import detect_status
svc = get_service()
service_active = svc is not None
info = svc.get_status() if svc is not None else {"enabled": False}
if emit_json:
import json
payload = {
"service": info,
"registry": [
{
"server_id": s.server_id,
"extensions": list(s.extensions),
"description": s.description,
"binary_status": detect_status(_recipe_pkg_for(s.server_id)),
}
for s in SERVERS
],
}
sys.stdout.write(json.dumps(payload, indent=2) + "\n")
return 0
out = []
out.append("LSP Service")
out.append("===========")
out.append(f" enabled: {info.get('enabled', False)}")
if service_active:
out.append(f" wait_mode: {info.get('wait_mode')}")
out.append(f" wait_timeout: {info.get('wait_timeout')}s")
out.append(f" install_strategy:{info.get('install_strategy')}")
clients = info.get("clients") or []
if clients:
out.append(f" active clients: {len(clients)}")
for c in clients:
out.append(
f" - {c['server_id']:20s} state={c['state']:10s} root={c['workspace_root']}"
)
else:
out.append(" active clients: none")
broken = info.get("broken") or []
if broken:
out.append(f" broken pairs: {len(broken)}")
for b in broken:
out.append(f" - {b}")
disabled = info.get("disabled_servers") or []
if disabled:
out.append(f" disabled in cfg: {', '.join(disabled)}")
out.append("")
out.append("Registered Servers")
out.append("==================")
for s in SERVERS:
pkg = _recipe_pkg_for(s.server_id)
status = detect_status(pkg)
marker = {
"installed": "",
"missing": "·",
"manual-only": "?",
}.get(status, " ")
ext_summary = ", ".join(list(s.extensions)[:5])
if len(s.extensions) > 5:
ext_summary += f", … (+{len(s.extensions) - 5})"
out.append(
f" {marker} {s.server_id:24s} [{status:11s}] {ext_summary}"
)
if s.description:
out.append(f" {s.description}")
sys.stdout.write("\n".join(out) + "\n")
return 0
def _cmd_list(installed_only: bool) -> int:
from plugins.lsp.servers import SERVERS
from plugins.lsp.install import detect_status
for s in SERVERS:
pkg = _recipe_pkg_for(s.server_id)
status = detect_status(pkg)
if installed_only and status != "installed":
continue
sys.stdout.write(
f"{s.server_id:24s} [{status:11s}] {','.join(s.extensions)}\n"
)
return 0
def _cmd_install(server_id: str) -> int:
from plugins.lsp.install import try_install, INSTALL_RECIPES, detect_status
pkg = _recipe_pkg_for(server_id)
pre_status = detect_status(pkg)
if pre_status == "installed":
sys.stdout.write(f"{server_id} already installed\n")
return 0
sys.stdout.write(f"installing {server_id} (pkg={pkg}) ...\n")
sys.stdout.flush()
bin_path = try_install(pkg, "auto")
if bin_path is None:
recipe = INSTALL_RECIPES.get(pkg)
if recipe and recipe.get("strategy") == "manual":
sys.stderr.write(
f"{server_id}: this server requires a manual install. "
f"See documentation.\n"
)
else:
sys.stderr.write(f"{server_id}: install failed (see logs).\n")
return 1
sys.stdout.write(f"installed: {bin_path}\n")
return 0
def _cmd_install_all(include_manual: bool) -> int:
from plugins.lsp.servers import SERVERS
from plugins.lsp.install import try_install, INSTALL_RECIPES, detect_status
rc = 0
for s in SERVERS:
pkg = _recipe_pkg_for(s.server_id)
recipe = INSTALL_RECIPES.get(pkg)
if recipe is None:
continue
if recipe.get("strategy") == "manual" and not include_manual:
continue
if detect_status(pkg) == "installed":
sys.stdout.write(f" {s.server_id:24s} already installed\n")
continue
sys.stdout.write(f" installing {s.server_id} (pkg={pkg}) ... ")
sys.stdout.flush()
path = try_install(pkg, "auto")
if path:
sys.stdout.write(f"ok ({path})\n")
else:
sys.stdout.write("FAILED\n")
rc = 1
return rc
def _cmd_restart() -> int:
from plugins.lsp import shutdown_service
shutdown_service()
sys.stdout.write("LSP service shut down. Next edit will respawn clients.\n")
return 0
def _cmd_which(server_id: str) -> int:
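from plugins.lsp.install import INSTALL_RECIPES, hermes_lsp_bin_dir
import shutil as _shutil
# Resolve aliases first so ids such as ``typescript`` find their recipe,
# matching the lookup used by the install and status commands.
recipe = INSTALL_RECIPES.get(_recipe_pkg_for(server_id))
bin_name = (recipe or {}).get("bin", server_id)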
staged = hermes_lsp_bin_dir() / bin_name
if staged.exists():
sys.stdout.write(str(staged) + "\n")
return 0
on_path = _shutil.which(bin_name)
if on_path:
sys.stdout.write(on_path + "\n")
return 0
sys.stderr.write(f"{server_id}: not installed\n")
return 1
def _recipe_pkg_for(server_id: str) -> str:
"""Map a registry ``server_id`` to its install-recipe package key."""
# The mapping lives here (not in install.py) because it's a CLI
# convenience layer. Most server_ids are also their own recipe
# key, but a few differ (e.g. ``vue-language-server`` →
# ``@vue/language-server``).
aliases = {
"vue-language-server": "@vue/language-server",
"astro-language-server": "@astrojs/language-server",
"dockerfile-ls": "dockerfile-language-server-nodejs",
"typescript": "typescript-language-server",
}
return aliases.get(server_id, server_id)
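Review note: the exit-code contract of `run_lsp_command` (default to `status`, return 2 for an unknown subcommand, 130 on Ctrl-C) could also be expressed with a dispatch table. This is an alternative sketch, not the code in this PR; `HANDLERS`, `dispatch`, and the two handlers are hypothetical.

```python
import sys


def cmd_status() -> int:
    """Hypothetical handler standing in for _cmd_status."""
    return 0


def cmd_restart() -> int:
    """Hypothetical handler standing in for _cmd_restart."""
    return 0


HANDLERS = {"status": cmd_status, "restart": cmd_restart}


def dispatch(subcommand):
    """Run the handler for ``subcommand`` and return its exit code."""
    name = subcommand or "status"  # bare invocation falls back to status
    handler = HANDLERS.get(name)
    if handler is None:
        sys.stderr.write(f"unknown lsp subcommand: {name}\n")
        return 2
    try:
        return handler()
    except KeyboardInterrupt:
        return 130  # conventional exit code for SIGINT
```

A table keeps the subcommand list in one place, though handlers that take different arguments (as the real ones do) would need small adapter functions.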

930
plugins/lsp/client.py Normal file

@@ -0,0 +1,930 @@
"""Async LSP client over stdin/stdout.
One :class:`LSPClient` corresponds to one ``(language_server, workspace_root)``
pair — exactly what OpenCode keys clients on, and the same shape Claude
Code uses. The client owns a child process, drives the JSON-RPC
exchange, and exposes:
- :meth:`open_file` / :meth:`change_file` — text document sync
- :meth:`wait_for_diagnostics` — block until the server emits fresh
diagnostics for a specific file (or a timeout fires)
- :meth:`diagnostics_for` — read the current per-file diagnostic store
- :meth:`shutdown` — graceful close + SIGTERM/SIGKILL fallback
The class is designed for async use from a single asyncio event loop.
The :class:`plugins.lsp.manager.LSPService` runs an event loop in a
background thread so the synchronous file_operations layer can call
into it via :func:`plugins.lsp.manager.LSPService.touch_file`.
Implementation notes:
- Push diagnostics are stored per-URI in :attr:`_push_diagnostics` from
``textDocument/publishDiagnostics`` notifications. Pull diagnostics
go in :attr:`_pull_diagnostics`. The merged view dedupes by content.
- Whole-document sync. Even when the server advertises incremental
sync, we send a single ``contentChanges`` entry replacing the
entire document. Pretending to be incremental while sending a
full replacement is well-tolerated by every major server and saves
range bookkeeping. See OpenCode's ``client.ts:584-659`` for the
same trick.
- The "touch-file dance": every ``open_file`` call also fires a
``workspace/didChangeWatchedFiles`` notification (CREATED on the
first open, CHANGED thereafter). Some servers (clangd, eslint)
only re-scan when this notification fires, even though the LSP spec
doesn't strictly require it.
- ``ContentModified`` (-32801) errors get retried with exponential
backoff up to 3 times. This matches Claude Code's
``LSPServerInstance.sendRequest``.
"""
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set
from urllib.parse import quote, unquote
from plugins.lsp.protocol import (
ERROR_CONTENT_MODIFIED,
ERROR_METHOD_NOT_FOUND,
LSPProtocolError,
LSPRequestError,
classify_message,
encode_message,
make_error_response,
make_notification,
make_request,
make_response,
read_message,
)
logger = logging.getLogger("agent.lsp.client")
# Timeouts (seconds) — mirror OpenCode's constants, scaled to seconds.
INITIALIZE_TIMEOUT = 45.0
DIAGNOSTICS_DOCUMENT_WAIT = 5.0
DIAGNOSTICS_FULL_WAIT = 10.0
DIAGNOSTICS_REQUEST_TIMEOUT = 3.0
PUSH_DEBOUNCE = 0.15
SHUTDOWN_GRACE = 1.0 # seconds between SIGTERM and SIGKILL
# Retry policy for transient ContentModified errors.
MAX_CONTENT_MODIFIED_RETRIES = 3
RETRY_BASE_DELAY = 0.5 # 0.5, 1.0, 2.0 — exponential
def file_uri(path: str) -> str:
"""Return ``file://`` URI for an absolute filesystem path.
Mirrors Node's ``pathToFileURL`` — handles spaces, unicode, and
Windows drive letters (``C:\\foo`` → ``file:///C:/foo``).
"""
abs_path = os.path.abspath(path)
if os.name == "nt":
# Windows: backslash → forward slash, prepend extra slash so
# the drive letter shows up as part of the path component.
abs_path = abs_path.replace("\\", "/")
if not abs_path.startswith("/"):
abs_path = "/" + abs_path
return "file://" + quote(abs_path, safe="/:")
def uri_to_path(uri: str) -> str:
"""Inverse of :func:`file_uri`."""
if not uri.startswith("file://"):
return uri
raw = uri[len("file://"):]
if os.name == "nt" and raw.startswith("/") and len(raw) > 2 and raw[2] == ":":
raw = raw[1:] # strip leading slash before drive letter
return os.path.normpath(unquote(raw))
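Review note: a POSIX-only sketch of the path/URI round trip that `file_uri` and `uri_to_path` perform. The function names here are hypothetical and the Windows drive-letter handling is deliberately left out.

```python
from urllib.parse import quote, unquote


def posix_file_uri(abs_path: str) -> str:
    """Percent-encode an absolute POSIX path into a file:// URI."""
    return "file://" + quote(abs_path, safe="/:")


def posix_uri_to_path(uri: str) -> str:
    """Decode a file:// URI back to a path; other strings pass through."""
    if not uri.startswith("file://"):
        return uri
    return unquote(uri[len("file://"):])


uri = posix_file_uri("/tmp/my project/é.py")
# uri == "file:///tmp/my%20project/%C3%A9.py"
path = posix_uri_to_path(uri)
```

Because diagnostics are stored by path while servers report them by URI, the two functions need to be exact inverses for any path the client opens.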
def _end_position(text: str) -> Dict[str, int]:
"""Return the LSP Position at the end of ``text``.
Used to construct a single-range "replace whole document" change
for ``textDocument/didChange`` regardless of the server's declared
sync mode.
"""
if not text:
return {"line": 0, "character": 0}
lines = text.splitlines(keepends=False)
last_line = len(lines) - 1
last_col = len(lines[-1]) if lines else 0
# If the text ends with a trailing newline, ``splitlines`` won't
# represent it. The end position is then the start of the next
# (empty) line — line index is len(lines), column 0.
if text.endswith(("\n", "\r")):
return {"line": last_line + 1, "character": 0}
return {"line": last_line, "character": last_col}
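Review note: one plausible way the end position feeds a whole-document `contentChanges` entry. `full_replace_change` is a hypothetical helper, and the sketch assumes the range is computed from the previous document text; `len()` counts code points, so characters outside the Basic Multilingual Plane would need UTF-16 length handling in a real client.

```python
def end_position(text: str) -> dict:
    """LSP Position just past the last character of ``text``."""
    if not text:
        return {"line": 0, "character": 0}
    lines = text.splitlines()
    if text.endswith(("\n", "\r")):
        # A trailing newline puts the cursor at the start of the next line.
        return {"line": len(lines), "character": 0}
    return {"line": len(lines) - 1, "character": len(lines[-1])}


def full_replace_change(old_text: str, new_text: str) -> dict:
    """Build one change event that replaces the entire old document."""
    return {
        "range": {
            "start": {"line": 0, "character": 0},
            "end": end_position(old_text),
        },
        "text": new_text,
    }


change = full_replace_change("x = 1\ny = 2\n", "z = 3\n")
```

Sending a single range that spans the old document lets the client satisfy servers that advertise incremental sync without tracking individual edits.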
class LSPClient:
"""Async LSP client tied to one server process and one workspace root.
Lifecycle:
c = LSPClient(server_id=..., workspace_root=..., command=[...], initialization_options={...})
await c.start() # spawn + initialize
ver = await c.open_file("/path/to/foo.py")
await c.wait_for_diagnostics("/path/to/foo.py", ver)
diags = c.diagnostics_for("/path/to/foo.py")
await c.shutdown()
"""
# ------------------------------------------------------------------
# construction + lifecycle
# ------------------------------------------------------------------
def __init__(
self,
*,
server_id: str,
workspace_root: str,
command: List[str],
env: Optional[Dict[str, str]] = None,
cwd: Optional[str] = None,
initialization_options: Optional[Dict[str, Any]] = None,
seed_diagnostics_on_first_push: bool = False,
) -> None:
self.server_id = server_id
self.workspace_root = workspace_root
self._command = list(command)
self._env = env
self._cwd = cwd or workspace_root
self._init_options = initialization_options or {}
self._seed_first_push = seed_diagnostics_on_first_push
# Process + streams
self._proc: Optional[asyncio.subprocess.Process] = None
self._stderr_task: Optional[asyncio.Task] = None
self._reader_task: Optional[asyncio.Task] = None
# Request/response correlation
self._next_id: int = 0
self._pending: Dict[int, asyncio.Future] = {}
# Server-side request handlers (server → client requests).
# Kept small and explicit; everything else returns method-not-found.
self._request_handlers: Dict[str, Callable[[Any], Awaitable[Any]]] = {
"window/workDoneProgress/create": self._handle_work_done_create,
"workspace/configuration": self._handle_workspace_configuration,
"client/registerCapability": self._handle_register_capability,
"client/unregisterCapability": self._handle_unregister_capability,
"workspace/workspaceFolders": self._handle_workspace_folders,
"workspace/diagnostic/refresh": self._handle_diagnostic_refresh,
}
# Notifications (server → client) we care about.
self._notification_handlers: Dict[str, Callable[[Any], None]] = {
"textDocument/publishDiagnostics": self._handle_publish_diagnostics,
# Everything else (window/showMessage, $/progress, etc.)
# is silently dropped by default.
}
# Tracked file state — required for didChange version bumps.
self._files: Dict[str, Dict[str, Any]] = {}
# Diagnostic stores, keyed by file path (NOT URI).
self._push_diagnostics: Dict[str, List[Dict[str, Any]]] = {}
self._pull_diagnostics: Dict[str, List[Dict[str, Any]]] = {}
# Per-path "last published" time so wait-for-fresh logic works.
self._published: Dict[str, float] = {}
# Per-path version of the latest push (matches our didChange
# version when the server respects it).
self._published_version: Dict[str, int] = {}
# First-push seen flag, for typescript-style seed-on-first-push.
self._first_push_seen: Set[str] = set()
# Capability registrations — only diagnostic ones are tracked.
self._diagnostic_registrations: Dict[str, Dict[str, Any]] = {}
# State machine
self._state: str = "stopped"
self._initialize_result: Optional[Dict[str, Any]] = None
self._sync_kind: int = 1 # 1=Full, 2=Incremental
self._stopping: bool = False
# Push event for waiters.
self._push_event = asyncio.Event()
# Monotonic counter incremented on every publishDiagnostics push.
# Waiters snapshot it on entry and treat any increase as
# "something happened, recheck the predicate". Avoids the
# asyncio.Event sticky-state trap.
self._push_counter = 0
# Registration change event so wait_for_diagnostics can re-loop
# when the server announces a new dynamic provider.
self._registration_event = asyncio.Event()
@property
def is_running(self) -> bool:
return self._state == "running" and self._proc is not None and self._proc.returncode is None
@property
def state(self) -> str:
return self._state
async def start(self) -> None:
"""Spawn the server and complete the initialize handshake.
Raises any exception encountered during spawn/init. On failure
the process is killed and the client is left in state
``"error"`` — re-call ``start()`` to retry.
"""
if self._state in ("running", "starting"):
return
self._state = "starting"
try:
await self._spawn()
await self._initialize()
self._state = "running"
except Exception:
self._state = "error"
await self._cleanup_process()
raise
async def _spawn(self) -> None:
env = dict(os.environ)
if self._env:
env.update(self._env)
try:
self._proc = await asyncio.create_subprocess_exec(
self._command[0],
*self._command[1:],
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=self._cwd,
)
except FileNotFoundError as e:
raise LSPProtocolError(
f"LSP server binary not found: {self._command[0]} ({e})"
) from e
# Drain stderr at debug level — if we don't, the pipe buffer
# fills and the server hangs.
self._stderr_task = asyncio.create_task(self._drain_stderr())
# Start the reader loop.
self._reader_task = asyncio.create_task(self._reader_loop())
async def _drain_stderr(self) -> None:
if self._proc is None or self._proc.stderr is None:
return
try:
while True:
line = await self._proc.stderr.readline()
if not line:
break
text = line.decode("utf-8", errors="replace").rstrip()
if text:
logger.debug("[%s] stderr: %s", self.server_id, text[:1000])
except (asyncio.CancelledError, OSError):
pass
async def _reader_loop(self) -> None:
if self._proc is None or self._proc.stdout is None:
return
try:
while True:
msg = await read_message(self._proc.stdout)
if msg is None:
logger.debug("[%s] server closed stdout cleanly", self.server_id)
break
kind, key = classify_message(msg)
if kind == "response":
self._dispatch_response(key, msg)
elif kind == "request":
asyncio.create_task(self._dispatch_request(key, msg))
elif kind == "notification":
self._dispatch_notification(key, msg)
else:
logger.warning("[%s] dropping invalid message: %r", self.server_id, msg)
except LSPProtocolError as e:
logger.warning("[%s] protocol error in reader loop: %s", self.server_id, e)
except (asyncio.CancelledError, OSError):
pass
finally:
# Wake up any pending requests so they can fail fast.
for fut in list(self._pending.values()):
if not fut.done():
fut.set_exception(LSPProtocolError("server connection closed"))
self._pending.clear()
async def _initialize(self) -> None:
params = {
"rootUri": file_uri(self.workspace_root),
"rootPath": self.workspace_root,
"processId": os.getpid(),
"workspaceFolders": [
{"name": "workspace", "uri": file_uri(self.workspace_root)}
],
"initializationOptions": self._init_options,
"capabilities": {
"window": {"workDoneProgress": True},
"workspace": {
"configuration": True,
"workspaceFolders": True,
"didChangeWatchedFiles": {"dynamicRegistration": True},
"diagnostics": {"refreshSupport": False},
},
"textDocument": {
"synchronization": {
"dynamicRegistration": False,
"didOpen": True,
"didChange": True,
"didSave": True,
"willSave": False,
"willSaveWaitUntil": False,
},
"diagnostic": {
"dynamicRegistration": True,
"relatedDocumentSupport": True,
},
"publishDiagnostics": {
"relatedInformation": True,
"tagSupport": {"valueSet": [1, 2]},
"versionSupport": True,
"codeDescriptionSupport": True,
"dataSupport": False,
},
"hover": {"contentFormat": ["markdown", "plaintext"]},
"definition": {"linkSupport": True},
"references": {},
"documentSymbol": {"hierarchicalDocumentSymbolSupport": True},
},
"general": {"positionEncodings": ["utf-16"]},
},
}
result = await asyncio.wait_for(
self._send_request("initialize", params),
timeout=INITIALIZE_TIMEOUT,
)
self._initialize_result = result
self._sync_kind = self._extract_sync_kind(result.get("capabilities") or {})
await self._send_notification("initialized", {})
if self._init_options:
# Some servers (vtsls, eslint) want config pushed via
# didChangeConfiguration even if it was sent in
# initializationOptions.
await self._send_notification(
"workspace/didChangeConfiguration",
{"settings": self._init_options},
)
@staticmethod
def _extract_sync_kind(capabilities: dict) -> int:
sync = capabilities.get("textDocumentSync")
if isinstance(sync, int):
return sync
if isinstance(sync, dict):
change = sync.get("change")
if isinstance(change, int):
return change
return 1 # default to Full
async def shutdown(self) -> None:
"""Best-effort graceful shutdown.
Sends ``shutdown`` + ``exit``, then SIGTERMs/SIGKILLs the
process if it doesn't exit cleanly. Idempotent.
"""
if self._stopping:
return
self._stopping = True
try:
if self.is_running:
try:
await asyncio.wait_for(self._send_request("shutdown", None), timeout=2.0)
except (asyncio.TimeoutError, LSPRequestError, LSPProtocolError):
pass
try:
await self._send_notification("exit", None)
except Exception:
pass
finally:
self._state = "stopped"
await self._cleanup_process()
async def _cleanup_process(self) -> None:
if self._reader_task is not None and not self._reader_task.done():
self._reader_task.cancel()
try:
await self._reader_task
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
if self._stderr_task is not None and not self._stderr_task.done():
self._stderr_task.cancel()
try:
await self._stderr_task
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
proc = self._proc
self._proc = None
if proc is None:
return
if proc.returncode is None:
try:
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=SHUTDOWN_GRACE)
except asyncio.TimeoutError:
try:
proc.kill()
await proc.wait()
except ProcessLookupError:
pass
except ProcessLookupError:
pass
# ------------------------------------------------------------------
# request / notification plumbing
# ------------------------------------------------------------------
async def _send_request(self, method: str, params: Any) -> Any:
if self._proc is None or self._proc.stdin is None or self._proc.stdin.is_closing():
raise LSPProtocolError(f"cannot send {method!r}: stdin closed")
loop = asyncio.get_running_loop()
req_id = self._next_id
self._next_id += 1
fut: asyncio.Future = loop.create_future()
self._pending[req_id] = fut
try:
self._proc.stdin.write(encode_message(make_request(req_id, method, params)))
await self._proc.stdin.drain()
except (BrokenPipeError, ConnectionResetError, OSError) as e:
self._pending.pop(req_id, None)
raise LSPProtocolError(f"send failed for {method!r}: {e}") from e
try:
return await fut
finally:
self._pending.pop(req_id, None)
async def _send_request_with_retry(self, method: str, params: Any, *, timeout: float) -> Any:
"""Send a request, retrying on ``ContentModified`` (-32801).
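Other errors propagate. The retry policy matches Claude Code's
``LSPServerInstance.sendRequest``: up to
``MAX_CONTENT_MODIFIED_RETRIES`` retries after the first attempt,
with exponential backoff starting at ``RETRY_BASE_DELAY``
(documented as 0.5s, 1.0s, 2.0s).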
"""
for attempt in range(MAX_CONTENT_MODIFIED_RETRIES + 1):
try:
return await asyncio.wait_for(self._send_request(method, params), timeout=timeout)
except LSPRequestError as e:
if e.code == ERROR_CONTENT_MODIFIED and attempt < MAX_CONTENT_MODIFIED_RETRIES:
await asyncio.sleep(RETRY_BASE_DELAY * (2 ** attempt))
continue
raise
async def _send_notification(self, method: str, params: Any) -> None:
if self._proc is None or self._proc.stdin is None or self._proc.stdin.is_closing():
return
try:
self._proc.stdin.write(encode_message(make_notification(method, params)))
await self._proc.stdin.drain()
except (BrokenPipeError, ConnectionResetError, OSError) as e:
logger.debug("[%s] notify %s failed: %s", self.server_id, method, e)
async def _send_response(self, req_id: Any, result: Any) -> None:
if self._proc is None or self._proc.stdin is None or self._proc.stdin.is_closing():
return
try:
self._proc.stdin.write(encode_message(make_response(req_id, result)))
await self._proc.stdin.drain()
except (BrokenPipeError, ConnectionResetError, OSError):
pass
async def _send_error_response(self, req_id: Any, code: int, message: str) -> None:
if self._proc is None or self._proc.stdin is None or self._proc.stdin.is_closing():
return
try:
self._proc.stdin.write(encode_message(make_error_response(req_id, code, message)))
await self._proc.stdin.drain()
except (BrokenPipeError, ConnectionResetError, OSError):
pass
def _dispatch_response(self, req_id: int, msg: dict) -> None:
fut = self._pending.get(req_id)
if fut is None or fut.done():
return
if "error" in msg:
err = msg["error"] or {}
fut.set_exception(
LSPRequestError(
code=int(err.get("code", -32000)),
message=str(err.get("message", "unknown")),
data=err.get("data"),
)
)
else:
fut.set_result(msg.get("result"))
async def _dispatch_request(self, req_id: Any, msg: dict) -> None:
method = msg.get("method", "")
params = msg.get("params")
handler = self._request_handlers.get(method)
if handler is None:
await self._send_error_response(req_id, ERROR_METHOD_NOT_FOUND, f"method not found: {method}")
return
try:
result = await handler(params)
except Exception as e: # noqa: BLE001 — protocol must not blow up
logger.warning("[%s] request handler %s failed: %s", self.server_id, method, e)
await self._send_error_response(req_id, -32000, f"handler failed: {e}")
return
await self._send_response(req_id, result)
def _dispatch_notification(self, method: str, msg: dict) -> None:
handler = self._notification_handlers.get(method)
if handler is None:
return
try:
handler(msg.get("params"))
except Exception as e: # noqa: BLE001
logger.debug("[%s] notification handler %s failed: %s", self.server_id, method, e)
# ------------------------------------------------------------------
# built-in server→client request handlers
# ------------------------------------------------------------------
async def _handle_work_done_create(self, params: Any) -> Any:
# Acknowledge progress tokens — required by some servers.
return None
async def _handle_workspace_configuration(self, params: Any) -> Any:
# Walk dotted sections through initializationOptions. Mirrors
# OpenCode's `client.ts:198-220` — return null when missing.
if not isinstance(params, dict):
return [None]
items = params.get("items") or []
out: List[Any] = []
for item in items:
if not isinstance(item, dict):
out.append(None)
continue
section = item.get("section")
if not section or not self._init_options:
out.append(self._init_options or None)
continue
cur: Any = self._init_options
for part in str(section).split("."):
if isinstance(cur, dict) and part in cur:
cur = cur[part]
else:
cur = None
break
out.append(cur)
return out
async def _handle_register_capability(self, params: Any) -> Any:
if not isinstance(params, dict):
return None
for reg in params.get("registrations") or []:
if not isinstance(reg, dict):
continue
method = reg.get("method")
reg_id = reg.get("id")
if method == "textDocument/diagnostic" and reg_id:
self._diagnostic_registrations[str(reg_id)] = reg
self._registration_event.set()
return None
async def _handle_unregister_capability(self, params: Any) -> Any:
if not isinstance(params, dict):
return None
for unreg in params.get("unregisterations") or []:
if not isinstance(unreg, dict):
continue
reg_id = unreg.get("id")
if reg_id:
self._diagnostic_registrations.pop(str(reg_id), None)
return None
async def _handle_workspace_folders(self, params: Any) -> Any:
return [{"name": "workspace", "uri": file_uri(self.workspace_root)}]
async def _handle_diagnostic_refresh(self, params: Any) -> Any:
# We don't honour refresh — we re-pull on every touchFile.
return None
# ------------------------------------------------------------------
# publishDiagnostics handler
# ------------------------------------------------------------------
def _handle_publish_diagnostics(self, params: Any) -> None:
if not isinstance(params, dict):
return
uri = params.get("uri")
if not isinstance(uri, str):
return
path = uri_to_path(uri)
diagnostics = params.get("diagnostics") or []
if not isinstance(diagnostics, list):
diagnostics = []
version = params.get("version")
loop_time = asyncio.get_event_loop().time()
if self._seed_first_push and path not in self._first_push_seen:
# First push: seed without firing the event so a waiter
# doesn't resolve on the very first push (which arrives
# before the user-triggered didChange could've produced
# fresh diagnostics).
self._first_push_seen.add(path)
self._push_diagnostics[path] = diagnostics
self._published[path] = loop_time
if isinstance(version, int):
self._published_version[path] = version
return
self._push_diagnostics[path] = diagnostics
self._published[path] = loop_time
if isinstance(version, int):
self._published_version[path] = version
self._first_push_seen.add(path)
# Bump the monotonic push counter and wake every waiter. We
# keep the Event sticky-set so any wait already in progress
# resolves; waiters re-check their predicate after waking and
# decide whether to keep waiting. ``_push_counter`` is what
# they actually compare against to detect a fresh event.
self._push_counter += 1
self._push_event.set()
# ------------------------------------------------------------------
# public file-sync API
# ------------------------------------------------------------------
async def open_file(self, path: str, *, language_id: str = "plaintext") -> int:
"""Send didOpen (first time) or didChange (subsequent) for ``path``.
Returns the new document version number that the agent's
``wait_for_diagnostics`` should match against.
"""
if not self.is_running:
raise LSPProtocolError("client not running")
abs_path = os.path.abspath(path)
try:
text = Path(abs_path).read_text(encoding="utf-8", errors="replace")
except OSError as e:
raise LSPProtocolError(f"cannot read {abs_path}: {e}") from e
uri = file_uri(abs_path)
existing = self._files.get(abs_path)
if existing is not None:
# Re-open: bump version, fire didChangeWatchedFiles + didChange.
await self._send_notification(
"workspace/didChangeWatchedFiles",
{"changes": [{"uri": uri, "type": 2}]}, # 2 = CHANGED
)
new_version = existing["version"] + 1
old_text = existing["text"]
content_changes: List[Dict[str, Any]]
if self._sync_kind == 2:
content_changes = [
{
"range": {
"start": {"line": 0, "character": 0},
"end": _end_position(old_text),
},
"text": text,
}
]
else:
content_changes = [{"text": text}]
await self._send_notification(
"textDocument/didChange",
{
"textDocument": {"uri": uri, "version": new_version},
"contentChanges": content_changes,
},
)
self._files[abs_path] = {"version": new_version, "text": text}
return new_version
# First open: didChangeWatchedFiles CREATED + didOpen.
await self._send_notification(
"workspace/didChangeWatchedFiles",
{"changes": [{"uri": uri, "type": 1}]}, # 1 = CREATED
)
# Clear any stale push/pull entries — fresh open should start
# from scratch.
self._push_diagnostics.pop(abs_path, None)
self._pull_diagnostics.pop(abs_path, None)
self._published.pop(abs_path, None)
self._published_version.pop(abs_path, None)
await self._send_notification(
"textDocument/didOpen",
{
"textDocument": {
"uri": uri,
"languageId": language_id,
"version": 0,
"text": text,
}
},
)
self._files[abs_path] = {"version": 0, "text": text}
return 0
async def save_file(self, path: str) -> None:
"""Send didSave for ``path``. Some linters re-scan only on save."""
if not self.is_running:
return
abs_path = os.path.abspath(path)
await self._send_notification(
"textDocument/didSave",
{"textDocument": {"uri": file_uri(abs_path)}},
)
# ------------------------------------------------------------------
# diagnostics: pull + wait
# ------------------------------------------------------------------
async def _pull_document_diagnostics(self, path: str) -> None:
"""Send ``textDocument/diagnostic`` for one file.
Stores results into :attr:`_pull_diagnostics`. Silently
no-ops on errors (server may not support the pull endpoint).
"""
try:
params: Dict[str, Any] = {
"textDocument": {"uri": file_uri(os.path.abspath(path))}
}
result = await self._send_request_with_retry(
"textDocument/diagnostic",
params,
timeout=DIAGNOSTICS_REQUEST_TIMEOUT,
)
except (LSPRequestError, LSPProtocolError, asyncio.TimeoutError) as e:
logger.debug("[%s] document diagnostic pull failed: %s", self.server_id, e)
return
if not isinstance(result, dict):
return
items = result.get("items")
if isinstance(items, list):
self._pull_diagnostics[os.path.abspath(path)] = items
related = result.get("relatedDocuments")
if isinstance(related, dict):
for uri, sub in related.items():
if not isinstance(sub, dict):
continue
sub_items = sub.get("items")
if isinstance(sub_items, list):
self._pull_diagnostics[uri_to_path(uri)] = sub_items
async def wait_for_diagnostics(
self,
path: str,
version: int,
*,
mode: str = "document",
) -> None:
"""Wait for the server to publish diagnostics for ``path`` at ``version``.
``mode`` is ``"document"`` (5s budget) or ``"full"`` (10s budget).
In this method the mode only selects the budget; both modes issue
document pulls alongside the push wait. Best-effort: returns
silently on timeout. Does NOT raise if the server doesn't support
pull diagnostics; we still get the push side.
"""
budget = DIAGNOSTICS_FULL_WAIT if mode == "full" else DIAGNOSTICS_DOCUMENT_WAIT
deadline = asyncio.get_event_loop().time() + budget
abs_path = os.path.abspath(path)
while True:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
return
# Concurrent: document pull + push wait.
pull_task = asyncio.create_task(self._pull_document_diagnostics(abs_path))
push_task = asyncio.create_task(self._wait_for_fresh_push(abs_path, version, remaining))
done, pending = await asyncio.wait(
{pull_task, push_task},
timeout=remaining,
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
for t in pending:
try:
await t
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
# If we got a fresh push for our version, we're done.
current_v = self._published_version.get(abs_path)
if abs_path in self._published and (
current_v is None or current_v >= version
):
return
# Pull may have populated _pull_diagnostics — that's also
# success.
if abs_path in self._pull_diagnostics:
return
# Loop until budget runs out.
async def _wait_for_fresh_push(self, path: str, version: int, timeout: float) -> None:
"""Wait until a publishDiagnostics arrives for ``path`` at ``version``+."""
deadline = asyncio.get_event_loop().time() + timeout
baseline = self._push_counter
while True:
current_v = self._published_version.get(path)
if path in self._published and (current_v is None or current_v >= version):
# Debounce — wait a tick in case more diagnostics arrive
# immediately after. TS often emits in pairs. We
# snapshot the counter so we wake on a *new* push, not
# on the one that satisfied us a moment ago.
debounce_baseline = self._push_counter
debounce_deadline = asyncio.get_event_loop().time() + PUSH_DEBOUNCE
while self._push_counter == debounce_baseline:
remaining = debounce_deadline - asyncio.get_event_loop().time()
if remaining <= 0:
break
self._push_event.clear()
try:
await asyncio.wait_for(self._push_event.wait(), timeout=remaining)
except asyncio.TimeoutError:
break
return
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
return
if self._push_counter > baseline:
# New event arrived but predicate still false — re-check
# immediately without waiting again.
baseline = self._push_counter
continue
self._push_event.clear()
try:
await asyncio.wait_for(self._push_event.wait(), timeout=min(remaining, 0.5))
except asyncio.TimeoutError:
continue
def diagnostics_for(self, path: str) -> List[Dict[str, Any]]:
"""Return current merged + deduped diagnostics for one file.
Diagnostics from push and pull stores are concatenated and
deduplicated by a ``(severity, code, source, message, range)``
content key. Empty list if the server hasn't published anything.
"""
abs_path = os.path.abspath(path)
push = self._push_diagnostics.get(abs_path) or []
pull = self._pull_diagnostics.get(abs_path) or []
return _dedupe(push, pull)
def _dedupe(*lists: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen: Set[str] = set()
out: List[Dict[str, Any]] = []
for lst in lists:
for d in lst:
if not isinstance(d, dict):
continue
key = _diagnostic_key(d)
if key in seen:
continue
seen.add(key)
out.append(d)
return out
def _diagnostic_key(d: Dict[str, Any]) -> str:
"""Content-equality key for a diagnostic.
Matches the structural-equality used in claude-code's
``areDiagnosticsEqual``: message + severity + source + code +
range coords. The range is flattened to a ``line:char-line:char``
string so the key is stable across dict orderings.
"""
rng = d.get("range") or {}
start = rng.get("start") or {}
end = rng.get("end") or {}
code = d.get("code")
if code is not None and not isinstance(code, str):
code = str(code)
return "\x00".join(
[
str(d.get("severity") or 1),
str(code or ""),
str(d.get("source") or ""),
str(d.get("message") or "").strip(),
f"{start.get('line', 0)}:{start.get('character', 0)}-{end.get('line', 0)}:{end.get('character', 0)}",
]
)
__all__ = [
"LSPClient",
"file_uri",
"uri_to_path",
"INITIALIZE_TIMEOUT",
"DIAGNOSTICS_DOCUMENT_WAIT",
"DIAGNOSTICS_FULL_WAIT",
]
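
A minimal standalone sketch of the ``ContentModified`` retry loop used by ``_send_request_with_retry``, useful for reasoning about the backoff schedule without a live server. It assumes 3 retries and a 0.5s base delay, as the docstring describes; ``RequestError``, ``backoff_delays``, ``send_with_retry`` and the injectable ``sleep`` parameter are hypothetical names introduced here for illustration, not part of the plugin.

```python
import asyncio
from typing import Any, Awaitable, Callable

ERROR_CONTENT_MODIFIED = -32801  # LSP ContentModified error code
MAX_RETRIES = 3                  # assumed value for this sketch
BASE_DELAY = 0.5                 # assumed value for this sketch (seconds)


class RequestError(Exception):
    """Hypothetical stand-in for LSPRequestError: carries a JSON-RPC code."""

    def __init__(self, code: int) -> None:
        super().__init__(f"request failed with code {code}")
        self.code = code


def backoff_delays(max_retries: int = MAX_RETRIES, base: float = BASE_DELAY) -> list[float]:
    """Delays slept before each retry: base * 2**attempt."""
    return [base * (2 ** attempt) for attempt in range(max_retries)]


async def send_with_retry(
    send: Callable[[], Awaitable[Any]],
    *,
    max_retries: int = MAX_RETRIES,
    base: float = BASE_DELAY,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Call ``send``; retry only on ContentModified, re-raise anything else."""
    for attempt in range(max_retries + 1):  # first attempt + max_retries retries
        try:
            return await send()
        except RequestError as e:
            if e.code == ERROR_CONTENT_MODIFIED and attempt < max_retries:
                await sleep(base * (2 ** attempt))
                continue
            raise  # other error codes, or retries exhausted
```

With these assumed constants, ``backoff_delays()`` yields ``[0.5, 1.0, 2.0]``, so the loop makes at most four attempts in total. Passing a fake ``sleep`` keeps tests fast.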

213
plugins/lsp/eventlog.py Normal file

@@ -0,0 +1,213 @@
"""Structured logging with steady-state silence for the LSP layer.
The LSP layer fires on every write_file/patch. In a busy session
that's hundreds of events. We want users to be able to ``rg`` the
log for "did LSP fire on that edit?" without drowning in noise.
The level model:
- ``DEBUG`` for steady-state events that have no novel signal:
``clean``, ``feature off``, ``extension not mapped``, ``no project
root for already-announced file``, ``server unavailable for
already-announced binary``. These never reach ``agent.log`` at the
default INFO threshold.
- ``INFO`` for state transitions worth surfacing exactly once per
session: ``active for <root>`` the first time a (server_id,
workspace_root) client starts, ``no project root for <path>``
the first time we see that file. Plus every diagnostic event
(those are inherently rare and per-edit, exactly what users grep
for).
- ``WARNING`` for action-required failures: ``server unavailable``
(binary not on PATH) the first time per (server_id, binary),
``no server configured`` once per language. Per-call WARNING for
timeouts and unexpected bridge exceptions.
Dedup uses in-process, module-level sets. Each set grows by at most
the number of distinct keys touched in one Python process:
(server_id, root), (server_id, binary), (server_id, file_path) and
(server_id,). That is a negligible amount of memory even in an
aggressive monorepo session. Bounded LRU was rejected: evicting an
entry would risk re-firing the WARNING/INFO line we explicitly want
to suppress.
Grep recipe::
tail -f ~/.hermes/logs/agent.log | rg 'lsp\\['
"""
from __future__ import annotations
import logging
import os
import threading
from typing import Tuple
# Dedicated logger name so the documented grep recipe survives a
# ``logging.getLogger(__name__)`` rename of any internal module.
event_log = logging.getLogger("hermes.lint.lsp")
# ---------------------------------------------------------------------------
# Once-per-X dedup sets
# ---------------------------------------------------------------------------
_announce_lock = threading.Lock()
_announced_active: set = set() # keys: (server_id, workspace_root)
_announced_unavailable: set = set() # keys: (server_id, binary_path_or_name)
_announced_no_root: set = set() # keys: (server_id, file_path)
_announced_no_server: set = set() # keys: (server_id,)
def _short_path(file_path: str) -> str:
"""Render *file_path* relative to the cwd when sensible, else absolute.
Keeps log lines readable for the common case (the user is inside
the project they're editing) without emitting brittle ``../../..``
chains for the cross-tree case.
"""
if not file_path:
return file_path
try:
rel = os.path.relpath(file_path)
except ValueError:
return file_path
if rel.startswith(".." + os.sep) or rel == "..":
return file_path
return rel
def _emit(server_id: str, level: int, message: str) -> None:
event_log.log(level, "lsp[%s] %s", server_id, message)
def _announce_once(bucket: set, key: Tuple) -> bool:
"""Return True if *key* has not been announced for *bucket* yet.
Atomically marks the key as announced so concurrent callers
cannot both win the race and double-log.
"""
with _announce_lock:
if key in bucket:
return False
bucket.add(key)
return True
# ---------------------------------------------------------------------------
# Public event helpers — call these from the LSP layer.
# ---------------------------------------------------------------------------
def log_clean(server_id: str, file_path: str) -> None:
"""No diagnostics emitted for *file_path*. DEBUG (silent at default)."""
_emit(server_id, logging.DEBUG, f"clean ({_short_path(file_path)})")
def log_disabled(server_id: str, file_path: str, reason: str) -> None:
"""LSP intentionally skipped for this file (feature off, ext unmapped,
backend not local, etc.). DEBUG."""
_emit(server_id, logging.DEBUG, f"skipped: {reason} ({_short_path(file_path)})")
def log_active(server_id: str, workspace_root: str) -> None:
"""A new LSP client started for (server_id, workspace_root).
INFO once per (server_id, workspace_root); DEBUG thereafter.
Lets users verify "is LSP actually running?" with a single grep.
"""
key = (server_id, workspace_root)
if _announce_once(_announced_active, key):
_emit(server_id, logging.INFO, f"active for {workspace_root}")
else:
_emit(server_id, logging.DEBUG, f"reused client for {workspace_root}")
def log_diagnostics(server_id: str, file_path: str, count: int) -> None:
"""Diagnostics arrived for a file. INFO every time — these are the
failure signals users actually want to grep for, and they are
inherently rare per edit."""
_emit(server_id, logging.INFO, f"{count} diags ({_short_path(file_path)})")
def log_no_project_root(server_id: str, file_path: str) -> None:
"""File had no recognised project marker. INFO once per file,
DEBUG thereafter."""
key = (server_id, file_path)
if _announce_once(_announced_no_root, key):
_emit(server_id, logging.INFO, f"no project root for {_short_path(file_path)}")
else:
_emit(server_id, logging.DEBUG, f"no project root for {_short_path(file_path)}")
def log_server_unavailable(server_id: str, binary_or_pkg: str) -> None:
"""The server binary couldn't be resolved. WARNING once per
(server_id, binary), DEBUG thereafter so a hundred subsequent
.py edits don't spam the log."""
key = (server_id, binary_or_pkg)
if _announce_once(_announced_unavailable, key):
_emit(
server_id,
logging.WARNING,
f"server unavailable: {binary_or_pkg} not found "
"(install via `hermes lsp install <id>` or set lsp.servers.<id>.command)",
)
else:
_emit(server_id, logging.DEBUG, f"server still unavailable: {binary_or_pkg}")
def log_no_server_configured(server_id: str) -> None:
"""No spawn recipe for this language. WARNING once."""
if _announce_once(_announced_no_server, (server_id,)):
_emit(server_id, logging.WARNING, "no server configured")
def log_timeout(server_id: str, file_path: str, kind: str = "diagnostics") -> None:
"""A request to the server timed out. WARNING every time — these are
inherently novel events worth surfacing on each occurrence."""
_emit(
server_id,
logging.WARNING,
f"{kind} timed out for {_short_path(file_path)}",
)
def log_server_error(server_id: str, file_path: str, exc: BaseException) -> None:
"""An unexpected exception bubbled out of the LSP layer. WARNING."""
_emit(
server_id,
logging.WARNING,
f"unexpected error for {_short_path(file_path)}: {type(exc).__name__}: {exc}",
)
def log_spawn_failed(server_id: str, workspace_root: str, exc: BaseException) -> None:
"""The LSP server failed to spawn or initialize. WARNING."""
_emit(
server_id,
logging.WARNING,
f"spawn/initialize failed for {workspace_root}: {type(exc).__name__}: {exc}",
)
def reset_announce_caches() -> None:
"""Test-only: clear the dedup caches. Production code never calls this."""
with _announce_lock:
_announced_active.clear()
_announced_unavailable.clear()
_announced_no_root.clear()
_announced_no_server.clear()
__all__ = [
"event_log",
"log_clean",
"log_disabled",
"log_active",
"log_diagnostics",
"log_no_project_root",
"log_server_unavailable",
"log_no_server_configured",
"log_timeout",
"log_server_error",
"log_spawn_failed",
"reset_announce_caches",
]
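
The once-per-key level model described in the ``eventlog.py`` docstring can be sketched in a few lines. This is a simplified illustration, assuming a single dedup set; ``announce_once`` and ``level_for`` are hypothetical names for this sketch and do not reproduce the module's exact helpers.

```python
import logging
import threading

_lock = threading.Lock()
_seen: set[tuple[str, str]] = set()


def announce_once(key: tuple[str, str]) -> bool:
    """Return True only for the first caller that presents ``key``.

    The membership check and the insert happen under one lock, so two
    threads racing on the same key cannot both get True.
    """
    with _lock:
        if key in _seen:
            return False
        _seen.add(key)
        return True


def level_for(server_id: str, workspace_root: str) -> int:
    """INFO the first time a (server_id, root) pair is seen, DEBUG after."""
    if announce_once((server_id, workspace_root)):
        return logging.INFO
    return logging.DEBUG
```

Because later events for the same key drop to DEBUG, a log at the default INFO threshold carries one line per state transition rather than one per edit.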

347
plugins/lsp/install.py Normal file

@@ -0,0 +1,347 @@
"""Auto-installation of LSP server binaries.
Tries to install missing servers using whatever package manager is
appropriate. All installs go to a Hermes-owned bin staging dir,
``<HERMES_HOME>/lsp/bin/``, so we don't pollute the user's global
toolchain.
Strategies:
- ``auto`` — attempt to install with the best available package
manager. This is the default.
- ``manual`` — never install; if a binary is missing, the server is
silently skipped and the user is told about it via ``hermes lsp
status``.
- ``off`` — same as ``manual`` for now (kept distinct so we can
evolve behavior later, e.g. logging differently).
The actual installs happen synchronously the first time a server is
needed and concurrent calls to :func:`try_install` for the same
package are deduplicated via a per-package lock.
Failure modes are non-fatal: every install path is wrapped in
try/except and returns ``None`` on failure. The tool layer then
falls back to its in-process syntax checker, exactly as if the user
hadn't enabled LSP at all.
"""
from __future__ import annotations
import logging
import os
import shutil
import subprocess
import sys
import threading
from pathlib import Path
from typing import Dict, Optional
logger = logging.getLogger("agent.lsp.install")
# Package-name → install-recipe registry. Each entry is a dict with
# three keys: ``strategy`` (install strategy name), ``pkg`` (package
# name) and ``bin`` (executable name). When the install completes, we
# look for the executable in ``<HERMES_HOME>/lsp/bin/`` first, then on
# PATH.
INSTALL_RECIPES: Dict[str, Dict[str, str]] = {
# Python
"pyright": {"strategy": "npm", "pkg": "pyright", "bin": "pyright-langserver"},
# JS/TS family
"typescript-language-server": {
"strategy": "npm",
"pkg": "typescript-language-server",
"bin": "typescript-language-server",
},
"@vue/language-server": {
"strategy": "npm",
"pkg": "@vue/language-server",
"bin": "vue-language-server",
},
"svelte-language-server": {
"strategy": "npm",
"pkg": "svelte-language-server",
"bin": "svelteserver",
},
"@astrojs/language-server": {
"strategy": "npm",
"pkg": "@astrojs/language-server",
"bin": "astro-ls",
},
"yaml-language-server": {
"strategy": "npm",
"pkg": "yaml-language-server",
"bin": "yaml-language-server",
},
"bash-language-server": {
"strategy": "npm",
"pkg": "bash-language-server",
"bin": "bash-language-server",
},
"intelephense": {"strategy": "npm", "pkg": "intelephense", "bin": "intelephense"},
"dockerfile-language-server-nodejs": {
"strategy": "npm",
"pkg": "dockerfile-language-server-nodejs",
"bin": "docker-langserver",
},
# Go
"gopls": {"strategy": "go", "pkg": "golang.org/x/tools/gopls@latest", "bin": "gopls"},
# Rust — too heavy (hundreds of MB to bootstrap). We do NOT
# auto-install rust-analyzer; users install via rustup.
"rust-analyzer": {"strategy": "manual", "pkg": "", "bin": "rust-analyzer"},
# C/C++ — manual (clangd ships with LLVM, very heavy)
"clangd": {"strategy": "manual", "pkg": "", "bin": "clangd"},
# Lua — manual (LuaLS is platform-specific binaries from GitHub
# releases; complex enough that we punt to the user)
"lua-language-server": {"strategy": "manual", "pkg": "", "bin": "lua-language-server"},
}
_install_locks: Dict[str, threading.Lock] = {}
_install_results: Dict[str, Optional[str]] = {}
_install_lock_meta = threading.Lock()
def hermes_lsp_bin_dir() -> Path:
"""Return the Hermes-owned bin staging dir for LSP servers."""
home = os.environ.get("HERMES_HOME")
if not home:  # treat an empty HERMES_HOME like an unset one
home = os.path.join(os.path.expanduser("~"), ".hermes")
p = Path(home) / "lsp" / "bin"
p.mkdir(parents=True, exist_ok=True)
return p
def _existing_binary(name: str) -> Optional[str]:
"""Probe the staging dir + PATH for a binary named ``name``."""
staged = hermes_lsp_bin_dir() / name
if staged.exists() and os.access(staged, os.X_OK):
return str(staged)
on_path = shutil.which(name)
if on_path:
return on_path
return None
def _get_lock(pkg: str) -> threading.Lock:
with _install_lock_meta:
lock = _install_locks.get(pkg)
if lock is None:
lock = threading.Lock()
_install_locks[pkg] = lock
return lock
def try_install(pkg: str, strategy: str = "auto") -> Optional[str]:
"""Try to install ``pkg`` and return the binary path if successful.
``strategy`` is ``"auto"``, ``"manual"``, or ``"off"``. In
``manual``/``off`` mode, this function only probes for an
existing binary and returns ``None`` if not found.
The install is cached per-package — a second call returns the
same path (or ``None``) without reinstalling. Concurrent calls
are serialized.
"""
if strategy != "auto":
# Only ``auto`` triggers an actual install. In manual/off,
# we still check whether the binary already exists.
recipe = INSTALL_RECIPES.get(pkg, {})
bin_name = recipe.get("bin", pkg)
return _existing_binary(bin_name)
if pkg in _install_results:
return _install_results[pkg]
lock = _get_lock(pkg)
with lock:
# Double-check after acquiring lock.
if pkg in _install_results:
return _install_results[pkg]
result = _do_install(pkg)
_install_results[pkg] = result
return result
def _do_install(pkg: str) -> Optional[str]:
recipe = INSTALL_RECIPES.get(pkg)
if recipe is None:
# Not in our registry — best-effort: just probe PATH.
return shutil.which(pkg)
strategy = recipe.get("strategy", "manual")
bin_name = recipe.get("bin", pkg)
# Check if already present (shutil.which or staging dir)
existing = _existing_binary(bin_name)
if existing:
return existing
if strategy == "manual":
logger.debug("[install] %s requires manual install (recipe=%s)", pkg, recipe)
return None
if strategy == "npm":
return _install_npm(recipe.get("pkg", pkg), bin_name)
if strategy == "go":
return _install_go(recipe.get("pkg", pkg), bin_name)
if strategy == "pip":
return _install_pip(recipe.get("pkg", pkg), bin_name)
logger.warning("[install] unknown strategy %r for %s", strategy, pkg)
return None
def _install_npm(pkg: str, bin_name: str) -> Optional[str]:
"""Install an npm package into our staging dir.
Uses ``npm install --prefix <HERMES_HOME>/lsp`` so the binaries land
in ``<HERMES_HOME>/lsp/node_modules/.bin/<bin_name>``; we then symlink
(or copy) them into ``<HERMES_HOME>/lsp/bin/`` for direct PATH-style
access.
"""
npm = shutil.which("npm")
if npm is None:
logger.info("[install] cannot install %s: npm not on PATH", pkg)
return None
staging = hermes_lsp_bin_dir().parent # <HERMES_HOME>/lsp/
try:
logger.info("[install] npm install --prefix %s %s", staging, pkg)
proc = subprocess.run(
[npm, "install", "--prefix", str(staging), "--silent", "--no-fund", "--no-audit", pkg],
check=False,
capture_output=True,
text=True,
timeout=300,
)
if proc.returncode != 0:
logger.warning(
"[install] npm install failed for %s: %s", pkg, proc.stderr.strip()[:500]
)
return None
except (subprocess.TimeoutExpired, OSError) as e:
logger.warning("[install] npm install errored for %s: %s", pkg, e)
return None
# Find the bin
nm_bin = staging / "node_modules" / ".bin" / bin_name
if os.name == "nt":
# On Windows npm sometimes drops `.cmd` shims
candidates = [nm_bin, nm_bin.with_suffix(".cmd")]
else:
candidates = [nm_bin]
for c in candidates:
if c.exists():
# Symlink into our `lsp/bin/` for stable PATH access.
link = hermes_lsp_bin_dir() / c.name
if not link.exists():
try:
link.symlink_to(c)
except (OSError, NotImplementedError):
# Symlinks fail on some Windows setups — copy instead.
try:
shutil.copy2(c, link)
except OSError:
return str(c)
return str(link if link.exists() else c)
logger.warning("[install] npm install for %s succeeded but bin %s not found", pkg, bin_name)
return None
def _install_go(pkg: str, bin_name: str) -> Optional[str]:
"""Install a Go module to GOBIN=<staging>."""
go = shutil.which("go")
if go is None:
logger.info("[install] cannot install %s: go not on PATH", pkg)
return None
staging = hermes_lsp_bin_dir()
env = dict(os.environ)
env["GOBIN"] = str(staging)
try:
logger.info("[install] go install %s (GOBIN=%s)", pkg, staging)
proc = subprocess.run(
[go, "install", pkg],
check=False,
capture_output=True,
text=True,
timeout=600,
env=env,
)
if proc.returncode != 0:
logger.warning(
"[install] go install failed for %s: %s", pkg, proc.stderr.strip()[:500]
)
return None
except (subprocess.TimeoutExpired, OSError) as e:
logger.warning("[install] go install errored for %s: %s", pkg, e)
return None
bin_path = staging / bin_name
if os.name == "nt":
bin_path = bin_path.with_suffix(".exe")
if bin_path.exists():
return str(bin_path)
logger.warning("[install] go install for %s succeeded but bin %s not found", pkg, bin_name)
return None
def _install_pip(pkg: str, bin_name: str) -> Optional[str]:
    """Install a Python package into a hermes-owned target dir.

    We avoid polluting the user's site-packages by using
    ``pip install --target``. The target is a ``python-packages``
    directory next to the one returned by ``hermes_lsp_bin_dir()``.
    Console scripts land in ``python-packages/bin/`` and are symlinked
    (or copied, when symlinks are unavailable) into the hermes bin dir.
    Note: this only works for packages that ship a console script.
    """
    pip_target = hermes_lsp_bin_dir().parent / "python-packages"
    pip_target.mkdir(parents=True, exist_ok=True)
    try:
        logger.info("[install] pip install --target %s %s", pip_target, pkg)
        proc = subprocess.run(
            [sys.executable, "-m", "pip", "install", "--target", str(pip_target), "--quiet", pkg],
            check=False,
            capture_output=True,
            text=True,
            timeout=300,
        )
        if proc.returncode != 0:
            logger.warning(
                "[install] pip install failed for %s: %s", pkg, proc.stderr.strip()[:500]
            )
            return None
    except (subprocess.TimeoutExpired, OSError) as e:
        logger.warning("[install] pip install errored for %s: %s", pkg, e)
        return None
    # Look for the console script pip placed under the target dir.
    bin_path = pip_target / "bin" / bin_name
    if bin_path.exists():
        link = hermes_lsp_bin_dir() / bin_name
        if not link.exists():
            try:
                link.symlink_to(bin_path)
            except (OSError, NotImplementedError):
                try:
                    shutil.copy2(bin_path, link)
                except OSError:
                    return str(bin_path)
        return str(link if link.exists() else bin_path)
    return None

def detect_status(pkg: str) -> str:
    """Return ``installed``, ``missing``, or ``manual-only`` for a package.

    Used by the ``hermes lsp status`` CLI to give users a quick
    overview of what's available without spawning anything.
    """
    recipe = INSTALL_RECIPES.get(pkg)
    bin_name = recipe.get("bin", pkg) if recipe else pkg
    if _existing_binary(bin_name):
        return "installed"
    if recipe and recipe.get("strategy") == "manual":
        return "manual-only"
    return "missing"

__all__ = [
"INSTALL_RECIPES",
"try_install",
"detect_status",
"hermes_lsp_bin_dir",
]
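
Review note: the symlink-then-copy fallback used by `_install_pip` can be exercised in isolation. The sketch below is an illustration only; `link_or_copy` and the `demo-server` file are hypothetical names that do not exist in the plugin, and the temporary directory layout is an assumption chosen to resemble the one the docstring describes.

```python
import shutil
import tempfile
from pathlib import Path


def link_or_copy(target: Path, link: Path) -> Path:
    """Expose ``target`` at ``link`` and return whichever path is usable.

    Hypothetical helper for illustration; not part of the plugin.
    """
    if link.exists():
        return link
    try:
        link.symlink_to(target)
    except (OSError, NotImplementedError):
        # Symlinks may be unavailable (for example on unprivileged
        # Windows accounts), so fall back to a plain copy.
        try:
            shutil.copy2(target, link)
        except OSError:
            return target
    return link if link.exists() else target


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    script = root / "python-packages" / "bin" / "demo-server"
    script.parent.mkdir(parents=True)
    script.write_text("#!/bin/sh\necho demo\n")
    bin_dir = root / "bin"
    bin_dir.mkdir()

    exposed = link_or_copy(script, bin_dir / "demo-server")
    exposed_name = exposed.name
    exposed_text = exposed.read_text()
```

Whichever branch runs, the caller gets back a path that exists, which is the property the installer relies on when it returns `str(link if link.exists() else bin_path)`.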

536
plugins/lsp/manager.py Normal file

@@ -0,0 +1,536 @@
"""Service-level orchestration for LSP clients.

The :class:`LSPService` is the bridge between the synchronous
file_operations layer and the async :class:`plugins.lsp.client.LSPClient`.

Design choices:

- A **single asyncio event loop** runs in a background thread. All
  client work happens on that loop. Synchronous callers from
  ``tools/file_operations.py`` use :meth:`get_diagnostics_sync` to
  open + wait + drain in one blocking call.

- One client per ``(server_id, workspace_root)`` key. Lazy spawn:
  the first request for a key spawns the client; subsequent requests
  re-use it.

- A **broken-set** records ``(server_id, workspace_root)`` pairs that
  failed to spawn or initialize. These are never retried for the
  life of the service. Mirrors OpenCode's design.

- A **delta baseline** map keeps "diagnostics-as-of-the-last-snapshot"
  per file. ``snapshot_baseline()`` is called BEFORE a write; the
  next ``get_diagnostics_sync()`` returns only diagnostics that
  weren't in the baseline. This is the lift from Claude Code's
  ``beforeFileEdited`` / ``getNewDiagnostics`` pattern, except wired
  to the local LSP layer instead of MCP IDE RPC.

The plugin is opt-in. :meth:`is_active` reports only whether LSP is
enabled in config. Per-file gating lives in :meth:`enabled_for`, which
also requires a git workspace and a matching server that is not
disabled. When a server's binary is missing and can't be installed,
its key lands in the broken-set and ``get_diagnostics_sync`` returns
an empty list, so the file_operations layer falls through to the
in-process syntax check.
"""
from __future__ import annotations
import asyncio
import logging
import os
import threading
import time
from concurrent.futures import Future as ConcurrentFuture
from typing import Any, Dict, List, Optional, Tuple
from plugins.lsp import eventlog
from plugins.lsp.client import (
DIAGNOSTICS_DOCUMENT_WAIT,
LSPClient,
file_uri,
)
from plugins.lsp.servers import (
ServerContext,
ServerDef,
SpawnSpec,
find_server_for_file,
language_id_for,
)
from plugins.lsp.workspace import (
clear_cache,
is_inside_workspace,
resolve_workspace_for_file,
)
logger = logging.getLogger("agent.lsp.manager")
DEFAULT_IDLE_TIMEOUT = 600 # seconds; servers idle for >10min get reaped
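
Review note: the "single asyncio event loop in a background thread" design from the module docstring can be sketched on its own. This is a simplified illustration, not the plugin's class; `BackgroundLoop` and `fake_diagnostics` are hypothetical names, and the coroutine only sleeps where the real code would talk to a language server.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical stand-in: one event loop owned by a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def run(self, coro, timeout=None):
        """Submit a coroutine from synchronous code and block for its result."""
        fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return fut.result(timeout=timeout)
        except Exception:
            fut.cancel()
            raise

    def stop(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=2.0)
        self._loop.close()


async def fake_diagnostics(path: str) -> list:
    await asyncio.sleep(0.01)  # stands in for waiting on a language server
    return [f"{path}: ok"]


bg = BackgroundLoop()
result = bg.run(fake_diagnostics("demo.py"), timeout=5.0)
bg.stop()
```

One thing worth checking against the supported Python versions: before Python 3.11, `Future.result(timeout=...)` raises `concurrent.futures.TimeoutError`, which is a different class from `asyncio.TimeoutError`, so a caller that wants to tell timeouts apart from other failures may need to catch both.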
class _BackgroundLoop:
    """A daemon thread that owns one asyncio event loop.

    Provides :meth:`run` for synchronous callers: it submits a coroutine
    to the loop and blocks until it finishes (or a timeout fires).
    """

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        self._ready = threading.Event()

    def start(self) -> None:
        if self._thread is not None:
            return
        self._thread = threading.Thread(
            target=self._run_forever,
            name="hermes-lsp-loop",
            daemon=True,
        )
        self._thread.start()
        self._ready.wait(timeout=5.0)

    def _run_forever(self) -> None:
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        self._ready.set()
        try:
            loop.run_forever()
        finally:
            try:
                loop.close()
            except Exception:  # noqa: BLE001
                pass

    def run(self, coro, *, timeout: Optional[float] = None) -> Any:
        """Submit a coroutine to the loop and block until done.

        Returns the coroutine's result, or raises its exception.
        """
        if self._loop is None:
            raise RuntimeError("background loop not started")
        fut: ConcurrentFuture = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return fut.result(timeout=timeout)
        except Exception:
            fut.cancel()
            raise

    def stop(self) -> None:
        loop = self._loop
        if loop is None:
            return
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            pass
        if self._thread is not None:
            self._thread.join(timeout=2.0)
        self._loop = None
        self._thread = None

class LSPService:
    """The process-wide LSP service.

    Created once via :meth:`create_from_config`; the
    :func:`agent.lsp.get_service` accessor manages the singleton.
    Most callers should use that accessor rather than constructing
    :class:`LSPService` directly.
    """

    # ------------------------------------------------------------------
    # construction + factory
    # ------------------------------------------------------------------

    def __init__(
        self,
        *,
        enabled: bool,
        wait_mode: str,
        wait_timeout: float,
        install_strategy: str,
        binary_overrides: Optional[Dict[str, List[str]]] = None,
        env_overrides: Optional[Dict[str, Dict[str, str]]] = None,
        init_overrides: Optional[Dict[str, Dict[str, Any]]] = None,
        disabled_servers: Optional[List[str]] = None,
        idle_timeout: float = DEFAULT_IDLE_TIMEOUT,
    ) -> None:
        self._enabled = enabled
        self._wait_mode = wait_mode if wait_mode in ("document", "full") else "document"
        self._wait_timeout = wait_timeout
        self._install_strategy = install_strategy
        self._binary_overrides = binary_overrides or {}
        self._env_overrides = env_overrides or {}
        self._init_overrides = init_overrides or {}
        self._disabled_servers = set(disabled_servers or [])
        self._idle_timeout = idle_timeout
        self._loop = _BackgroundLoop()
        if self._enabled:
            self._loop.start()

        # Per-(server_id, workspace_root) state
        self._clients: Dict[Tuple[str, str], LSPClient] = {}
        self._broken: set = set()
        self._spawning: Dict[Tuple[str, str], asyncio.Future] = {}
        self._last_used: Dict[Tuple[str, str], float] = {}
        self._state_lock = threading.Lock()

        # Delta baseline: file path → snapshot of diagnostics taken
        # immediately before a write. ``get_diagnostics_sync`` filters
        # out anything in the baseline so the agent only sees errors
        # introduced by the current edit.
        self._delta_baseline: Dict[str, List[Dict[str, Any]]] = {}

    @classmethod
    def create_from_config(cls) -> Optional["LSPService"]:
        """Build a service from ``hermes_cli.config`` settings.

        Returns ``None`` if the config can't be loaded. The service
        itself returns ``is_active()`` False when LSP is disabled.
        """
        try:
            from hermes_cli.config import load_config

            cfg = load_config()
        except Exception as e:  # noqa: BLE001
            logger.debug("LSP config load failed: %s", e)
            return None
        lsp_cfg = (cfg.get("lsp") or {}) if isinstance(cfg, dict) else {}
        if not isinstance(lsp_cfg, dict):
            lsp_cfg = {}
        enabled = bool(lsp_cfg.get("enabled", True))
        wait_mode = lsp_cfg.get("wait_mode", "document")
        wait_timeout = float(lsp_cfg.get("wait_timeout", DIAGNOSTICS_DOCUMENT_WAIT))
        install_strategy = lsp_cfg.get("install_strategy", "auto")
        servers_cfg = lsp_cfg.get("servers") or {}
        disabled = []
        binary_overrides: Dict[str, List[str]] = {}
        env_overrides: Dict[str, Dict[str, str]] = {}
        init_overrides: Dict[str, Dict[str, Any]] = {}
        if isinstance(servers_cfg, dict):
            for name, sub in servers_cfg.items():
                if not isinstance(sub, dict):
                    continue
                if sub.get("disabled"):
                    disabled.append(name)
                cmd = sub.get("command")
                if isinstance(cmd, list) and cmd:
                    binary_overrides[name] = cmd
                env = sub.get("env")
                if isinstance(env, dict):
                    env_overrides[name] = {k: str(v) for k, v in env.items()}
                init = sub.get("initialization_options")
                if isinstance(init, dict):
                    init_overrides[name] = init
        return cls(
            enabled=enabled,
            wait_mode=wait_mode,
            wait_timeout=wait_timeout,
            install_strategy=install_strategy,
            binary_overrides=binary_overrides,
            env_overrides=env_overrides,
            init_overrides=init_overrides,
            disabled_servers=disabled,
        )

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def is_active(self) -> bool:
        """Return True iff this service should be consulted at all."""
        return self._enabled

    def enabled_for(self, file_path: str) -> bool:
        """Return True iff LSP should run for this specific file.

        Gates on workspace detection (file or cwd inside a git worktree)
        and on whether any registered server matches the extension.
        """
        if not self._enabled:
            return False
        srv = find_server_for_file(file_path)
        if srv is None or srv.server_id in self._disabled_servers:
            return False
        ws_root, gated_in = resolve_workspace_for_file(file_path)
        return bool(ws_root and gated_in)

    def snapshot_baseline(self, file_path: str) -> None:
        """Snapshot current diagnostics for ``file_path`` as the delta baseline.

        Called BEFORE a write so the next ``get_diagnostics_sync()``
        can filter out pre-existing errors. Best-effort: failures
        are silently swallowed so a flaky server can't break a write.
        """
        if not self.enabled_for(file_path):
            return
        try:
            diags = self._loop.run(self._snapshot_async(file_path), timeout=8.0)
            self._delta_baseline[os.path.abspath(file_path)] = diags or []
        except Exception as e:  # noqa: BLE001
            logger.debug("baseline snapshot failed for %s: %s", file_path, e)
            # Set empty baseline so the next call still does the
            # comparison (any post-edit diagnostic will be considered
            # "new", which is the safe default).
            self._delta_baseline[os.path.abspath(file_path)] = []

    def get_diagnostics_sync(
        self,
        file_path: str,
        *,
        delta: bool = True,
        timeout: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Synchronously open ``file_path`` in the right server, wait for
        diagnostics, return them.

        If ``delta`` is True (default), the result is filtered against
        any baseline previously captured via :meth:`snapshot_baseline`.
        Diagnostics present in the baseline are removed so the caller
        only sees errors introduced by the current edit.

        Returns an empty list when LSP is disabled, when no workspace
        can be detected, when no server matches, or when the server
        can't be spawned. Never raises.
        """
        if not self.enabled_for(file_path):
            return []
        # Resolve server_id eagerly so we can emit structured logs even
        # when the request errors out below.
        srv = find_server_for_file(file_path)
        server_id = srv.server_id if srv else "?"
        try:
            t = timeout if timeout is not None else self._wait_timeout + 2.0
            diags = self._loop.run(self._open_and_wait_async(file_path), timeout=t) or []
        except asyncio.TimeoutError as e:
            eventlog.log_timeout(server_id, file_path)
            logger.debug("LSP diagnostics timeout for %s: %s", file_path, e)
            return []
        except Exception as e:  # noqa: BLE001
            eventlog.log_server_error(server_id, file_path, e)
            logger.debug("LSP diagnostics fetch failed for %s: %s", file_path, e)
            return []
        abs_path = os.path.abspath(file_path)
        if delta:
            baseline = self._delta_baseline.get(abs_path) or []
            if baseline:
                seen = {_diag_key(d) for d in baseline}
                diags = [d for d in diags if _diag_key(d) not in seen]
            # Roll baseline forward: the next call returns deltas relative
            # to the just-emitted state, mirroring claude-code's
            # diagnosticTracking.
            try:
                fresh = self._loop.run(self._current_diags_async(file_path), timeout=2.0) or []
            except Exception:  # noqa: BLE001
                fresh = []
            if fresh:
                self._delta_baseline[abs_path] = fresh
        if diags:
            eventlog.log_diagnostics(server_id, file_path, len(diags))
        else:
            eventlog.log_clean(server_id, file_path)
        return diags

    def shutdown(self) -> None:
        """Tear down all clients and stop the background loop."""
        if not self._enabled:
            return
        try:
            self._loop.run(self._shutdown_async(), timeout=10.0)
        except Exception as e:  # noqa: BLE001
            logger.debug("LSP shutdown error: %s", e)
        self._loop.stop()
        clear_cache()

    # ------------------------------------------------------------------
    # async internals
    # ------------------------------------------------------------------

    async def _snapshot_async(self, file_path: str) -> List[Dict[str, Any]]:
        client = await self._get_or_spawn(file_path)
        if client is None:
            return []
        try:
            version = await client.open_file(file_path, language_id=language_id_for(file_path))
            await client.wait_for_diagnostics(file_path, version, mode=self._wait_mode)
        except Exception as e:  # noqa: BLE001
            logger.debug("snapshot open/wait failed: %s", e)
            return []
        self._last_used[(client.server_id, client.workspace_root)] = time.time()
        return list(client.diagnostics_for(file_path))

    async def _open_and_wait_async(self, file_path: str) -> List[Dict[str, Any]]:
        client = await self._get_or_spawn(file_path)
        if client is None:
            return []
        try:
            version = await client.open_file(file_path, language_id=language_id_for(file_path))
            await client.save_file(file_path)
            await client.wait_for_diagnostics(file_path, version, mode=self._wait_mode)
        except Exception as e:  # noqa: BLE001
            logger.debug("open/wait failed for %s: %s", file_path, e)
            return []
        self._last_used[(client.server_id, client.workspace_root)] = time.time()
        return list(client.diagnostics_for(file_path))

    async def _current_diags_async(self, file_path: str) -> List[Dict[str, Any]]:
        ws, gated = resolve_workspace_for_file(file_path)
        srv = find_server_for_file(file_path)
        if not (ws and gated and srv):
            return []
        with self._state_lock:
            client = self._clients.get((srv.server_id, ws))
        if client is None:
            return []
        return list(client.diagnostics_for(file_path))

    async def _get_or_spawn(self, file_path: str) -> Optional[LSPClient]:
        srv = find_server_for_file(file_path)
        if srv is None:
            return None
        if srv.server_id in self._disabled_servers:
            eventlog.log_disabled(srv.server_id, file_path, "disabled in config")
            return None
        ws_root, gated = resolve_workspace_for_file(file_path)
        if not (ws_root and gated):
            eventlog.log_no_project_root(srv.server_id, file_path)
            return None
        per_server_root = srv.resolve_root(file_path, ws_root)
        if per_server_root is None:
            eventlog.log_disabled(
                srv.server_id, file_path, "exclude marker hit (server gated off)"
            )
            return None  # exclude marker hit, server gated off
        key = (srv.server_id, per_server_root)
        if key in self._broken:
            return None
        with self._state_lock:
            client = self._clients.get(key)
            if client is not None and client.is_running:
                eventlog.log_active(srv.server_id, per_server_root)
                return client
            spawning = self._spawning.get(key)
        if spawning is not None:
            try:
                return await spawning
            except Exception:  # noqa: BLE001
                return None
        # Begin spawn
        loop = asyncio.get_running_loop()
        spawn_future: asyncio.Future = loop.create_future()
        with self._state_lock:
            self._spawning[key] = spawn_future
        try:
            ctx = ServerContext(
                workspace_root=per_server_root,
                install_strategy=self._install_strategy,
                binary_overrides=self._binary_overrides,
                env_overrides=self._env_overrides,
                init_overrides=self._init_overrides,
            )
            spec = srv.build_spawn(per_server_root, ctx)
            if spec is None:
                # ``build_spawn`` returns None when the binary can't be
                # located (auto-install disabled, manual-only server,
                # or install attempt failed). Surface this once via
                # the structured logger so the user can act on it.
                eventlog.log_server_unavailable(srv.server_id, srv.server_id)
                self._broken.add(key)
                spawn_future.set_result(None)
                return None
            client = LSPClient(
                server_id=srv.server_id,
                workspace_root=spec.workspace_root,
                command=spec.command,
                env=spec.env,
                cwd=spec.cwd,
                initialization_options=spec.initialization_options,
                seed_diagnostics_on_first_push=spec.seed_diagnostics_on_first_push or srv.seed_first_push,
            )
            try:
                await client.start()
            except Exception as e:  # noqa: BLE001
                eventlog.log_spawn_failed(srv.server_id, per_server_root, e)
                self._broken.add(key)
                spawn_future.set_result(None)
                return None
            with self._state_lock:
                self._clients[key] = client
                self._last_used[key] = time.time()
            eventlog.log_active(srv.server_id, per_server_root)
            spawn_future.set_result(client)
            return client
        finally:
            with self._state_lock:
                self._spawning.pop(key, None)

    async def _shutdown_async(self) -> None:
        with self._state_lock:
            clients = list(self._clients.values())
            self._clients.clear()
            self._broken.clear()
            self._last_used.clear()
        await asyncio.gather(
            *(c.shutdown() for c in clients),
            return_exceptions=True,
        )

    # ------------------------------------------------------------------
    # status / introspection (used by ``hermes lsp status``)
    # ------------------------------------------------------------------

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the service for the CLI status command."""
        with self._state_lock:
            clients = [
                {
                    "server_id": k[0],
                    "workspace_root": k[1],
                    "state": c.state,
                    "running": c.is_running,
                }
                for k, c in self._clients.items()
            ]
            broken = list(self._broken)
        return {
            "enabled": self._enabled,
            "wait_mode": self._wait_mode,
            "wait_timeout": self._wait_timeout,
            "install_strategy": self._install_strategy,
            "clients": clients,
            "broken": broken,
            "disabled_servers": sorted(self._disabled_servers),
        }

def _diag_key(d: Dict[str, Any]) -> str:
    """Content equality key used for delta filtering. Mirrors
    :func:`plugins.lsp.client._diagnostic_key`."""
    rng = d.get("range") or {}
    start = rng.get("start") or {}
    end = rng.get("end") or {}
    code = d.get("code")
    if code is not None and not isinstance(code, str):
        code = str(code)
    return "\x00".join(
        [
            str(d.get("severity") or 1),
            str(code or ""),
            str(d.get("source") or ""),
            str(d.get("message") or "").strip(),
            f"{start.get('line', 0)}:{start.get('character', 0)}-{end.get('line', 0)}:{end.get('character', 0)}",
        ]
    )

__all__ = ["LSPService"]
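
Review note: the delta-baseline filtering described in the module docstring is easy to try outside the service. The sketch below is a simplified illustration; `diag_key`, `new_diagnostics`, and `_diag` are hypothetical names and the sample diagnostics are made up.

```python
from typing import Any, Dict, List


def diag_key(d: Dict[str, Any]) -> str:
    """Hypothetical content key: severity, code, source, message, range."""
    rng = d.get("range") or {}
    start = rng.get("start") or {}
    end = rng.get("end") or {}
    return "\x00".join(
        [
            str(d.get("severity") or 1),
            str(d.get("code") or ""),
            str(d.get("source") or ""),
            str(d.get("message") or "").strip(),
            f"{start.get('line', 0)}:{start.get('character', 0)}"
            f"-{end.get('line', 0)}:{end.get('character', 0)}",
        ]
    )


def new_diagnostics(
    baseline: List[Dict[str, Any]], current: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Keep only diagnostics whose key was absent from the baseline."""
    seen = {diag_key(d) for d in baseline}
    return [d for d in current if diag_key(d) not in seen]


def _diag(line: int, message: str) -> Dict[str, Any]:
    # Made-up diagnostic shaped like an LSP publishDiagnostics entry.
    return {
        "severity": 1,
        "message": message,
        "range": {
            "start": {"line": line, "character": 0},
            "end": {"line": line, "character": 5},
        },
    }


before = [_diag(3, "undefined name 'foo'")]
after = [_diag(3, "undefined name 'foo'"), _diag(10, "missing return")]
introduced = new_diagnostics(before, after)
print([d["message"] for d in introduced])  # prints ['missing return']
```

Because the key includes the range, a pre-existing error that merely moves to another line after an edit will look new under this scheme. That may be an acceptable trade-off, but it seems worth keeping in mind when reading the reported deltas.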

11
plugins/lsp/plugin.yaml Normal file

@@ -0,0 +1,11 @@
name: lsp
version: "1.0.0"
description: >-
Semantic diagnostics from real language servers (pyright, gopls,
rust-analyzer, typescript-language-server, etc.) surfaced on
write_file/patch. Opt-in: add 'lsp' to plugins.enabled in config.yaml.
author: NousResearch
hooks:
- pre_tool_call
- transform_tool_result
- on_session_end

196
plugins/lsp/protocol.py Normal file

@@ -0,0 +1,196 @@
"""Minimal LSP JSON-RPC 2.0 framer over async streams.

LSP wire format:

    Content-Length: <bytes>\\r\\n
    \\r\\n
    <utf-8 JSON body>

The body is a JSON-RPC 2.0 envelope: request, response, or notification.

This module replaces what ``vscode-jsonrpc/node`` would do in a
TypeScript implementation. We keep it deliberately small (just the
framer + envelope helpers) so :class:`plugins.lsp.client.LSPClient` can
focus on protocol semantics.
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any, Optional, Tuple
logger = logging.getLogger("agent.lsp.protocol")
# LSP error codes we care about. Full list in
# https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#errorCodes
ERROR_CONTENT_MODIFIED = -32801
ERROR_REQUEST_CANCELLED = -32800
ERROR_METHOD_NOT_FOUND = -32601
class LSPProtocolError(Exception):
    """Raised when the wire protocol is violated.

    Distinct from :class:`LSPRequestError`, which represents a server
    returning a JSON-RPC error response; that's protocol-conformant.
    This exception means the framing or envelope itself is broken.
    """


class LSPRequestError(Exception):
    """Raised when an LSP request returns an error response.

    Carries the JSON-RPC ``code``, ``message``, and optional ``data``.
    """

    def __init__(self, code: int, message: str, data: Any = None) -> None:
        super().__init__(f"LSP error {code}: {message}")
        self.code = code
        self.message = message
        self.data = data

def encode_message(obj: dict) -> bytes:
    """Encode a JSON-RPC envelope as a Content-Length framed byte string.

    The body is encoded as compact UTF-8 JSON (no spaces between
    separators), which matches what ``vscode-jsonrpc`` emits and keeps
    the Content-Length count exact.
    """
    body = json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
    return header + body


async def read_message(reader: asyncio.StreamReader) -> Optional[dict]:
    """Read one Content-Length framed JSON-RPC message from the stream.

    Returns ``None`` on clean EOF (server closed stdout cleanly between
    messages, which is typical at shutdown). Raises
    :class:`LSPProtocolError` on malformed framing.

    The reader is advanced to just past the JSON body on success.
    """
    headers: dict = {}
    header_bytes = 0
    while True:
        try:
            line = await reader.readuntil(b"\r\n")
        except asyncio.IncompleteReadError as e:
            # EOF while reading headers. If we hadn't started a header
            # block, treat as clean EOF; otherwise the framing is bad.
            if not e.partial and not headers:
                return None
            raise LSPProtocolError(
                f"unexpected EOF while reading LSP headers (partial={e.partial!r})"
            ) from e
        # Defensive cap against a server streaming headers without ever
        # emitting CRLF-CRLF. Caps total header bytes at 8 KiB; a
        # well-behaved server fits in well under 200 bytes.
        header_bytes += len(line)
        if header_bytes > 8192:
            raise LSPProtocolError(
                "LSP header block exceeded 8 KiB without terminator"
            )
        line = line[:-2]  # strip CRLF
        if not line:
            break  # blank line ends header block
        try:
            key, _, value = line.decode("ascii").partition(":")
        except UnicodeDecodeError as e:
            raise LSPProtocolError(f"non-ASCII LSP header: {line!r}") from e
        if not key:
            raise LSPProtocolError(f"malformed LSP header line: {line!r}")
        headers[key.strip().lower()] = value.strip()
    cl = headers.get("content-length")
    if cl is None:
        raise LSPProtocolError(f"LSP message missing Content-Length: {headers!r}")
    try:
        n = int(cl)
    except ValueError as e:
        raise LSPProtocolError(f"non-integer Content-Length: {cl!r}") from e
    if n < 0 or n > 64 * 1024 * 1024:  # 64 MiB sanity cap
        raise LSPProtocolError(f"unreasonable Content-Length: {n}")
    try:
        body = await reader.readexactly(n)
    except asyncio.IncompleteReadError as e:
        raise LSPProtocolError(
            f"truncated LSP body: expected {n} bytes, got {len(e.partial)}"
        ) from e
    try:
        return json.loads(body.decode("utf-8"))
    except json.JSONDecodeError as e:
        raise LSPProtocolError(f"invalid JSON in LSP body: {e}") from e
    except UnicodeDecodeError as e:
        raise LSPProtocolError(f"non-UTF-8 LSP body: {e}") from e

def make_request(req_id: int, method: str, params: Any) -> dict:
    """Build a JSON-RPC 2.0 request envelope."""
    msg: dict = {"jsonrpc": "2.0", "id": req_id, "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def make_notification(method: str, params: Any) -> dict:
    """Build a JSON-RPC 2.0 notification envelope (no ``id``)."""
    msg: dict = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        msg["params"] = params
    return msg


def make_response(req_id: Any, result: Any) -> dict:
    """Build a JSON-RPC 2.0 success response envelope."""
    return {"jsonrpc": "2.0", "id": req_id, "result": result}


def make_error_response(req_id: Any, code: int, message: str, data: Any = None) -> dict:
    """Build a JSON-RPC 2.0 error response envelope."""
    err: dict = {"code": code, "message": message}
    if data is not None:
        err["data"] = data
    return {"jsonrpc": "2.0", "id": req_id, "error": err}


def classify_message(msg: dict) -> Tuple[str, Any]:
    """Return ``(kind, key)`` where kind is one of ``request``,
    ``response``, ``notification``, ``invalid``.

    The key is the request id for request/response, the method name
    for notifications, and ``None`` for invalid messages.
    """
    if not isinstance(msg, dict):
        return "invalid", None
    if msg.get("jsonrpc") != "2.0":
        return "invalid", None
    has_id = "id" in msg
    has_method = "method" in msg
    if has_id and has_method:
        return "request", msg["id"]
    if has_id and ("result" in msg or "error" in msg):
        return "response", msg["id"]
    if has_method and not has_id:
        return "notification", msg["method"]
    return "invalid", None

__all__ = [
"ERROR_CONTENT_MODIFIED",
"ERROR_REQUEST_CANCELLED",
"ERROR_METHOD_NOT_FOUND",
"LSPProtocolError",
"LSPRequestError",
"encode_message",
"read_message",
"make_request",
"make_notification",
"make_response",
"make_error_response",
"classify_message",
]
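
Review note: a round trip through the Content-Length framing makes the byte-count requirement concrete. The sketch below is a stripped-down illustration rather than the module's code; `frame`, `read_one`, and `round_trip` are hypothetical names, and the reader omits the EOF handling and size caps that `read_message` has.

```python
import asyncio
import json


def frame(obj: dict) -> bytes:
    """Content-Length framing as described in the module docstring."""
    body = json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    return f"Content-Length: {len(body)}\r\n\r\n".encode("ascii") + body


async def read_one(reader: asyncio.StreamReader) -> dict:
    """Simplified reader: no EOF handling and no size caps."""
    length = None
    while True:
        line = (await reader.readuntil(b"\r\n"))[:-2]
        if not line:
            break  # blank line ends the header block
        key, _, value = line.decode("ascii").partition(":")
        if key.strip().lower() == "content-length":
            length = int(value.strip())
    if length is None:
        raise ValueError("missing Content-Length header")
    body = await reader.readexactly(length)
    return json.loads(body.decode("utf-8"))


async def round_trip(obj: dict) -> dict:
    # Feed the framed bytes into an in-memory stream and read them back.
    reader = asyncio.StreamReader()
    reader.feed_data(frame(obj))
    reader.feed_eof()
    return await read_one(reader)


request = {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"note": "héllo"}}
decoded = asyncio.run(round_trip(request))
declared_length = int(frame(request).split(b"\r\n", 1)[0].split(b":")[1])
```

The non-ASCII character in the sample payload is deliberate: the declared length has to count UTF-8 bytes, not characters, or `readexactly` would stop short of the end of the body.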

78
plugins/lsp/reporter.py Normal file

@@ -0,0 +1,78 @@
"""Format LSP diagnostics for inclusion in tool output.
The model sees a compact, severity-filtered, line-bounded summary of
diagnostics introduced by the latest edit. Format matches what
OpenCode's ``lsp/diagnostic.ts`` and Claude Code's
``formatDiagnosticsSummary`` produce — ``<diagnostics>`` blocks with
1-indexed line/column, capped at ``MAX_PER_FILE`` errors.
"""
from __future__ import annotations
from typing import Any, Dict, List
# Severity-1 only by default — warnings/info/hints would flood the
# agent. Lift this in config under ``lsp.severities`` if needed.
SEVERITY_NAMES = {1: "ERROR", 2: "WARN", 3: "INFO", 4: "HINT"}
DEFAULT_SEVERITIES = frozenset({1}) # ERROR only
MAX_PER_FILE = 20
MAX_TOTAL_CHARS = 4000
def format_diagnostic(d: Dict[str, Any]) -> str:
    """One-line representation of a single diagnostic."""
    sev = SEVERITY_NAMES.get(d.get("severity") or 1, "ERROR")
    rng = d.get("range") or {}
    start = rng.get("start") or {}
    line = int(start.get("line", 0)) + 1
    col = int(start.get("character", 0)) + 1
    msg = str(d.get("message") or "").rstrip()
    code = d.get("code")
    code_part = f" [{code}]" if code not in (None, "") else ""
    source = d.get("source")
    source_part = f" ({source})" if source else ""
    return f"{sev} [{line}:{col}] {msg}{code_part}{source_part}"


def report_for_file(
    file_path: str,
    diagnostics: List[Dict[str, Any]],
    *,
    severities: frozenset = DEFAULT_SEVERITIES,
    max_per_file: int = MAX_PER_FILE,
) -> str:
    """Build a ``<diagnostics file=...>`` block for one file.

    Returns an empty string when no diagnostics pass the severity
    filter, so callers can do ``if block:`` to skip empty cases.
    """
    if not diagnostics:
        return ""
    filtered = [d for d in diagnostics if (d.get("severity") or 1) in severities]
    if not filtered:
        return ""
    limited = filtered[:max_per_file]
    extra = len(filtered) - len(limited)
    lines = [format_diagnostic(d) for d in limited]
    body = "\n".join(lines)
    if extra > 0:
        body += f"\n... and {extra} more"
    return f"<diagnostics file=\"{file_path}\">\n{body}\n</diagnostics>"


def truncate(s: str, *, limit: int = MAX_TOTAL_CHARS) -> str:
    """Hard-cap a formatted summary string."""
    if len(s) <= limit:
        return s
    marker = "\n…[truncated]"
    return s[: limit - len(marker)] + marker

__all__ = [
"SEVERITY_NAMES",
"DEFAULT_SEVERITIES",
"MAX_PER_FILE",
"format_diagnostic",
"report_for_file",
"truncate",
]
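
Review note: for readers who want to see what the model actually receives, here is a condensed sketch of the formatting. `format_line` and `block` are hypothetical simplifications that drop the code and source suffixes, and the sample diagnostics are made up.

```python
from typing import Any, Dict, List

SEVERITY_NAMES = {1: "ERROR", 2: "WARN", 3: "INFO", 4: "HINT"}


def format_line(d: Dict[str, Any]) -> str:
    """One line per diagnostic, converting LSP's 0-indexed positions to 1-indexed."""
    sev = SEVERITY_NAMES.get(d.get("severity") or 1, "ERROR")
    start = (d.get("range") or {}).get("start") or {}
    line = int(start.get("line", 0)) + 1
    col = int(start.get("character", 0)) + 1
    return f"{sev} [{line}:{col}] {d.get('message', '')}"


def block(file_path: str, diagnostics: List[Dict[str, Any]], max_per_file: int = 20) -> str:
    """Errors only, capped per file, wrapped in a <diagnostics> element."""
    errors = [d for d in diagnostics if (d.get("severity") or 1) == 1]
    if not errors:
        return ""
    shown = errors[:max_per_file]
    body = "\n".join(format_line(d) for d in shown)
    extra = len(errors) - len(shown)
    if extra > 0:
        body += f"\n... and {extra} more"
    return f'<diagnostics file="{file_path}">\n{body}\n</diagnostics>'


sample = [
    {"severity": 1, "message": "undefined name 'x'", "range": {"start": {"line": 4, "character": 8}}},
    {"severity": 2, "message": "unused import", "range": {"start": {"line": 0, "character": 0}}},
]
summary = block("app.py", sample)
print(summary)
# <diagnostics file="app.py">
# ERROR [5:9] undefined name 'x'
# </diagnostics>
```

The warning is dropped by the severity filter, and the surviving error is reported at line 5, column 9 because LSP positions start at zero.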

1025
plugins/lsp/servers.py Normal file

File diff suppressed because it is too large

223
plugins/lsp/workspace.py Normal file

@@ -0,0 +1,223 @@
"""Workspace and project-root resolution for LSP.

Two concerns live here:

1. **Workspace gate**: the upper-level "is this directory a project?"
   check. Hermes only runs LSP when the cwd (or the file being edited)
   sits inside a git worktree. Files outside any git root never
   trigger LSP, even if a server is configured. This keeps Telegram
   gateway users whose cwd is their home directory from spawning
   daemons.

2. **NearestRoot**: the per-server project-root walk. Each language
   server cares about a different marker (``pyproject.toml`` for
   Python, ``Cargo.toml`` for Rust, ``go.mod`` for Go, etc.) and
   wants the directory containing that marker. ``nearest_root()``
   walks up from a starting path looking for any of a list of marker
   files, optionally bailing if an exclude marker shows up first.
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Iterable, Optional, Tuple
logger = logging.getLogger("agent.lsp.workspace")
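# Cache: start directory → (worktree_root, is_git) so repeated calls
# don't re-stat. Cleared on shutdown. Keyed by the normalized absolute
# path from ``normalize_path``, which does not resolve symlinks, so two
# symlinked spellings of the same directory get separate entries.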
_workspace_cache: dict = {}
def normalize_path(path: str) -> str:
    """Normalize a path for use as a stable map key.

    Resolves ``~``, makes absolute, and collapses ``.``/``..``. We do
    NOT resolve symlinks here: symlink stability matters for some
    LSP servers (rust-analyzer cares about Cargo workspace identity)
    and we want the canonical path the user typed when possible.
    """
    return os.path.abspath(os.path.expanduser(path))


def find_git_worktree(start: str) -> Optional[str]:
    """Walk up from ``start`` looking for a ``.git`` entry (file or dir).

    Returns the directory containing ``.git``, or ``None`` if no git
    root is found before hitting the filesystem root.

    A ``.git`` *file* (not directory) means we're inside a git
    worktree set up via ``git worktree add``; both forms count.
    """
    try:
        start_path = Path(normalize_path(start))
        if start_path.is_file():
            start_path = start_path.parent
    except (OSError, RuntimeError, ValueError):
        # Pathological input (loop in symlinks, encoding error, etc.);
        # bail out rather than crash the lint hook.
        return None
    # Cache check
    cached = _workspace_cache.get(str(start_path))
    if cached is not None:
        root, _is_git = cached
        return root
    cur = start_path
    # Defensive cap: the deepest reasonable monorepo is well under 64
    # levels. Caps the walk so a pathological cwd or a symlink cycle
    # we somehow traverse can't keep us looping.
    for _ in range(64):
        git_marker = cur / ".git"
        try:
            if git_marker.exists():
                resolved = str(cur)
                _workspace_cache[str(start_path)] = (resolved, True)
                return resolved
        except OSError:
            # Permission error on a parent dir; bail out cleanly.
            break
        parent = cur.parent
        if parent == cur:
            break
        cur = parent
    _workspace_cache[str(start_path)] = (None, False)
    return None

def is_inside_workspace(path: str, workspace_root: str) -> bool:
    """Return True iff ``path`` is inside (or equal to) ``workspace_root``.

    The check is purely lexical: both paths are made absolute but
    symlinks are not resolved. A path that sits under the workspace
    root counts as inside even if it is a symlink whose target lives
    elsewhere, and a path outside the root counts as outside even if
    it links into the workspace.
    """
    p = normalize_path(path)
    root = normalize_path(workspace_root)
    if p == root:
        return True
    # os.path.commonpath compares whole path components, so a sibling
    # such as ``/repo-old`` is not mistaken for a child of ``/repo``.
    # The final equality check below is case-sensitive.
    try:
        common = os.path.commonpath([p, root])
    except ValueError:
        # Different drives on Windows.
        return False
    return common == root

def nearest_root(
    start: str,
    markers: Iterable[str],
    *,
    excludes: Optional[Iterable[str]] = None,
    ceiling: Optional[str] = None,
) -> Optional[str]:
    """Walk up from ``start`` looking for any of the given marker files.

    Returns the **directory containing** the first matched marker, or
    ``None`` if no marker is found before hitting ``ceiling`` (or the
    filesystem root if no ceiling).

    If ``excludes`` is provided and an exclude marker matches *first*
    in the upward walk, returns ``None``: the server is gated off
    for that file. Mirrors OpenCode's NearestRoot exclude semantics
    (e.g. typescript skips deno projects when ``deno.json`` is found
    before ``package.json``).
    """
    start_path = Path(normalize_path(start))
    try:
        if start_path.is_file():
            start_path = start_path.parent
    except (OSError, RuntimeError, ValueError):
        return None
    ceiling_path = Path(normalize_path(ceiling)) if ceiling else None
    markers_list = list(markers)
    excludes_list = list(excludes) if excludes else []
    cur = start_path
    # Defensive cap matching ``find_git_worktree``. Bounded walk
    # protects against pathological inputs even though the
    # parent-equality stop normally terminates within ~10 steps.
    for _ in range(64):
        # Check excludes first: if an exclude is found at this level,
        # the server is gated off for this file.
        for exc in excludes_list:
            try:
                if (cur / exc).exists():
                    return None
            except OSError:
                continue
        # Then check markers.
        for marker in markers_list:
            try:
                if (cur / marker).exists():
                    return str(cur)
            except OSError:
                continue
        # Stop conditions.
        if ceiling_path is not None and cur == ceiling_path:
            return None
        parent = cur.parent
        if parent == cur:
            return None
        cur = parent
    return None



def resolve_workspace_for_file(
    file_path: str,
    *,
    cwd: Optional[str] = None,
) -> Tuple[Optional[str], bool]:
    """Resolve the workspace root for a file.

    Returns ``(workspace_root, gated_in)`` where ``gated_in`` is True
    iff LSP should run for this file at all. Currently the gate is
    "file is inside a git worktree found by walking up from cwd OR
    from the file itself".

    The cwd path takes precedence: if the agent was launched in a
    git project, that worktree is the workspace, and any edit inside
    it (regardless of where the file lives) is in-scope. If the cwd
    isn't in a git worktree, we try the file's own location as a
    fallback.

    Returns ``(None, False)`` when neither path is in a git worktree.
    """
    cwd = cwd or os.getcwd()
    cwd_root = find_git_worktree(cwd)
    if cwd_root is not None:
        if is_inside_workspace(file_path, cwd_root):
            return cwd_root, True
    # File is outside the cwd's worktree: try the file's own
    # location as a secondary anchor. Useful for monorepos where
    # the user opens an unrelated checkout.
    file_root = find_git_worktree(file_path)
    if file_root is not None:
        return file_root, True
    return None, False


def clear_cache() -> None:
    """Clear the workspace-resolution cache.

    Called on service shutdown so a subsequent re-init doesn't pick
    up stale results from a previous session.
    """
    _workspace_cache.clear()


__all__ = [
    "find_git_worktree",
    "is_inside_workspace",
    "nearest_root",
    "normalize_path",
    "resolve_workspace_for_file",
    "clear_cache",
]
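
The exclude-before-marker ordering in the upward walk is easy to misread, so here is a minimal standalone sketch of it. This is not the module's code: `nearest_root_sketch` and `max_steps` are hypothetical names, and the sketch leaves out the `ceiling` stop, the `OSError` guards, and the path normalization that the function in the diff applies.

```python
from pathlib import Path
from typing import Iterable, Optional


def nearest_root_sketch(
    start: str,
    markers: Iterable[str],
    excludes: Iterable[str] = (),
    max_steps: int = 64,
) -> Optional[str]:
    """Walk up from ``start``; an exclude seen first gates the result off."""
    cur = Path(start).resolve()
    if cur.is_file():
        cur = cur.parent
    marker_names = list(markers)
    exclude_names = list(excludes)
    for _ in range(max_steps):
        # Excludes are checked before markers at every level, so an
        # exclude in the same directory as a marker still wins.
        if any((cur / name).exists() for name in exclude_names):
            return None
        if any((cur / name).exists() for name in marker_names):
            return str(cur)
        if cur.parent == cur:  # reached the filesystem root
            return None
        cur = cur.parent
    return None
```

Called with `markers=["package.json"]` and `excludes=["deno.json"]`, the sketch returns the directory holding `package.json` unless a `deno.json` sits at or below that directory on the path walked, which matches the TypeScript/Deno case the docstring describes.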

@@ -0,0 +1 @@
"""Pytest helpers for LSP-related tests."""

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""A minimal mock LSP server used by tests, run as a subprocess.

Speaks just enough LSP to drive :class:`plugins.lsp.client.LSPClient`
through a full lifecycle: ``initialize``, ``initialized``,
``textDocument/didOpen``, ``textDocument/didChange``, then a
``textDocument/publishDiagnostics`` notification followed by
``shutdown`` + ``exit``.

Behaviour is selected via the ``MOCK_LSP_SCRIPT`` env var:

- ``"clean"``: initialize, accept didOpen/didChange, push empty
  diagnostics on every open/change, exit cleanly after
  ``shutdown`` + ``exit``.
- ``"errors"``: same as ``clean`` but the published diagnostics
  carry one severity-1 entry starting at line 0, character 0.
- ``"crash"``: exit immediately after responding to ``initialize``
  (simulates a crashing server).
- ``"slow"``: same as ``clean`` but sleeps 1s before responding to
  ``initialize`` (lets us test timeout behaviour).

The script writes JSON-RPC framed messages to stdout and reads from
stdin. No third-party dependencies: it uses only the stdlib, so it runs
under whatever Python the test process picks up.
"""

from __future__ import annotations

import json
import os
import sys
import time


def read_message():
    """Read one Content-Length framed JSON-RPC message from stdin."""
    headers = {}
    while True:
        line = sys.stdin.buffer.readline()
        if not line:
            return None
        line = line.rstrip(b"\r\n")
        if not line:
            break
        k, _, v = line.decode("ascii").partition(":")
        headers[k.strip().lower()] = v.strip()
    n = int(headers["content-length"])
    body = sys.stdin.buffer.read(n)
    return json.loads(body.decode("utf-8"))


def write_message(obj):
    body = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    sys.stdout.buffer.write(f"Content-Length: {len(body)}\r\n\r\n".encode("ascii"))
    sys.stdout.buffer.write(body)
    sys.stdout.buffer.flush()


def main():
    script = os.environ.get("MOCK_LSP_SCRIPT", "clean")
    while True:
        msg = read_message()
        if msg is None:
            return 0
        if "id" in msg and msg.get("method") == "initialize":
            if script == "slow":
                time.sleep(1.0)
            write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg["id"],
                    "result": {
                        "capabilities": {
                            "textDocumentSync": 1,  # Full
                            "diagnosticProvider": {"interFileDependencies": False, "workspaceDiagnostics": False},
                        },
                        "serverInfo": {"name": "mock-lsp", "version": "0.1"},
                    },
                }
            )
            if script == "crash":
                return 0
            continue
        if msg.get("method") == "initialized":
            continue
        if msg.get("method") == "workspace/didChangeConfiguration":
            continue
        if msg.get("method") == "workspace/didChangeWatchedFiles":
            continue
        if msg.get("method") in ("textDocument/didOpen", "textDocument/didChange"):
            params = msg.get("params") or {}
            td = params.get("textDocument") or {}
            uri = td.get("uri", "")
            version = td.get("version", 0)
            diagnostics = []
            if script == "errors":
                diagnostics = [
                    {
                        "range": {
                            "start": {"line": 0, "character": 0},
                            "end": {"line": 0, "character": 5},
                        },
                        "severity": 1,
                        "code": "MOCK001",
                        "source": "mock-lsp",
                        "message": "synthetic error from mock-lsp",
                    }
                ]
            write_message(
                {
                    "jsonrpc": "2.0",
                    "method": "textDocument/publishDiagnostics",
                    "params": {
                        "uri": uri,
                        "version": version,
                        "diagnostics": diagnostics,
                    },
                }
            )
            continue
        if msg.get("method") == "textDocument/diagnostic":
            # Pull endpoint: return empty.
            write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg["id"],
                    "result": {"kind": "full", "items": []},
                }
            )
            continue
        if msg.get("method") == "textDocument/didSave":
            continue
        if msg.get("method") == "shutdown":
            write_message({"jsonrpc": "2.0", "id": msg["id"], "result": None})
            continue
        if msg.get("method") == "exit":
            return 0
        # Unknown request: respond with method-not-found.
        if "id" in msg:
            write_message(
                {
                    "jsonrpc": "2.0",
                    "id": msg["id"],
                    "error": {"code": -32601, "message": f"method not found: {msg.get('method')}"},
                }
            )


if __name__ == "__main__":
    sys.exit(main())
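
The framing used by `read_message` and `write_message` can be tried without a subprocess. The sketch below reimplements the same Content-Length framing against an in-memory byte stream; `frame` and `read_frame` are hypothetical names chosen for illustration, and the stream argument replaces the stdin/stdout buffers the mock server uses.

```python
import io
import json
from typing import BinaryIO, Optional


def frame(obj: dict) -> bytes:
    """Encode one JSON-RPC message with a Content-Length header."""
    body = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    # The length is measured on the encoded bytes, not on the string.
    header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
    return header + body


def read_frame(stream: BinaryIO) -> Optional[dict]:
    """Read one framed message; return None on EOF before the headers end."""
    headers = {}
    while True:
        line = stream.readline()
        if not line:
            return None  # EOF
        line = line.rstrip(b"\r\n")
        if not line:
            break  # blank line terminates the header block
        key, _, value = line.decode("ascii").partition(":")
        headers[key.strip().lower()] = value.strip()
    length = int(headers["content-length"])
    return json.loads(stream.read(length).decode("utf-8"))


# Two messages written back to back are read back one at a time.
stream = io.BytesIO(
    frame({"jsonrpc": "2.0", "method": "a"})
    + frame({"jsonrpc": "2.0", "method": "ä"})
)
first = read_frame(stream)
second = read_frame(stream)
```

Because the reader consumes exactly the declared number of body bytes, consecutive messages on one stream never bleed into each other.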

@@ -0,0 +1,154 @@
"""Integration test: LSP plugin skips non-local paths.
The host-side LSP server can't see files inside Docker/Modal/SSH
sandboxes. The plugin's ``_pre_tool_call`` uses ``os.path.exists``
on the parent directory as a heuristic local-only gate. These tests
verify the plugin hooks skip when the path clearly doesn't exist on
the host filesystem.
"""
from __future__ import annotations
import os
import sys
from unittest.mock import patch
import pytest
@pytest.fixture(autouse=True)
def _isolate_plugin_state():
"""Reset plugin module state between tests."""
# Import the plugin and clear any service state
from plugins.lsp import _baselines
_baselines.clear()
yield
_baselines.clear()
def test_pre_tool_call_skips_nonexistent_parent_dir():
"""pre_tool_call returns early when the path's parent dir doesn't exist (Docker/SSH heuristic)."""
from plugins import lsp as lsp_plugin
# Simulate a path that doesn't exist on host (e.g., inside Docker)
fake_path = "/nonexistent-docker-container-fs/app/main.py"
# Mock _ensure_service to return a mock service
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"snapshot_baseline": lambda self, p: None,
})()
with patch.object(lsp_plugin, "_service", mock_service):
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": fake_path},
session_id="test-session",
tool_call_id="call-1",
)
# Baseline should NOT be captured because parent dir doesn't exist
assert ("test-session", os.path.normpath(fake_path)) not in lsp_plugin._baselines
def test_pre_tool_call_proceeds_for_local_path(tmp_path):
"""pre_tool_call captures baseline when path exists locally."""
from plugins import lsp as lsp_plugin
# Create a real file so the parent-dir check passes
test_file = tmp_path / "test.py"
test_file.write_text("x = 1\n")
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"snapshot_baseline": lambda self, p: None,
})()
with patch.object(lsp_plugin, "_service", mock_service):
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": str(test_file)},
session_id="test-session",
tool_call_id="call-2",
)
# Baseline SHOULD be captured because the local path exists
assert ("test-session", str(test_file)) in lsp_plugin._baselines
def test_pre_tool_call_skips_non_write_tools():
"""pre_tool_call is a no-op for tools other than write_file/patch."""
from plugins import lsp as lsp_plugin
lsp_plugin._pre_tool_call(
tool_name="terminal",
args={"command": "ls"},
session_id="test-session",
tool_call_id="call-3",
)
assert len(lsp_plugin._baselines) == 0
def test_pre_tool_call_skips_v4a_patch():
"""pre_tool_call skips V4A multi-file patches (has 'patch' key, no 'path' key)."""
from plugins import lsp as lsp_plugin
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"snapshot_baseline": lambda self, p: None,
})()
with patch.object(lsp_plugin, "_service", mock_service):
lsp_plugin._pre_tool_call(
tool_name="patch",
args={"patch": "*** Begin Patch\n*** Update File: foo.py\n..."},
session_id="test-session",
tool_call_id="call-4",
)
assert len(lsp_plugin._baselines) == 0
def test_transform_tool_result_injects_diagnostics(tmp_path):
"""transform_tool_result appends lsp_diagnostics field to JSON result."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "test.py"
abs_path = str(test_file)
# Pre-populate a baseline entry (simulating pre_tool_call ran)
lsp_plugin._baselines.add(("test-session", abs_path))
# Mock service that returns a diagnostic
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"get_diagnostics_sync": lambda self, p, delta=True, timeout=3.0: [
{
"severity": 1,
"range": {"start": {"line": 1, "character": 4}},
"message": "Type error: str is not int",
"code": "reportReturnType",
"source": "Pyright",
}
],
})()
with patch.object(lsp_plugin, "_service", mock_service):
result = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path},
result='{"bytes_written": 42, "dirs_created": false}',
session_id="test-session",
tool_call_id="call-5",
)
assert result is not None
import json
data = json.loads(result)
assert "lsp_diagnostics" in data
assert "reportReturnType" in data["lsp_diagnostics"]
assert "bytes_written" in data # Original fields preserved
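
The module docstring describes the local-only gate as an `os.path.exists` check on the parent directory. The plugin's actual code is not part of this file, so the following is only a sketch of that heuristic under that description; `looks_local` is a hypothetical name.

```python
import os


def looks_local(path: str) -> bool:
    """Heuristic: treat a path as host-local if its parent directory exists.

    A file about to be created does not exist yet, so the check looks at
    the parent directory rather than at the file itself.
    """
    parent = os.path.dirname(os.path.abspath(path))
    return os.path.exists(parent)
```

A sandbox path such as `/nonexistent-docker-container-fs/app/main.py` fails the check on the host, while a new file under an existing local directory passes it.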

@@ -0,0 +1,149 @@
"""End-to-end client tests against the mock LSP server.

Spins up :file:`_mock_lsp_server.py` as an actual subprocess, drives
it through real LSP traffic, and asserts diagnostic flow. This is
the closest thing we have to integration coverage without requiring
pyright/gopls/etc. to be installed in CI.
"""
from __future__ import annotations
import asyncio
import os
import sys
from pathlib import Path
import pytest
from plugins.lsp.client import LSPClient
# These tests spawn a real subprocess (mock LSP server) and terminate it
# via SIGTERM on shutdown. The conftest live-system guard blocks os.kill
# for PIDs outside the test process subtree; bypass it here because this
# is intentional subprocess lifecycle management.
pytestmark = pytest.mark.live_system_guard_bypass
MOCK_SERVER = str(Path(__file__).parent / "_mock_lsp_server.py")
def _client(workspace: Path, script: str = "clean") -> LSPClient:
env = {"MOCK_LSP_SCRIPT": script, "PYTHONPATH": os.environ.get("PYTHONPATH", "")}
return LSPClient(
server_id=f"mock-{script}",
workspace_root=str(workspace),
command=[sys.executable, MOCK_SERVER],
env=env,
cwd=str(workspace),
)
@pytest.mark.asyncio
async def test_client_lifecycle_clean(tmp_path: Path):
"""Full lifecycle: spawn, initialize, open, get clean diagnostics, shutdown."""
f = tmp_path / "x.py"
f.write_text("print('hi')\n")
client = _client(tmp_path, "clean")
await client.start()
try:
assert client.is_running
version = await client.open_file(str(f), language_id="python")
assert version == 0
await client.wait_for_diagnostics(str(f), version, mode="document")
diags = client.diagnostics_for(str(f))
assert diags == []
finally:
await client.shutdown()
assert not client.is_running
@pytest.mark.asyncio
async def test_client_receives_published_errors(tmp_path: Path):
f = tmp_path / "x.py"
f.write_text("print('hi')\n")
client = _client(tmp_path, "errors")
await client.start()
try:
version = await client.open_file(str(f), language_id="python")
await client.wait_for_diagnostics(str(f), version, mode="document")
diags = client.diagnostics_for(str(f))
assert len(diags) == 1
d = diags[0]
assert d["severity"] == 1
assert d["code"] == "MOCK001"
assert d["source"] == "mock-lsp"
assert "synthetic error" in d["message"]
finally:
await client.shutdown()
@pytest.mark.asyncio
async def test_client_didchange_bumps_version(tmp_path: Path):
f = tmp_path / "x.py"
f.write_text("print('hi')\n")
client = _client(tmp_path, "errors")
await client.start()
try:
v0 = await client.open_file(str(f), language_id="python")
f.write_text("print('hi 2')\n")
v1 = await client.open_file(str(f), language_id="python") # re-open path = didChange
assert v1 == v0 + 1
await client.wait_for_diagnostics(str(f), v1, mode="document")
# Mock pushed a diagnostic for both events; merged view has one
# entry (push store keyed by file path).
diags = client.diagnostics_for(str(f))
assert len(diags) == 1
finally:
await client.shutdown()


@pytest.mark.asyncio
async def test_client_handles_crashing_server(tmp_path: Path):
    """When the server exits right after initialize, subsequent requests
    fail gracefully (not hang)."""
    f = tmp_path / "x.py"
    f.write_text("")
    client = _client(tmp_path, "crash")
    await client.start()  # should succeed (mock answers initialize before crashing)
    # Give the OS a moment to deliver the EOF.
    await asyncio.sleep(0.2)
    # The reader loop should detect EOF and mark pending requests as failed.
    try:
        await asyncio.wait_for(
            client.open_file(str(f), language_id="python"), timeout=2.0
        )
    except asyncio.TimeoutError:
        # wait_for had to cancel the call: that is the hang this test guards against.
        pytest.fail("open_file hung after the server crashed")
    except Exception:
        pass  # any other exception is acceptable; the contract is "doesn't hang"
    finally:
        await client.shutdown()
@pytest.mark.asyncio
async def test_client_shutdown_idempotent(tmp_path: Path):
"""Calling shutdown twice must be safe."""
f = tmp_path / "x.py"
f.write_text("")
client = _client(tmp_path, "clean")
await client.start()
await client.shutdown()
await client.shutdown() # must not raise
@pytest.mark.asyncio
async def test_client_diagnostics_are_deduped(tmp_path: Path):
"""Repeated identical pushes must not produce duplicate diagnostics."""
f = tmp_path / "x.py"
f.write_text("")
client = _client(tmp_path, "errors")
await client.start()
try:
for _ in range(3):
v = await client.open_file(str(f), language_id="python")
await client.wait_for_diagnostics(str(f), v, mode="document")
diags = client.diagnostics_for(str(f))
# Push store overwrites on every notification — should have 1.
assert len(diags) == 1
finally:
await client.shutdown()
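
The last two tests rely on the push store replacing a file's diagnostics on every `publishDiagnostics` notification instead of accumulating them. The client's store is not shown in this file, so the class below is only a sketch of that overwrite behaviour; `PushStoreSketch` and its method names are hypothetical.

```python
from typing import Dict, List


class PushStoreSketch:
    """Keeps the most recent pushed diagnostics per file path."""

    def __init__(self) -> None:
        self._by_path: Dict[str, List[dict]] = {}

    def on_publish(self, path: str, diagnostics: List[dict]) -> None:
        # Replace, never extend: each notification carries the server's
        # full current view of the file.
        self._by_path[path] = list(diagnostics)

    def diagnostics_for(self, path: str) -> List[dict]:
        return list(self._by_path.get(path, []))


store = PushStoreSketch()
error = {"severity": 1, "code": "MOCK001", "message": "synthetic error from mock-lsp"}
for _ in range(3):
    store.on_publish("/proj/x.py", [error])
```

Three identical pushes leave one entry for the file, and a later push with an empty list clears it.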

@@ -0,0 +1,199 @@
"""Tests for the structured logging dedup model.

The contract: a session of 1000 clean writes in one project should emit
exactly ONE INFO line ("active for <root>") at the default INFO threshold.
Steady-state events stay at DEBUG; first-time-seen events surface
once at INFO/WARNING. Diagnostics events are the exception: they log at
INFO every time.
"""
from __future__ import annotations
import logging
import pytest
from plugins.lsp import eventlog
@pytest.fixture(autouse=True)
def _reset():
eventlog.reset_announce_caches()
yield
eventlog.reset_announce_caches()
@pytest.fixture
def caplog_lsp(caplog):
caplog.set_level(logging.DEBUG, logger="hermes.lint.lsp")
return caplog
# ---------------------------------------------------------------------------
# Steady-state silence (DEBUG)
# ---------------------------------------------------------------------------
def test_clean_emits_at_debug(caplog_lsp):
for _ in range(10):
eventlog.log_clean("pyright", "/proj/x.py")
info_records = [r for r in caplog_lsp.records if r.levelno >= logging.INFO]
debug_records = [r for r in caplog_lsp.records if r.levelno == logging.DEBUG]
assert info_records == []
assert len(debug_records) == 10
def test_disabled_emits_at_debug(caplog_lsp):
eventlog.log_disabled("pyright", "/x.py", "feature off")
eventlog.log_disabled("pyright", "/x.py", "ext not mapped")
assert all(r.levelno == logging.DEBUG for r in caplog_lsp.records)
# ---------------------------------------------------------------------------
# State transitions: INFO once, DEBUG thereafter
# ---------------------------------------------------------------------------
def test_active_for_fires_once_per_root(caplog_lsp):
for _ in range(50):
eventlog.log_active("pyright", "/proj")
info_records = [
r for r in caplog_lsp.records
if r.levelno == logging.INFO and "active for" in r.getMessage()
]
assert len(info_records) == 1
def test_active_for_fires_per_distinct_root(caplog_lsp):
eventlog.log_active("pyright", "/proj-a")
eventlog.log_active("pyright", "/proj-b")
info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
assert len(info) == 2
def test_active_for_separate_per_server(caplog_lsp):
eventlog.log_active("pyright", "/proj")
eventlog.log_active("typescript", "/proj")
info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
assert len(info) == 2
def test_no_project_root_fires_once_per_path(caplog_lsp):
for _ in range(5):
eventlog.log_no_project_root("pyright", "/orphan.py")
info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
assert len(info) == 1
# ---------------------------------------------------------------------------
# Diagnostics events fire INFO every time
# ---------------------------------------------------------------------------
def test_diagnostics_always_info(caplog_lsp):
for i in range(5):
eventlog.log_diagnostics("pyright", f"/x{i}.py", 1)
info = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
assert len(info) == 5
assert all("diags" in r.getMessage() for r in info)
# ---------------------------------------------------------------------------
# Action-required: WARNING once, DEBUG thereafter (or per call for novel events)
# ---------------------------------------------------------------------------
def test_server_unavailable_warns_once_per_binary(caplog_lsp):
for _ in range(20):
eventlog.log_server_unavailable("pyright", "pyright-langserver")
warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
assert len(warns) == 1
assert "pyright-langserver" in warns[0].getMessage()
def test_server_unavailable_separate_per_binary(caplog_lsp):
eventlog.log_server_unavailable("pyright", "pyright-langserver")
eventlog.log_server_unavailable("typescript", "typescript-language-server")
warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
assert len(warns) == 2
def test_no_server_configured_warns_once(caplog_lsp):
for _ in range(10):
eventlog.log_no_server_configured("pyright")
warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
assert len(warns) == 1
def test_timeout_warns_every_call(caplog_lsp):
for _ in range(3):
eventlog.log_timeout("pyright", "/x.py")
warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
assert len(warns) == 3
def test_server_error_warns_every_call(caplog_lsp):
for _ in range(3):
eventlog.log_server_error("pyright", "/x.py", RuntimeError("boom"))
warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
assert len(warns) == 3
def test_spawn_failed_warns(caplog_lsp):
eventlog.log_spawn_failed("pyright", "/proj", FileNotFoundError("nope"))
warns = [r for r in caplog_lsp.records if r.levelno == logging.WARNING]
assert len(warns) == 1
assert "spawn/initialize failed" in warns[0].getMessage()
# ---------------------------------------------------------------------------
# Format: log lines all carry the lsp[<server_id>] prefix for grep
# ---------------------------------------------------------------------------
def test_log_lines_use_lsp_prefix(caplog_lsp):
eventlog.log_clean("pyright", "/x.py")
eventlog.log_active("pyright", "/proj")
eventlog.log_diagnostics("typescript", "/y.ts", 2)
for r in caplog_lsp.records:
assert r.getMessage().startswith("lsp[")
# ---------------------------------------------------------------------------
# Steady-state contract: 1000 clean writes → 1 INFO at most
# ---------------------------------------------------------------------------
def test_thousand_clean_writes_emit_one_info(caplog_lsp):
"""A long session writes lots of files cleanly; agent.log should
show ONE 'active for' INFO and zero other INFO lines."""
eventlog.log_active("pyright", "/proj")
for _ in range(1000):
eventlog.log_clean("pyright", "/proj/x.py")
info_records = [r for r in caplog_lsp.records if r.levelno == logging.INFO]
assert len(info_records) == 1
assert "active for" in info_records[0].getMessage()
# ---------------------------------------------------------------------------
# Path shortening
# ---------------------------------------------------------------------------
def test_short_path_uses_relative_when_inside_cwd(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
sub = tmp_path / "x.py"
sub.write_text("")
out = eventlog._short_path(str(sub))
assert out == "x.py"


def test_short_path_keeps_absolute_when_outside(tmp_path, monkeypatch):
    monkeypatch.chdir(tmp_path)
    other = "/var/log/foo.txt"
    out = eventlog._short_path(other)
    # Outside cwd: keeps absolute (no leading "../")
    assert out == "/var/log/foo.txt" or not out.startswith("..")
def test_short_path_handles_empty_string():
assert eventlog._short_path("") == ""
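
The "INFO once, DEBUG thereafter" behaviour these tests pin down can be sketched with a set of already-announced keys. The `eventlog` module itself is not shown here, so this is an assumption about one way to get that behaviour; `log_active_sketch`, `_announced`, and the logger name `sketch.lsp` are hypothetical.

```python
import logging

logger = logging.getLogger("sketch.lsp")

# (server_id, root) pairs that have already been announced at INFO.
_announced = set()


def log_active_sketch(server_id: str, root: str) -> None:
    """Log the first sighting of a (server, root) pair at INFO, repeats at DEBUG."""
    key = (server_id, root)
    level = logging.DEBUG if key in _announced else logging.INFO
    _announced.add(key)
    logger.log(level, "lsp[%s] active for %s", server_id, root)
```

Keying the set on both the server id and the root is what makes a second server on the same project, or the same server on a second project, announce itself again.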

@@ -0,0 +1,203 @@
"""Integration test: full hook flow pre_tool_call → write → transform_tool_result.
Verifies that the plugin hook wiring correctly:
1. Captures a baseline in pre_tool_call
2. Passes through a write (no interference)
3. Injects diagnostics in transform_tool_result
Uses a mocked LSP service to avoid requiring pyright/gopls in CI.
"""
from __future__ import annotations
import json
import os
from unittest.mock import patch
import pytest
@pytest.fixture(autouse=True)
def _isolate():
"""Clear plugin state between tests."""
from plugins import lsp as lsp_plugin
lsp_plugin._baselines.clear()
old_service = lsp_plugin._service
yield
lsp_plugin._baselines.clear()
lsp_plugin._service = old_service
class FakeLSPService:
"""Minimal LSP service mock that returns canned diagnostics."""
def __init__(self, diagnostics=None):
self._diagnostics = diagnostics or []
def is_active(self):
return True
def enabled_for(self, path):
return path.endswith(".py") or path.endswith(".ts")
def snapshot_baseline(self, path):
pass # no-op, just marks that we visited
def get_diagnostics_sync(self, path, delta=True, timeout=3.0):
return self._diagnostics
def shutdown(self):
pass
def test_full_hook_flow_produces_diagnostics(tmp_path):
"""Exercise pre_tool_call → (write) → transform_tool_result end-to-end."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "broken.py"
test_file.write_text("x: int = 'oops'\n")
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[
{
"severity": 1,
"range": {"start": {"line": 0, "character": 9}},
"message": 'Expression of type "str" is incompatible with declared type "int"',
"code": "reportAssignmentType",
"source": "Pyright",
}
])
with patch.object(lsp_plugin, "_service", fake_service):
# Step 1: pre_tool_call captures baseline
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 'oops'\n"},
session_id="test-session",
tool_call_id="call-001",
)
assert ("test-session", abs_path) in lsp_plugin._baselines
# Step 2: simulate the write completing (tool output)
tool_result = json.dumps({
"bytes_written": 16,
"dirs_created": False,
"lint": None,
})
# Step 3: transform_tool_result injects diagnostics
transformed = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 'oops'\n"},
result=tool_result,
session_id="test-session",
tool_call_id="call-001",
)
# Verify: result is valid JSON with lsp_diagnostics field
assert transformed is not None
data = json.loads(transformed)
assert "lsp_diagnostics" in data
assert "reportAssignmentType" in data["lsp_diagnostics"]
assert "Pyright" in data["lsp_diagnostics"]
# Original fields preserved
assert data["bytes_written"] == 16
assert data["dirs_created"] is False
# Baseline consumed (removed after use)
assert ("test-session", abs_path) not in lsp_plugin._baselines
def test_hook_flow_returns_none_when_no_diagnostics(tmp_path):
"""transform_tool_result returns None (no modification) when LSP is clean."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "clean.py"
test_file.write_text("x: int = 42\n")
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[]) # Clean — no errors
with patch.object(lsp_plugin, "_service", fake_service):
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 42\n"},
session_id="test-session",
tool_call_id="call-002",
)
transformed = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 42\n"},
result='{"bytes_written": 12}',
session_id="test-session",
tool_call_id="call-002",
)
# No diagnostics → return None → result unchanged
assert transformed is None
def test_hook_flow_no_baseline_means_no_injection(tmp_path):
"""transform_tool_result does nothing if pre_tool_call didn't fire."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "no_baseline.py"
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[
{"severity": 1, "range": {"start": {"line": 0, "character": 0}},
"message": "error", "code": "E1", "source": "test"}
])
with patch.object(lsp_plugin, "_service", fake_service):
# Skip pre_tool_call — simulate a case where it didn't fire
transformed = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path},
result='{"bytes_written": 5}',
session_id="test-session",
tool_call_id="call-003",
)
# No baseline was captured, so no injection
assert transformed is None
def test_hook_flow_patch_tool(tmp_path):
"""Hook flow works for patch tool (single-path mode)."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "patched.py"
test_file.write_text("def f() -> int:\n return 'wrong'\n")
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[
{
"severity": 1,
"range": {"start": {"line": 1, "character": 11}},
"message": 'Cannot return "str" from function with return type "int"',
"code": "reportReturnType",
"source": "Pyright",
}
])
with patch.object(lsp_plugin, "_service", fake_service):
lsp_plugin._pre_tool_call(
tool_name="patch",
args={"path": abs_path, "old_string": "return 42", "new_string": "return 'wrong'"},
session_id="test-session",
tool_call_id="call-004",
)
transformed = lsp_plugin._transform_tool_result(
tool_name="patch",
args={"path": abs_path, "old_string": "return 42", "new_string": "return 'wrong'"},
result='{"success": true, "diff": "..."}',
session_id="test-session",
tool_call_id="call-004",
)
assert transformed is not None
data = json.loads(transformed)
assert "lsp_diagnostics" in data
assert "reportReturnType" in data["lsp_diagnostics"]
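
The commit notes for this PR say diagnostics are injected only when the tool result is a JSON object, and that anything else is left unmodified by returning `None`. The sketch below shows that rule in isolation. It is not the plugin's `_transform_tool_result`: `inject_diagnostics_sketch` is a hypothetical helper, and treating `lsp_diagnostics` as a single formatted string is an assumption based on how the tests read it.

```python
import json
from typing import Optional


def inject_diagnostics_sketch(result: Optional[str], report: str) -> Optional[str]:
    """Return the result with an ``lsp_diagnostics`` field, or None to leave it alone."""
    if not report:
        return None  # clean file: nothing to inject
    try:
        data = json.loads(result or "")
    except (TypeError, ValueError):
        return None  # not JSON: do not risk corrupting the format
    if not isinstance(data, dict):
        return None  # JSON, but not an object: nowhere safe to add a field
    data["lsp_diagnostics"] = report
    try:
        return json.dumps(data)
    except (TypeError, ValueError):
        return None
```

Returning `None` for every doubtful case keeps the original tool output byte-for-byte intact, which is the safe default for a hook that rewrites results.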

@@ -0,0 +1,197 @@
"""Tests for the LSP protocol framing layer.
The framer is small but load-bearing — Content-Length parsing is the
single most common reason for hand-rolled LSP clients to silently
deadlock. These tests exercise:
- exact wire format of outgoing messages (encode_message)
- partial-read tolerance + EOF handling (read_message)
- envelope helpers (request, response, notification, error)
- message classification
"""
from __future__ import annotations
import asyncio
import json
import pytest
from plugins.lsp.protocol import (
ERROR_CONTENT_MODIFIED,
ERROR_METHOD_NOT_FOUND,
LSPProtocolError,
LSPRequestError,
classify_message,
encode_message,
make_error_response,
make_notification,
make_request,
make_response,
read_message,
)
# ---------------------------------------------------------------------------
# encode_message
# ---------------------------------------------------------------------------
def test_encode_message_uses_compact_separators_and_utf8():
msg = {"jsonrpc": "2.0", "id": 1, "method": "x", "params": {"k": "ä"}}
out = encode_message(msg)
# Header is plain ASCII Content-Length CRLF CRLF
header_end = out.index(b"\r\n\r\n") + 4
header = out[:header_end].decode("ascii")
body = out[header_end:]
assert "Content-Length:" in header
declared = int(header.split("Content-Length:")[1].split("\r\n")[0].strip())
# Declared length must equal actual body bytes.
assert declared == len(body)
# Body parses as JSON and round-trips.
parsed = json.loads(body.decode("utf-8"))
assert parsed == msg
# Body uses compact separators (no spaces between kv).
assert b'"id":1' in body
def test_encode_message_handles_unicode_in_strings():
msg = {"jsonrpc": "2.0", "method": "log", "params": {"text": "🚀 ünıcödé"}}
out = encode_message(msg)
header_end = out.index(b"\r\n\r\n") + 4
declared = int(out[: out.index(b"\r\n")].split(b": ")[1])
assert declared == len(out[header_end:])
assert json.loads(out[header_end:].decode("utf-8")) == msg
# ---------------------------------------------------------------------------
# read_message
# ---------------------------------------------------------------------------
async def _stream_from_bytes(data: bytes) -> asyncio.StreamReader:
"""Build an asyncio.StreamReader pre-populated with ``data``."""
reader = asyncio.StreamReader()
reader.feed_data(data)
reader.feed_eof()
return reader
@pytest.mark.asyncio
async def test_read_message_round_trip():
msg = {"jsonrpc": "2.0", "method": "ping"}
reader = await _stream_from_bytes(encode_message(msg))
parsed = await read_message(reader)
assert parsed == msg
@pytest.mark.asyncio
async def test_read_message_clean_eof_returns_none():
reader = await _stream_from_bytes(b"")
assert await read_message(reader) is None
@pytest.mark.asyncio
async def test_read_message_truncated_body_raises():
msg = encode_message({"jsonrpc": "2.0", "method": "x"})
truncated = msg[: -3] # cut the body
reader = await _stream_from_bytes(truncated)
with pytest.raises(LSPProtocolError):
await read_message(reader)
@pytest.mark.asyncio
async def test_read_message_missing_content_length_raises():
bad = b"X-Other: 5\r\n\r\n12345"
reader = await _stream_from_bytes(bad)
with pytest.raises(LSPProtocolError):
await read_message(reader)
@pytest.mark.asyncio
async def test_read_message_two_messages_back_to_back():
a = encode_message({"jsonrpc": "2.0", "method": "a"})
b = encode_message({"jsonrpc": "2.0", "method": "b"})
reader = await _stream_from_bytes(a + b)
assert (await read_message(reader))["method"] == "a"
assert (await read_message(reader))["method"] == "b"
@pytest.mark.asyncio
async def test_read_message_rejects_runaway_header():
"""A pathological server that streams headers without ever emitting
the CRLF-CRLF terminator must not loop forever — the 8 KiB cap kicks
in and surfaces a protocol error."""
flood = (b"X-Junk: " + b"A" * 200 + b"\r\n") * 60 # ~12 KiB worth
reader = await _stream_from_bytes(flood)
with pytest.raises(LSPProtocolError) as exc:
await read_message(reader)
assert "8 KiB" in str(exc.value)
# ---------------------------------------------------------------------------
# envelope helpers
# ---------------------------------------------------------------------------


def test_make_request_includes_id_and_method():
    msg = make_request(7, "ping", {"v": 1})
    assert msg == {"jsonrpc": "2.0", "id": 7, "method": "ping", "params": {"v": 1}}


def test_make_request_omits_params_when_none():
    msg = make_request(7, "ping", None)
    assert "params" not in msg


def test_make_notification_omits_id():
    msg = make_notification("log", {"line": "hi"})
    assert "id" not in msg
    assert msg["method"] == "log"


def test_make_response_carries_result():
    msg = make_response(7, {"ok": True})
    assert msg["id"] == 7 and msg["result"] == {"ok": True}


def test_make_error_response_shape():
    msg = make_error_response(7, ERROR_CONTENT_MODIFIED, "stale", {"hint": "retry"})
    assert msg["error"]["code"] == ERROR_CONTENT_MODIFIED
    assert msg["error"]["message"] == "stale"
    assert msg["error"]["data"] == {"hint": "retry"}


# ---------------------------------------------------------------------------
# classify_message
# ---------------------------------------------------------------------------


def test_classify_message_request():
    msg = {"jsonrpc": "2.0", "id": 1, "method": "x"}
    assert classify_message(msg) == ("request", 1)


def test_classify_message_response():
    msg = {"jsonrpc": "2.0", "id": 1, "result": None}
    assert classify_message(msg) == ("response", 1)


def test_classify_message_notification():
    msg = {"jsonrpc": "2.0", "method": "log"}
    assert classify_message(msg) == ("notification", "log")


def test_classify_message_invalid():
    assert classify_message({"id": 1})[0] == "invalid"
    assert classify_message({"jsonrpc": "1.0", "method": "x"})[0] == "invalid"


# ---------------------------------------------------------------------------
# LSPRequestError
# ---------------------------------------------------------------------------


def test_lsp_request_error_carries_code_and_data():
    e = LSPRequestError(ERROR_METHOD_NOT_FOUND, "no", {"x": 1})
    assert e.code == ERROR_METHOD_NOT_FOUND
    assert e.message == "no"
    assert e.data == {"x": 1}


@@ -0,0 +1,94 @@
"""Tests for the diagnostic reporter (formatting layer)."""
from __future__ import annotations

from plugins.lsp.reporter import (
    DEFAULT_SEVERITIES,
    MAX_PER_FILE,
    format_diagnostic,
    report_for_file,
    truncate,
)


def _diag(line=0, col=0, sev=1, code="E001", source="ls", msg="oops"):
    return {
        "range": {
            "start": {"line": line, "character": col},
            "end": {"line": line, "character": col + 1},
        },
        "severity": sev,
        "code": code,
        "source": source,
        "message": msg,
    }


def test_format_diagnostic_uses_one_indexed_position():
    line = format_diagnostic(_diag(line=4, col=2))
    assert "[5:3]" in line  # +1 on both


def test_format_diagnostic_includes_severity_label():
    assert format_diagnostic(_diag(sev=1)).startswith("ERROR")
    assert format_diagnostic(_diag(sev=2)).startswith("WARN")
    assert format_diagnostic(_diag(sev=3)).startswith("INFO")
    assert format_diagnostic(_diag(sev=4)).startswith("HINT")


def test_format_diagnostic_includes_code_and_source():
    line = format_diagnostic(_diag(code="X42", source="src"))
    assert "[X42]" in line
    assert "(src)" in line


def test_format_diagnostic_omits_missing_optional_fields():
    line = format_diagnostic(
        {
            "range": {
                "start": {"line": 0, "character": 0},
                "end": {"line": 0, "character": 0},
            },
            "severity": 1,
            "message": "bare",
        }
    )
    assert "[" not in line.split("]", 1)[1]  # no extra brackets after the position
    assert "(" not in line


def test_report_for_file_returns_empty_when_only_warnings():
    """Default severity filter is ERROR-only."""
    report = report_for_file("/x.py", [_diag(sev=2)])
    assert report == ""


def test_report_for_file_emits_block_with_errors():
    diag = _diag(msg="real error")
    report = report_for_file("/x.py", [diag])
    assert "<diagnostics file=\"/x.py\">" in report
    assert "real error" in report
    assert "</diagnostics>" in report


def test_report_for_file_caps_at_max_per_file():
    diags = [_diag(line=i) for i in range(MAX_PER_FILE + 5)]
    report = report_for_file("/x.py", diags)
    assert "and 5 more" in report


def test_report_for_file_respects_custom_severities():
    diag = _diag(sev=2, msg="warn")
    report = report_for_file("/x.py", [diag], severities=frozenset({1, 2}))
    assert "warn" in report


def test_truncate_below_limit_unchanged():
    s = "abc" * 100
    assert truncate(s, limit=4000) == s


def test_truncate_above_limit_appends_marker():
    s = "x" * 10000
    out = truncate(s, limit=200)
    assert out.endswith("[truncated]")
    assert len(out) <= 200


@@ -0,0 +1,149 @@
"""Tests for the synchronous LSPService wrapper.

Drives the service through ``snapshot_baseline`` →
``get_diagnostics_sync`` against the mock LSP server, exercising the
delta filter that ``tools/file_operations._check_lint_delta`` relies
on.
"""
from __future__ import annotations

import os
import sys
from pathlib import Path

import pytest

from plugins.lsp.manager import LSPService
from plugins.lsp.servers import (
    SERVERS,
    ServerContext,
    ServerDef,
    SpawnSpec,
    find_server_for_file,
)

MOCK_SERVER = str(Path(__file__).parent / "_mock_lsp_server.py")


def _install_mock_server(monkeypatch, script: str = "errors", server_id: str = "pyright"):
    """Replace one registered server with a wrapper that spawns the mock.

    We reuse ``pyright`` so .py files route to it. This keeps the
    test free of any LSP toolchain dependency.
    """
    target_index = next(i for i, s in enumerate(SERVERS) if s.server_id == server_id)
    original = SERVERS[target_index]

    def _spawn(root: str, ctx: ServerContext) -> SpawnSpec:
        env = {"MOCK_LSP_SCRIPT": script}
        return SpawnSpec(
            command=[sys.executable, MOCK_SERVER],
            workspace_root=root,
            cwd=root,
            env=env,
            initialization_options={},
        )

    replacement = ServerDef(
        server_id=server_id,
        extensions=original.extensions,
        resolve_root=lambda fp, ws: ws,  # always use workspace root
        build_spawn=_spawn,
        seed_first_push=False,
        description="mock " + server_id,
    )
    # Patch the SERVERS list element directly + restore on teardown.
    SERVERS[target_index] = replacement
    yield
    SERVERS[target_index] = original


@pytest.fixture
def mock_pyright(monkeypatch, tmp_path):
    """Install the mock as ``pyright`` and create a fake git workspace."""
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").mkdir()
    (repo / "pyproject.toml").write_text("")  # so pyright's root resolver finds it
    monkeypatch.chdir(str(repo))
    gen = _install_mock_server(monkeypatch, "errors", "pyright")
    next(gen)
    yield repo
    try:
        next(gen)
    except StopIteration:
        pass


def test_service_returns_empty_when_disabled(tmp_path):
    svc = LSPService(
        enabled=False,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="auto",
    )
    assert not svc.is_active()
    f = tmp_path / "x.py"
    f.write_text("")
    assert svc.get_diagnostics_sync(str(f)) == []
    svc.shutdown()


def test_service_skips_files_outside_workspace(tmp_path):
    """Files outside any git worktree must not trigger LSP."""
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=2.0,
        install_strategy="manual",
    )
    f = tmp_path / "x.py"
    f.write_text("")
    # No .git anywhere; service should report not enabled for this file.
    assert not svc.enabled_for(str(f))
    svc.shutdown()


def test_service_e2e_delta_filter(mock_pyright):
    """End-to-end: snapshot baseline → wait → delta returned."""
    repo = mock_pyright
    f = repo / "x.py"
    f.write_text("print('hi')\n")
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=3.0,
        install_strategy="manual",
    )
    try:
        assert svc.enabled_for(str(f))
        # Baseline first: server pushes 1 error.
        svc.snapshot_baseline(str(f))
        # Re-poll: same error is in baseline, so delta is empty.
        new_diags = svc.get_diagnostics_sync(str(f))
        assert new_diags == []
    finally:
        svc.shutdown()


def test_service_status_includes_clients(mock_pyright):
    repo = mock_pyright
    f = repo / "x.py"
    f.write_text("")
    svc = LSPService(
        enabled=True,
        wait_mode="document",
        wait_timeout=3.0,
        install_strategy="manual",
    )
    try:
        svc.get_diagnostics_sync(str(f))
        info = svc.get_status()
        assert info["enabled"] is True
        assert any(c["server_id"] == "pyright" for c in info["clients"])
    finally:
        svc.shutdown()


@@ -0,0 +1,139 @@
"""Tests for workspace + project-root resolution."""
from __future__ import annotations

import os
from pathlib import Path

import pytest

from plugins.lsp.workspace import (
    clear_cache,
    find_git_worktree,
    is_inside_workspace,
    nearest_root,
    normalize_path,
    resolve_workspace_for_file,
)


@pytest.fixture(autouse=True)
def _clear():
    clear_cache()
    yield
    clear_cache()


def test_find_git_worktree_returns_none_outside_repo(tmp_path: Path):
    sub = tmp_path / "sub"
    sub.mkdir()
    assert find_git_worktree(str(sub)) is None


def test_find_git_worktree_finds_dotgit(tmp_path: Path):
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").mkdir()
    sub = repo / "src" / "deep"
    sub.mkdir(parents=True)
    assert find_git_worktree(str(sub)) == str(repo)


def test_find_git_worktree_handles_dotgit_file(tmp_path: Path):
    """``.git`` can also be a file (gitfile pointing into a worktree)."""
    repo = tmp_path / "repo"
    repo.mkdir()
    (repo / ".git").write_text("gitdir: /elsewhere\n")
    assert find_git_worktree(str(repo)) == str(repo)


def test_is_inside_workspace_true_for_subpath(tmp_path: Path):
    root = tmp_path / "p"
    root.mkdir()
    sub = root / "x" / "y.py"
    sub.parent.mkdir(parents=True)
    sub.write_text("")
    assert is_inside_workspace(str(sub), str(root))


def test_is_inside_workspace_false_for_unrelated(tmp_path: Path):
    a = tmp_path / "a"
    b = tmp_path / "b"
    a.mkdir()
    b.mkdir()
    f = b / "x.py"
    f.write_text("")
    assert not is_inside_workspace(str(f), str(a))


def test_nearest_root_finds_first_marker(tmp_path: Path):
    root = tmp_path / "p"
    deep = root / "src" / "pkg"
    deep.mkdir(parents=True)
    (root / "pyproject.toml").write_text("")
    found = nearest_root(str(deep / "mod.py"), ["pyproject.toml"])
    assert found == str(root)


def test_nearest_root_excludes_take_priority(tmp_path: Path):
    """If an exclude marker matches first, return None."""
    root = tmp_path / "p"
    sub = root / "deno-app"
    sub.mkdir(parents=True)
    (sub / "deno.json").write_text("{}")
    (root / "package.json").write_text("{}")  # would match if not for exclude
    found = nearest_root(
        str(sub / "main.ts"),
        ["package.json"],
        excludes=["deno.json"],
    )
    assert found is None


def test_nearest_root_returns_none_when_no_marker(tmp_path: Path):
    f = tmp_path / "x.py"
    f.write_text("")
    assert nearest_root(str(f), ["pyproject.toml"]) is None


def test_resolve_workspace_for_file_uses_cwd_first(tmp_path: Path, monkeypatch):
    repo = tmp_path / "repo"
    (repo / ".git").mkdir(parents=True)
    file_path = repo / "x.py"
    file_path.write_text("")
    # cwd is inside the repo
    monkeypatch.chdir(str(repo))
    root, gated = resolve_workspace_for_file(str(file_path))
    assert root == str(repo)
    assert gated is True


def test_resolve_workspace_for_file_no_repo_returns_none(tmp_path: Path, monkeypatch):
    monkeypatch.chdir(str(tmp_path))
    f = tmp_path / "x.py"
    f.write_text("")
    root, gated = resolve_workspace_for_file(str(f))
    assert root is None
    assert gated is False


def test_resolve_workspace_falls_back_to_file_location(tmp_path: Path, monkeypatch):
    """When cwd isn't a git repo but the file is inside one, we still
    discover the workspace from the file's path."""
    not_a_repo = tmp_path / "loose"
    not_a_repo.mkdir()
    monkeypatch.chdir(str(not_a_repo))
    repo = tmp_path / "actual-repo"
    (repo / ".git").mkdir(parents=True)
    f = repo / "x.py"
    f.write_text("")
    root, gated = resolve_workspace_for_file(str(f))
    assert root == str(repo)
    assert gated is True


def test_normalize_path_expands_tilde(monkeypatch):
    monkeypatch.setenv("HOME", "/home/user")
    p = normalize_path("~/x.py")
    assert p == os.path.abspath("/home/user/x.py")


@@ -0,0 +1,180 @@
---
sidebar_position: 17
title: "LSP — Semantic Diagnostics"
description: "Real language servers (pyright, gopls, rust-analyzer, …) surfacing type errors on write_file and patch."
---
# LSP Plugin — Semantic Diagnostics
The LSP plugin runs real language servers (pyright, gopls, rust-analyzer, typescript-language-server, and ~20 more) in the background and surfaces their diagnostics when the agent writes files. The agent sees type errors, undefined names, and missing imports **introduced by its edit** — not just syntax errors.
## Enable
Add `lsp` to your enabled plugins:
```yaml
# ~/.hermes/config.yaml
plugins:
  enabled:
    - lsp
```
Or use the CLI:
```bash
hermes plugins enable lsp
```
That's it. On the next session, the plugin activates for any file edit inside a git repository.
## Install Language Servers
By default (`install_strategy: manual`) the plugin only **detects** servers already on your PATH; it doesn't install anything on its own. Use `hermes lsp status` to see what's available:
```bash
hermes lsp status
```
```
LSP Service
===========
enabled: True
Registered Servers
==================
✓ pyright [installed ] .py, .pyi
✓ typescript [installed ] .ts, .tsx, .js, .jsx
· gopls [missing ] .go
? rust-analyzer [manual-only] .rs
```
To install a server into the Hermes-managed staging directory (`$HERMES_HOME/lsp/bin/`):
```bash
hermes lsp install pyright # npm-based
hermes lsp install gopls # go install
hermes lsp install bash-language-server
hermes lsp install-all # try all recipes
```
Servers that are too heavy to auto-install (rust-analyzer, clangd, lua-language-server) are marked `manual-only` — install them through your normal toolchain (`rustup component add rust-analyzer`, etc.).
### Other ways to make servers available
- **System PATH**: If `pyright-langserver` is already on your PATH (e.g., from `npm install -g pyright`), the plugin finds it automatically.
- **Custom path**: Pin a specific binary in config:
```yaml
lsp:
  servers:
    gopls:
      command: ["/usr/local/go/bin/gopls", "serve"]
```
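As a rough illustration of the lookup described in this section, the sketch below resolves a server binary from a pinned command, then a staging directory, then PATH. The function name `resolve_server_command`, its parameters, and the precedence order are assumptions made for this example, not the plugin's actual API.

```python
import os
import shutil


def resolve_server_command(binary, args, pinned=None, staging_dir=None):
    """Return an argv list for a language server, or None if it is missing.

    Hypothetical helper: the precedence (pinned > staging > PATH) is assumed.
    """
    # 1. A command pinned in config wins outright.
    if pinned:
        return list(pinned)
    # 2. Look in the managed staging directory, if one was given.
    if staging_dir:
        candidate = os.path.join(staging_dir, binary)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return [candidate, *args]
    # 3. Fall back to whatever is on PATH.
    found = shutil.which(binary)
    if found:
        return [found, *args]
    return None
```

Returning `None` rather than raising mirrors the "silently skip" behaviour described for missing servers: the caller can treat a missing binary as "no LSP for this file".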
## How It Works
On every `write_file` or `patch` call inside a git workspace:
1. **Before the write**: plugin snapshots current diagnostics for the file (baseline)
2. **After the write**: plugin queries the language server for fresh diagnostics
3. **Delta**: only errors *introduced by this edit* are surfaced (pre-existing errors filtered out)
4. **Injection**: diagnostics appear as an `lsp_diagnostics` field in the tool result JSON
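Steps 1 to 3 above can be sketched as a set difference over diagnostics. This is a minimal sketch, not the plugin's code: `diagnostic_key` and `delta` are hypothetical names, and the choice to ignore positions when comparing is an assumption.

```python
def diagnostic_key(diag):
    """Identity of a diagnostic for baseline comparison (assumed fields)."""
    # Position is left out on purpose: an edit can shift a pre-existing
    # error to another line without changing what it is.
    return (
        diag.get("severity"),
        diag.get("code"),
        diag.get("source"),
        diag.get("message"),
    )


def delta(baseline, current):
    """Return diagnostics in `current` that were not present in `baseline`."""
    seen = {diagnostic_key(d) for d in baseline}
    return [d for d in current if diagnostic_key(d) not in seen]


# Usage: one pre-existing error, one introduced by the edit.
old = {"severity": 1, "code": "E1", "source": "ls", "message": "undefined name x"}
new = {"severity": 1, "code": "E2", "source": "ls", "message": "bad return type"}
introduced = delta([old], [old, new])  # only `new` survives the filter
```

A set-based key is the simplest option; it treats two identical messages on different lines as one, which a real implementation may or may not want.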
The agent sees output like:
```json
{
  "bytes_written": 42,
  "dirs_created": false,
  "lsp_diagnostics": "<diagnostics file=\"/path/to/foo.py\">\nERROR [2:12] Type \"str\" is not assignable to return type \"int\" [reportReturnType] (Pyright)\n</diagnostics>"
}
```
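The shape of that output can be reproduced with a short sketch. The names `format_line` and `inject` are hypothetical, and defaulting a missing severity to `ERROR` is an assumption. Leaving non-dict and non-JSON results untouched follows the behaviour described in the commit notes for this change.

```python
import json

SEVERITY_LABELS = {1: "ERROR", 2: "WARN", 3: "INFO", 4: "HINT"}


def format_line(diag):
    """Render one LSP diagnostic as a single line of text."""
    start = diag["range"]["start"]
    label = SEVERITY_LABELS.get(diag.get("severity"), "ERROR")
    # LSP positions are zero-based; the rendered position is one-based.
    text = f'{label} [{start["line"] + 1}:{start["character"] + 1}] {diag["message"]}'
    if diag.get("code") is not None:
        text += f' [{diag["code"]}]'
    if diag.get("source"):
        text += f' ({diag["source"]})'
    return text


def inject(result, path, diags):
    """Return the tool result with an lsp_diagnostics field, or None to leave it as is."""
    if not diags:
        return None
    try:
        payload = json.loads(result)
    except (TypeError, ValueError):
        return None  # not JSON: do not touch the result
    if not isinstance(payload, dict):
        return None  # JSON, but not an object: nowhere safe to add a field
    body = "\n".join(format_line(d) for d in diags)
    payload["lsp_diagnostics"] = f'<diagnostics file="{path}">\n{body}\n</diagnostics>'
    return json.dumps(payload)
```

Returning `None` for anything that is not a JSON object avoids corrupting tool results whose format the plugin does not control.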
### When LSP stays dormant
- **No git workspace**: files outside a git repo don't trigger LSP
- **No matching server**: if you edit a `.rs` file and rust-analyzer isn't installed, LSP silently skips
- **Remote backends**: Docker, SSH, Modal — the host-side LSP can't see container files, so it skips
- **Plugin disabled**: if `lsp` isn't in `plugins.enabled`, nothing happens
- **Cold start**: the first write after a server spawns may time out (3s); diagnostics appear on subsequent writes
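The git-workspace gate in the first bullet amounts to walking up from the file until a `.git` entry is found. The sketch below shows one way to do that; `find_git_root` is a hypothetical name used for illustration and is not claimed to match the plugin's own resolver.

```python
from pathlib import Path


def find_git_root(start):
    """Walk upward from `start` and return the first directory containing `.git`."""
    current = Path(start).resolve()
    if current.is_file():
        current = current.parent
    for candidate in (current, *current.parents):
        # `.git` is a directory in a normal clone and a file in linked worktrees,
        # so check for existence rather than for a directory.
        if (candidate / ".git").exists():
            return str(candidate)
    return None  # no repository found: LSP would stay dormant for this path
```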
## Configuration
```yaml
# ~/.hermes/config.yaml
lsp:
  enabled: true                # master toggle (default: true when plugin is enabled)
  wait_mode: document          # "document" or "full" (workspace-wide)
  wait_timeout: 5.0            # max seconds to wait for diagnostics
  install_strategy: manual     # "manual" = detect only; "auto" = install on first use
  servers:                     # per-server overrides
    pyright:
      disabled: false          # set true to skip even when installed
      command: ["pyright-langserver", "--stdio"]  # pin binary
      env:                     # extra env vars for the process
        PYTHONPATH: "/my/stubs"
      initialization_options:  # LSP initializationOptions
        python:
          analysis:
            typeCheckingMode: "strict"
```
## CLI Commands
| Command | Description |
|---------|-------------|
| `hermes lsp status` | Service state + per-server install status |
| `hermes lsp list` | All registered servers (26 languages) |
| `hermes lsp install <id>` | Install a server binary |
| `hermes lsp install-all` | Try every auto-install recipe |
| `hermes lsp restart` | Tear down running servers (next edit re-spawns) |
| `hermes lsp which <id>` | Print resolved binary path |
## Supported Languages
| Language | Server | Install |
|----------|--------|---------|
| Python | pyright | `hermes lsp install pyright` |
| TypeScript/JavaScript | typescript-language-server | `hermes lsp install typescript-language-server` |
| Go | gopls | `hermes lsp install gopls` |
| Rust | rust-analyzer | manual (rustup) |
| C/C++ | clangd | manual (LLVM) |
| Vue | @vue/language-server | `hermes lsp install @vue/language-server` |
| Svelte | svelte-language-server | `hermes lsp install svelte-language-server` |
| Bash/Zsh | bash-language-server | `hermes lsp install bash-language-server` |
| YAML | yaml-language-server | `hermes lsp install yaml-language-server` |
| PHP | intelephense | `hermes lsp install intelephense` |
| Lua | lua-language-server | manual |
| Dockerfile | dockerfile-language-server | `hermes lsp install dockerfile-language-server-nodejs` |
| Terraform | terraform-ls | manual |
| Dart | dart language-server | manual |
| Haskell | haskell-language-server | manual |
| Julia | LanguageServer.jl | manual |
| Clojure | clojure-lsp | manual |
| Nix | nixd | manual |
| Zig | zls | manual |
| Gleam | gleam lsp | manual |
| Elixir | elixir-ls | manual |
| OCaml | ocaml-lsp | manual |
| Kotlin | kotlin-language-server | manual |
| Java | jdtls | manual |
| Prisma | prisma language-server | manual |
| Astro | @astrojs/language-server | `hermes lsp install @astrojs/language-server` |
## Troubleshooting
**"No diagnostics appearing"**
1. Check `hermes lsp status` — is the server installed?
2. Is the file inside a git repository? (`git rev-parse --git-dir` should succeed)
3. Check logs: `hermes logs --level WARNING | grep lsp`
**"Server unavailable" warning in logs**
The binary isn't on PATH or in `$HERMES_HOME/lsp/bin/`. Run `hermes lsp install <id>`.
**"First write has no diagnostics, second does"**
Normal. The language server needs time to index the project on cold start. The 3-second timeout keeps writes fast — diagnostics appear once the server is warm.
**Performance**
- Warm server: diagnostics in 200–500ms (pyright), 1–2s (typescript-language-server)
- Cold start: 5–30s of indexing (project-size dependent); writes succeed immediately, and diagnostics arrive on subsequent edits
- Servers stay alive for the session duration (one process per language per project root)


@@ -58,6 +58,7 @@ const sidebars: SidebarsConfig = {
'user-guide/features/skins',
'user-guide/features/plugins',
'user-guide/features/built-in-plugins',
'user-guide/features/lsp',
],
},
{