Compare commits

..

1 Commits

Author SHA1 Message Date
Austin Pickett
cc60cbfeb5 feat(plugins): install from a subdirectory within a repo
Support installing a plugin that lives in a subdirectory of a larger
repo (docs/tests at root, plugin in a subdir) without forcing a
dedicated single-plugin repo.

Identifier syntax:
  owner/repo/path/to/plugin        (shorthand + subpath)
  <url>.git/path/to/plugin         (.git boundary on GitHub-style URLs)
  <url>#path/to/plugin             (explicit fragment, any scheme)

_resolve_git_url now returns (git_url, subdir); _install_plugin_core
reads the manifest from and moves only the subdir, so root-level docs
and tests no longer leak into ~/.hermes/plugins. _resolve_subdir_within
guards against path traversal, missing dirs, and non-directories.

Both the CLI (hermes plugins install) and the dashboard install endpoint
inherit this for free since they share _install_plugin_core. Dashboard
install hint + placeholder updated to advertise the subdir syntax.
2026-06-08 22:01:11 -04:00
25 changed files with 642 additions and 1703 deletions

70
cli.py
View File

@@ -6218,20 +6218,27 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
choices visible and lets the normal Enter key binding submit the typed
or highlighted choice.
**Platform note (Windows — issue #33961):**
Earlier code bypassed the modal on ``sys.platform == "win32"`` and fell
back to a raw ``input()`` prompt. When the confirm was triggered from the
``process_loop`` daemon thread (the normal case) that ``input()`` ran off
the main thread and deadlocked against prompt_toolkit's stdin ownership —
the user saw a frozen cursor and Ctrl-C was swallowed (bare ``/reset``
froze; ``/reset now`` worked only because it skips the prompt entirely).
**Platform note (Windows dead-lock — issue #30768):**
The queue-based modal relies on prompt_toolkit key bindings receiving
keyboard events and calling ``_submit_slash_confirm_response``. On
Windows (PowerShell / Windows Terminal) the prompt_toolkit input
channel can become unresponsive when the modal is entered from the
``process_loop`` daemon thread, causing a dead-lock: the user sees the
confirmation panel but keystrokes never reach the key bindings and the
``response_queue.get()`` blocks until the 120-second timeout expires.
Native Windows now uses the same path as Linux/macOS: the modal is set up
on ``self._app.loop`` via ``call_soon_threadsafe`` and answered by the
normal prompt_toolkit key bindings (the same input channel that already
handles ordinary typing on Windows). The raw ``input()`` fallback is kept
only for the genuinely safe cases: no running app (unit tests /
non-interactive), no resolvable event loop, or a scheduling failure.
To avoid this, we fall back to ``_prompt_text_input`` (a simple
``input()``-based prompt) when any of these conditions hold:
* ``sys.platform == "win32"`` — native Windows console (ConPTY /
win32_input) does not support the modal reliably.
* ``self._app`` is not set — unit tests / non-interactive contexts.
On non-Windows platforms the modal itself is still safe from the
``process_loop`` daemon thread as long as the main-thread event loop
owns the prompt_toolkit buffer mutations. When we are off the main
thread, schedule the modal snapshot / restore work on ``self._app.loop``
via ``call_soon_threadsafe`` and keep the queue-based response path.
"""
import threading
import time as _time
@@ -6244,25 +6251,22 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
if not getattr(self, "_app", None):
return self._prompt_text_input("Choice [1/2/3]: ")
# On Windows the prompt_toolkit input channel can deadlock when the
# modal is entered from the process_loop daemon thread — keystrokes
# never reach the key bindings, so response_queue.get() blocks for
# the full timeout (issue #30768). Fall back to the simpler
# stdin-based prompt which works reliably on Windows.
if sys.platform == "win32":
return self._prompt_text_input("Choice [1/2/3]: ")
try:
app_loop = self._app.loop
except Exception:
app_loop = None
in_main_thread = threading.current_thread() is threading.main_thread()
def _stdin_fallback() -> str | None:
# On native Windows a raw input() from a non-main thread deadlocks
# against prompt_toolkit's stdin ownership (#33961). With an app
# running we cannot safely prompt off the main thread, so cancel
# cleanly (None) rather than hang the terminal.
if sys.platform == "win32" and not in_main_thread:
self._invalidate()
return None
return self._prompt_text_input("Choice [1/2/3]: ")
if not in_main_thread and app_loop is None:
return _stdin_fallback()
return self._prompt_text_input("Choice [1/2/3]: ")
response_queue = queue.Queue()
@@ -6303,7 +6307,7 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
return ready.wait(timeout=5)
if not _run_on_app_loop(_setup_modal):
return _stdin_fallback()
return self._prompt_text_input("Choice [1/2/3]: ")
_last_countdown_refresh = _time.monotonic()
try:
@@ -7272,7 +7276,7 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
elif canonical == "copy":
self._handle_copy_command(cmd_original)
elif canonical == "debug":
self._handle_debug_command(cmd_original)
self._handle_debug_command()
elif canonical == "update":
if self._handle_update_command():
return False
@@ -8219,10 +8223,9 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
print(" ⚠️ MCP reload timed out (30s). Some servers may not have reconnected.")
# Inline-skip tokens that bypass the destructive-slash confirmation modal.
# A general escape hatch for non-interactive use (scripting/automation) and
# for the degraded path where the modal can't be marshaled onto the app loop
# — lets users self-serve without flipping approvals.destructive_slash_confirm
# in config. (Native Windows now drives the modal normally — see #33961.)
# Matches the escape-hatch pattern users on broken modal platforms
# (currently native Windows PowerShell — issue #30768) need to self-serve
# without having to flip approvals.destructive_slash_confirm in config.
_DESTRUCTIVE_SKIP_TOKENS = frozenset({"now", "--yes", "-y"})
@classmethod
@@ -8280,9 +8283,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
Inline-skip: if ``cmd_original`` contains ``now``, ``--yes``, or
``-y`` as an argument (e.g. ``/reset now``, ``/new --yes My title``),
the modal is bypassed and ``"once"`` is returned immediately. This is
an escape hatch for non-interactive use and for the degraded path where
the modal can't be marshaled onto the app loop (native Windows itself now
drives the modal normally — see #33961). Callers are responsible
an escape hatch for platforms where the prompt_toolkit modal hangs
(issue #30768 — native Windows PowerShell). Callers are responsible
for stripping the skip tokens from any remaining argument parsing
(see :meth:`_split_destructive_skip`).

View File

@@ -1795,11 +1795,9 @@ class BasePlatformAdapter(ABC):
# Whether this platform renders triple-backtick fenced code blocks (i.e.
# ``format_message`` translates/preserves markdown fences into a real code
# block). Capability flag for markdown-aware presentation choices.
# block). Drives presentation choices like rendering a ``terminal`` tool
# call's command as a ```bash block instead of a flat preview line.
# Default False (plain-text platforms); markdown-rendering adapters set True.
# Note: tool-progress deliberately does NOT use this to render a terminal
# command as a ```bash block — that exposed full commands in chat. Progress
# shows a short truncated preview only (see gateway/run.py progress_callback).
supports_code_blocks: bool = False
def __init__(self, config: PlatformConfig, platform: Platform):

View File

@@ -181,8 +181,6 @@ def _strip_mdv2(text: str) -> str:
"""
# Remove escape backslashes before special characters
cleaned = re.sub(r'\\([_*\[\]()~`>#\+\-=|{}.!\\])', r'\1', text)
# Remove standard markdown bold (**text** → text) BEFORE MarkdownV2 bold
cleaned = re.sub(r'\*\*([^*]+)\*\*', r'\1', cleaned)
# Remove MarkdownV2 bold markers that format_message converted from **bold**
cleaned = re.sub(r'\*([^*]+)\*', r'\1', cleaned)
# Remove MarkdownV2 italic markers that format_message converted from *italic*
@@ -2210,17 +2208,11 @@ class TelegramAdapter(BasePlatformAdapter):
# "Message is not modified" is a no-op, not an error
if "not modified" in str(fmt_err).lower():
return SendResult(success=True, message_id=message_id)
# Fallback: strip MarkdownV2 escapes and retry as clean plain text
logger.warning(
"[%s] MarkdownV2 edit failed, falling back to plain text: %s",
self.name,
fmt_err,
)
_plain = _strip_mdv2(content) if content else content
# Fallback: retry without markdown formatting
await self._bot.edit_message_text(
chat_id=int(chat_id),
message_id=int(message_id),
text=_plain,
text=content,
)
return SendResult(success=True, message_id=message_id)
except Exception as e:

View File

@@ -12971,10 +12971,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# Build progress message with primary argument preview
from agent.display import get_tool_emoji
emoji = get_tool_emoji(tool_name, default="⚙️")
# Markdown-capable platforms render a terminal command as a native
# ```bash fenced block (full command, no quotes, no label, no
# truncation) instead of the noisy `terminal: "cmd…"` line. Gated
# on the adapter's ``supports_code_blocks`` capability so every
# markdown-rendering platform (and plugin adapters that opt in) gets
# it, while plain-text platforms keep the compact line.
_bash_block = None
try:
_progress_adapter = self.adapters.get(source.platform)
except Exception:
_progress_adapter = None
if (
getattr(_progress_adapter, "supports_code_blocks", False)
and tool_name == "terminal"
and isinstance(args, dict)
and isinstance(args.get("command"), str)
and args["command"].strip()
):
_bash_block = f"```bash\n{args['command'].rstrip()}\n```"
# Verbose mode: show detailed arguments, respects tool_preview_length
if progress_mode == "verbose":
if args:
if _bash_block is not None:
msg = _bash_block
elif args:
from agent.display import get_tool_preview_max_len
_pl = get_tool_preview_max_len()
args_str = json.dumps(args, ensure_ascii=False, default=str)
@@ -12994,7 +13016,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# "all" / "new" modes: short preview, respects tool_preview_length
# config (defaults to 40 chars when unset to keep gateway messages
# compact — unlike CLI spinners, these persist as permanent messages).
if preview:
if _bash_block is not None:
msg = _bash_block
elif preview:
from agent.display import get_tool_preview_max_len
_pl = get_tool_preview_max_len()
_cap = _pl if _pl > 0 else 40
@@ -13106,8 +13130,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"message_id": message_id,
"content": content,
}
if getattr(adapter, "REQUIRES_EDIT_FINALIZE", False):
kwargs["finalize"] = True
if _edit_accepts_metadata:
kwargs["metadata"] = _progress_metadata
return await adapter.edit_message(**kwargs)

View File

@@ -2090,25 +2090,12 @@ class CLICommandsMixin:
else:
_cprint(f" {_ACCENT}{feature_name} set to {label} (session only){_RST}")
def _handle_debug_command(self, cmd_original: str = ""):
"""Handle /debug — upload debug report + logs and print share URLs.
Accepts optional destination words after the command:
- ``/debug`` → upload to the public paste service (default)
- ``/debug nous`` → upload to Nous-internal storage (private, staff-only)
- ``/debug local`` → render the report to stdout, no upload
``nous`` and ``local`` are mutually exclusive; if both are given,
``local`` wins (it never touches the network).
"""
def _handle_debug_command(self):
"""Handle /debug — upload debug report + logs and print paste URLs."""
from hermes_cli.debug import run_debug_share
from types import SimpleNamespace
words = {w.lower() for w in cmd_original.split()[1:]}
local = "local" in words
nous = "nous" in words and not local
args = SimpleNamespace(lines=200, expire=7, local=local, nous=nous)
args = SimpleNamespace(lines=200, expire=7, local=False)
run_debug_share(args)
def _handle_update_command(self) -> bool:

View File

@@ -217,8 +217,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
cli_only=True, args_hint="<path>"),
CommandDef("update", "Update Hermes Agent to the latest version", "Info"),
CommandDef("version", "Show Hermes Agent version", "Info", aliases=("v",)),
CommandDef("debug", "Upload debug report (system info + logs) and get shareable links", "Info",
args_hint="[nous|local]"),
CommandDef("debug", "Upload debug report (system info + logs) and get shareable links", "Info"),
# Exit
CommandDef("quit", "Exit the CLI (use --delete to also remove session history)", "Exit",

View File

@@ -9,16 +9,8 @@ Currently supports:
``~/.hermes/logs/*.log`` are not leaked into
the public paste service. Pass ``--no-redact``
to disable.
Pass ``--nous`` to upload instead to Nous-internal
storage (AWS S3) via a signed URL minted by the
Nous account service: the bundle is private
(viewable only by Nous staff / allowlisted mods via
a Google-login-gated viewer) and auto-deletes after
14 days, rather than going to a public paste.
"""
import datetime
import gzip
import io
import json
import logging
@@ -589,97 +581,6 @@ def collect_debug_report(
return buf.getvalue()
# ---------------------------------------------------------------------------
# Shared bundle collection (used by both the paste.rs and Nous-S3 paths)
# ---------------------------------------------------------------------------
# Bundle format identifier embedded in the Nous-S3 JSON envelope. The
# discord-support viewer keys off this string to parse the bundle.
_NOUS_BUNDLE_FORMAT = "hermes-debug-share/1"
def collect_share_bundle(
log_lines: int = 200,
redact: bool = True,
) -> dict[str, str]:
"""Collect the debug report + full logs as a label→text mapping.
Returns ``{"report": ..., "agent.log": ..., "gateway.log": ...,
"desktop.log": ...}`` where each value is the already-redacted (when
``redact`` is True) text that would be uploaded. Keys for logs that are
absent/empty are simply omitted.
This is the single source of collection + redaction shared by both
destinations: the paste.rs path (:func:`build_debug_share`) and the
Nous-S3 path (``--nous``). Centralising it guarantees the Nous bundle is
built from the *same* force-redacted snapshots as the public paste path —
redaction is the safety boundary, so the Nous path must never see raw
logs.
The dump header is prepended to each full log (mirroring the historical
paste behaviour) so every file is self-contained, and the redaction
banner is prepended when ``redact`` is True.
"""
dump_text = _capture_dump()
log_snapshots = _capture_default_log_snapshots(log_lines, redact=redact)
report = collect_debug_report(
log_lines=log_lines,
dump_text=dump_text,
log_snapshots=log_snapshots,
)
agent_log = log_snapshots["agent"].full_text
gateway_log = log_snapshots["gateway"].full_text
desktop_log = log_snapshots["desktop"].full_text
# Prepend dump header to each full log so every file is self-contained.
if agent_log:
agent_log = dump_text + "\n\n--- full agent.log ---\n" + agent_log
if gateway_log:
gateway_log = dump_text + "\n\n--- full gateway.log ---\n" + gateway_log
if desktop_log:
desktop_log = dump_text + "\n\n--- full desktop.log ---\n" + desktop_log
# Visible banner so reviewers know redaction was applied at upload time.
if redact:
report = _REDACTION_BANNER + report
if agent_log:
agent_log = _REDACTION_BANNER + agent_log
if gateway_log:
gateway_log = _REDACTION_BANNER + gateway_log
if desktop_log:
desktop_log = _REDACTION_BANNER + desktop_log
bundle: dict[str, str] = {"report": report}
if agent_log:
bundle["agent.log"] = agent_log
if gateway_log:
bundle["gateway.log"] = gateway_log
if desktop_log:
bundle["desktop.log"] = desktop_log
return bundle
def build_nous_bundle(bundle: dict[str, str], redact: bool = True) -> bytes:
"""Gzip-compress a :func:`collect_share_bundle` mapping into the Nous envelope.
The JSON shape is what the discord-support viewer (Repo 3) parses::
{"format": "hermes-debug-share/1",
"redacted": <bool>,
"created": <iso8601>,
"files": {"report": ..., "agent.log": ..., ...}}
"""
created = datetime.datetime.now(datetime.timezone.utc).isoformat()
envelope = {
"format": _NOUS_BUNDLE_FORMAT,
"redacted": bool(redact),
"created": created,
"files": bundle,
}
return gzip.compress(json.dumps(envelope).encode("utf-8"))
# ---------------------------------------------------------------------------
# CLI entry points
# ---------------------------------------------------------------------------
@@ -719,18 +620,45 @@ def build_debug_share(
"""
_best_effort_sweep_expired_pastes()
# Collect the report + full logs (force-redacted when redact=True) via the
# shared collector so the paste.rs and Nous-S3 paths build identical,
# identically-redacted bundles. The dump header + redaction banner are
# applied inside collect_share_bundle.
bundle = collect_share_bundle(log_lines=log_lines, redact=redact)
# Capture dump once — prepended to every paste for context.
# The dump is already redacted at extract time via dump.py:_redact;
# log_snapshots are redacted by _capture_default_log_snapshots when
# redact=True so credentials never reach the public paste service.
dump_text = _capture_dump()
log_snapshots = _capture_default_log_snapshots(log_lines, redact=redact)
if redact:
logger.info(
"hermes debug share: applied force-mode redaction to log snapshots before upload"
)
report = bundle["report"]
report = collect_debug_report(
log_lines=log_lines,
dump_text=dump_text,
log_snapshots=log_snapshots,
)
agent_log = log_snapshots["agent"].full_text
gateway_log = log_snapshots["gateway"].full_text
desktop_log = log_snapshots["desktop"].full_text
# Prepend dump header to each full log so every paste is self-contained.
if agent_log:
agent_log = dump_text + "\n\n--- full agent.log ---\n" + agent_log
if gateway_log:
gateway_log = dump_text + "\n\n--- full gateway.log ---\n" + gateway_log
if desktop_log:
desktop_log = dump_text + "\n\n--- full desktop.log ---\n" + desktop_log
# Visible banner so reviewers reading the public paste know redaction
# was applied at upload time. Banner is omitted under --no-redact.
if redact:
report = _REDACTION_BANNER + report
if agent_log:
agent_log = _REDACTION_BANNER + agent_log
if gateway_log:
gateway_log = _REDACTION_BANNER + gateway_log
if desktop_log:
desktop_log = _REDACTION_BANNER + desktop_log
urls: dict[str, str] = {}
failures: list[str] = []
@@ -739,8 +667,11 @@ def build_debug_share(
urls["Report"] = upload_to_pastebin(report, expiry_days=expiry)
# 2-4. Full logs (optional — failures are collected, not raised)
for label in ("agent.log", "gateway.log", "desktop.log"):
content = bundle.get(label)
for label, content in (
("agent.log", agent_log),
("gateway.log", gateway_log),
("desktop.log", desktop_log),
):
if not content:
continue
try:
@@ -765,23 +696,43 @@ def run_debug_share(args):
log_lines = getattr(args, "lines", 200)
expiry = getattr(args, "expire", 7)
local_only = getattr(args, "local", False)
nous = getattr(args, "nous", False)
redact = not getattr(args, "no_redact", False)
if local_only:
# Local-only path never uploads — render the report to stdout and bail
# before any network I/O. Reuses the shared collector so the rendered
# output matches exactly what would be uploaded.
# before any network I/O. Mirrors the upload path's collection logic.
_best_effort_sweep_expired_pastes()
print("Collecting debug report...")
bundle = collect_share_bundle(log_lines=log_lines, redact=redact)
print(bundle["report"])
for title, label in (
("FULL agent.log", "agent.log"),
("FULL gateway.log", "gateway.log"),
("FULL desktop.log", "desktop.log"),
dump_text = _capture_dump()
log_snapshots = _capture_default_log_snapshots(log_lines, redact=redact)
report = collect_debug_report(
log_lines=log_lines,
dump_text=dump_text,
log_snapshots=log_snapshots,
)
agent_log = log_snapshots["agent"].full_text
gateway_log = log_snapshots["gateway"].full_text
desktop_log = log_snapshots["desktop"].full_text
if agent_log:
agent_log = dump_text + "\n\n--- full agent.log ---\n" + agent_log
if gateway_log:
gateway_log = dump_text + "\n\n--- full gateway.log ---\n" + gateway_log
if desktop_log:
desktop_log = dump_text + "\n\n--- full desktop.log ---\n" + desktop_log
if redact:
report = _REDACTION_BANNER + report
if agent_log:
agent_log = _REDACTION_BANNER + agent_log
if gateway_log:
gateway_log = _REDACTION_BANNER + gateway_log
if desktop_log:
desktop_log = _REDACTION_BANNER + desktop_log
print(report)
for title, body in (
("FULL agent.log", agent_log),
("FULL gateway.log", gateway_log),
("FULL desktop.log", desktop_log),
):
body = bundle.get(label)
if body:
print(f"\n\n{'=' * 60}")
print(title)
@@ -789,10 +740,6 @@ def run_debug_share(args):
print(body)
return
if nous:
_run_debug_share_nous(log_lines=log_lines, redact=redact)
return
print(_PRIVACY_NOTICE)
print("Collecting debug report...")
print("Uploading...")
@@ -826,80 +773,6 @@ def run_debug_share(args):
print(f"\nShare these links with the Hermes team for support.")
_NOUS_PRIVACY_NOTICE = """\
⚠️ --nous: This uploads your debug bundle to Nous-INTERNAL storage (AWS S3),
NOT a public paste service. The following is included:
• System info (OS, Python/Hermes version, provider, which API keys are
configured — NOT the actual keys)
• Full agent.log, gateway.log, and desktop.log (up to 512 KB each — likely
contains conversation content, tool outputs, and file paths)
• The bundle is viewable only by Nous staff (and allowlisted Discord mods)
via a Google-login-gated viewer.
• It is NOT a public paste — there is no public URL to the contents.
• It auto-deletes after 14 days.
"""
def _run_debug_share_nous(*, log_lines: int, redact: bool) -> None:
"""Handle ``hermes debug share --nous``: upload the bundle to Nous-S3.
Collects the same force-redacted bundle as the paste path, gzips it into
the Nous envelope, requests a signed URL from NAS, uploads, and prints the
private viewer link. On any failure falls back to a clear error that
suggests ``--local``.
"""
from hermes_cli.diagnostics_upload import share_to_nous
print(_NOUS_PRIVACY_NOTICE)
if not redact:
print(
"⚠️ --no-redact is set: secrets in your logs will NOT be redacted "
"before upload.\n"
)
print("Collecting debug report...")
_best_effort_sweep_expired_pastes()
bundle = collect_share_bundle(log_lines=log_lines, redact=redact)
if redact:
logger.info(
"hermes debug share --nous: applied force-mode redaction before upload"
)
blob = build_nous_bundle(bundle, redact=redact)
print("Uploading to Nous diagnostics storage...")
try:
res = share_to_nous(blob)
except Exception as exc:
print(
f"\nNous upload failed: {exc}\n"
"\nThe Nous diagnostics service may be unavailable or not yet "
"provisioned.\n"
"Run `hermes debug share --local` to print the report instead, "
"or `hermes debug share` to upload to a public paste service.\n",
file=sys.stderr,
)
sys.exit(1)
view_url = res.get("viewUrl") or res.get("view_url")
print("\nDebug bundle uploaded to Nous (private):")
if view_url:
print(f" View URL {view_url}")
else:
print(f" (no view URL returned; upload id: {res.get('id', '?')})")
expires_at = res.get("expiresAt") or res.get("expires_at")
if expires_at:
print(f"\n⏱ Auto-deletes at {expires_at} (14-day retention).")
else:
print("\n⏱ Auto-deletes after 14 days.")
print(
"\nShare this private link with the Nous team — only Nous staff "
"(via Google login) can open it."
)
def run_debug_delete(args):
"""Delete one or more paste URLs uploaded by /debug."""
urls = getattr(args, "urls", [])
@@ -950,8 +823,6 @@ def run_debug(args):
print(" --lines N Number of log lines to include (default: 200)")
print(" --expire N Paste expiry in days (default: 7)")
print(" --local Print report locally instead of uploading")
print(" --nous Upload to Nous-internal storage (private, staff-only,")
print(" auto-deletes in 14 days) instead of a public paste")
print(" --no-redact Disable upload-time secret redaction (default: redact)")
print()
print("Options (delete):")

View File

@@ -1,138 +0,0 @@
"""Client for uploading ``hermes debug share`` bundles to Nous-internal S3.
This is the opt-in (``--nous``) destination for ``hermes debug share``.
Unlike the public paste.rs path, bundles uploaded here go to a Nous-owned
S3 bucket via a short-lived signed URL minted by the Nous account service
(NAS). The bucket auto-expires objects after 14 days, and the contents are
only viewable by Nous staff (and allowlisted Discord mods) through a
Google-OAuth-gated viewer.
Flow:
1. POST {NAS_BASE}/api/diagnostics/upload-url → {uploadUrl, viewUrl, id, ...}
(the request body carries ``sizeBytes``; NAS signs it into the presigned
URL's ``ContentLength``, so the PUT must send exactly that many bytes)
2. PUT <uploadUrl> (the gzipped bundle, Content-Type application/gzip)
NAS is stateless — the object's existence in S3 is the only state, so there is
no confirm/callback step.
Uses stdlib ``urllib`` only, matching ``debug.py`` style — no third-party deps.
"""
import json
import os
import urllib.request
# Base URL of the Nous account service that mints the signed upload URL.
# Overridable via env so the feature can be pointed at staging / a local dev
# NAS instance during testing.
NAS_BASE = os.environ.get(
"HERMES_DIAGNOSTICS_BASE_URL", "https://portal.nousresearch.com"
)
# Network timeout for each request (seconds). The upload itself can be larger
# (a gzipped log bundle), so the PUT gets a more generous window.
_REQUEST_TIMEOUT = 30
_UPLOAD_TIMEOUT = 120
_USER_AGENT = "hermes-agent/debug-share"
def request_upload_url(
content_type: str = "application/gzip",
size_bytes: int | None = None,
) -> dict:
"""Ask NAS to mint a presigned PUT URL for a diagnostics bundle.
POSTs a small JSON body to ``{NAS_BASE}/api/diagnostics/upload-url`` and
returns the parsed JSON response, expected to contain at least
``uploadUrl``, ``viewUrl`` and ``id`` (plus optional ``expiresAt`` /
``uploadExpiresInSeconds``).
Raises on non-2xx responses or unparseable JSON.
"""
payload: dict = {"contentType": content_type}
if size_bytes is not None:
payload["sizeBytes"] = int(size_bytes)
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
f"{NAS_BASE}/api/diagnostics/upload-url",
data=data,
method="POST",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": _USER_AGENT,
},
)
with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as resp:
status = getattr(resp, "status", None)
if status is None:
status = resp.getcode()
if not (200 <= status < 300):
raise RuntimeError(
f"diagnostics upload-url request failed: HTTP {status}"
)
body = resp.read().decode("utf-8")
try:
result = json.loads(body)
except (ValueError, json.JSONDecodeError) as exc:
raise RuntimeError(
f"diagnostics upload-url returned non-JSON response: {body[:200]}"
) from exc
if not isinstance(result, dict) or not result.get("uploadUrl"):
raise RuntimeError(
"diagnostics upload-url response missing 'uploadUrl': "
f"{body[:200]}"
)
return result
def put_bundle(
upload_url: str,
data: bytes,
content_type: str = "application/gzip",
) -> None:
"""PUT the gzipped *data* bundle to a presigned *upload_url*.
Sets the ``Content-Type`` header (must match what NAS pinned when signing
the URL, otherwise S3 rejects the signature). Raises on non-2xx.
"""
req = urllib.request.Request(
upload_url,
data=data,
method="PUT",
headers={
"Content-Type": content_type,
"User-Agent": _USER_AGENT,
},
)
with urllib.request.urlopen(req, timeout=_UPLOAD_TIMEOUT) as resp:
status = getattr(resp, "status", None)
if status is None:
status = resp.getcode()
if not (200 <= status < 300):
raise RuntimeError(f"diagnostics bundle PUT failed: HTTP {status}")
def share_to_nous(report_bundle: bytes) -> dict:
"""Orchestrate the full Nous-S3 upload of a gzipped *report_bundle*.
Two steps: mint a presigned PUT URL (sending the exact ``sizeBytes`` NAS
signs into the URL's ``ContentLength``), then PUT the bundle. NAS is
stateless — the object's existence in S3 is the only state, so there is no
confirm/callback step. Returns the dict from :func:`request_upload_url`
(which carries ``viewUrl`` / ``id`` / expiry metadata) so the caller can
print the viewer link. Raises on any failure of either step.
"""
size_bytes = len(report_bundle)
info = request_upload_url(
content_type="application/gzip", size_bytes=size_bytes
)
put_bundle(info["uploadUrl"], report_bundle, content_type="application/gzip")
return info

View File

@@ -738,14 +738,11 @@ def run_doctor(args):
issues,
)
# Warn if model is set to a provider-prefixed name on a provider that doesn't use them.
# Vendor/model slugs are valid on aggregator-style providers and on any custom
# provider — bare "custom" or a named "custom:<name>" that fronts an OpenAI-compatible
# aggregator (e.g. custom:hpc-ai serving deepseek/deepseek-v4-flash) requires the prefix.
# Warn if model is set to a provider-prefixed name on a provider that doesn't use them
provider_for_policy = runtime_provider or catalog_provider
provider_policy_id = str(provider_for_policy or "").strip().lower()
providers_accepting_vendor_slugs = {
"openrouter",
"custom",
"auto",
"kilocode",
"opencode-zen",
@@ -753,16 +750,11 @@ def run_doctor(args):
"lmstudio",
"nous",
}
provider_accepts_vendor_slug = (
provider_policy_id in providers_accepting_vendor_slugs
or provider_policy_id == "custom"
or provider_policy_id.startswith("custom:")
)
if (
default_model
and "/" in default_model
and provider_policy_id
and not provider_accepts_vendor_slug
and provider_for_policy
and provider_for_policy not in providers_accepting_vendor_slugs
):
check_warn(
f"model.default '{default_model}' uses a vendor/model slug but provider is '{provider_raw}'",

View File

@@ -135,34 +135,89 @@ def _sanitize_plugin_name(
return target
def _resolve_git_url(identifier: str) -> str:
"""Turn an identifier into a cloneable Git URL.
def _resolve_git_url(identifier: str) -> tuple[str, Optional[str]]:
"""Turn an identifier into a cloneable Git URL and optional subdirectory.
Returns ``(git_url, subdir)`` where ``subdir`` is the path within the
cloned repository that contains the plugin (``None`` when the plugin lives
at the repo root).
Accepted formats:
- Full URL: https://github.com/owner/repo.git
- Full URL: git@github.com:owner/repo.git
- Full URL: ssh://git@github.com/owner/repo.git
- Shorthand: owner/repo → https://github.com/owner/repo.git
- Shorthand w/ subdir: owner/repo/path/to/plugin
→ (https://github.com/owner/repo.git, "path/to/plugin")
- Full URL w/ subdir (``.git`` boundary):
https://github.com/owner/repo.git/path/to/plugin
→ (https://github.com/owner/repo.git, "path/to/plugin")
- Any URL w/ explicit subdir fragment (works for every scheme, incl.
``file://`` and ssh): <url>#path/to/plugin
→ (<url>, "path/to/plugin")
NOTE: ``http://`` and ``file://`` schemes are accepted but will trigger a
security warning at install time.
"""
# Already a URL
# Already a URL.
if identifier.startswith(("https://", "http://", "git@", "ssh://", "file://")):
return identifier
# Explicit ``#subdir`` fragment — unambiguous for any scheme.
if "#" in identifier:
git_url, _, frag = identifier.partition("#")
return git_url, (frag.strip("/") or None)
# Natural ``.git/`` boundary (GitHub-style URLs).
marker = ".git/"
idx = identifier.find(marker)
if idx != -1:
git_url = identifier[: idx + len(".git")]
subdir = identifier[idx + len(marker) :].strip("/")
return git_url, (subdir or None)
return identifier, None
# owner/repo shorthand
parts = identifier.strip("/").split("/")
if len(parts) == 2:
owner, repo = parts
return f"https://github.com/{owner}/{repo}.git"
# owner/repo[/subdir...] shorthand
parts = [p for p in identifier.strip("/").split("/") if p]
if len(parts) >= 2:
owner, repo = parts[0], parts[1]
subdir = "/".join(parts[2:]).strip("/")
git_url = f"https://github.com/{owner}/{repo}.git"
return git_url, (subdir or None)
raise ValueError(
f"Invalid plugin identifier: '{identifier}'. "
"Use a Git URL or owner/repo shorthand."
"Use a Git URL or 'owner/repo' shorthand (optionally with a subdirectory: "
"'owner/repo/path/to/plugin')."
)
def _resolve_subdir_within(clone_root: Path, subdir: str) -> Path:
"""Resolve ``subdir`` inside ``clone_root``, rejecting path traversal.
Guards against ``..`` segments, absolute paths, and symlinks that would
escape the cloned repository. Returns the resolved directory path.
Raises ``PluginOperationError`` if the path escapes the clone, doesn't
exist, or is not a directory.
"""
clone_root = clone_root.resolve()
candidate = (clone_root / subdir).resolve()
# The resolved candidate must stay within the clone root.
if candidate != clone_root and clone_root not in candidate.parents:
raise PluginOperationError(
f"Plugin subdirectory '{subdir}' escapes the repository.",
)
if not candidate.exists():
raise PluginOperationError(
f"Plugin subdirectory '{subdir}' does not exist in the repository.",
)
if not candidate.is_dir():
raise PluginOperationError(
f"Plugin subdirectory '{subdir}' is not a directory.",
)
return candidate
def _repo_name_from_url(url: str) -> str:
"""Extract the repo name from a Git URL for the plugin directory name."""
# Strip trailing .git and slashes
@@ -372,14 +427,14 @@ def _install_plugin_core(identifier: str, *, force: bool) -> tuple[Path, dict, s
import tempfile
try:
git_url = _resolve_git_url(identifier)
git_url, subdir = _resolve_git_url(identifier)
except ValueError as e:
raise PluginOperationError(str(e)) from e
plugins_dir = _plugins_dir()
with tempfile.TemporaryDirectory() as tmp:
tmp_target = Path(tmp) / "plugin"
tmp_clone = Path(tmp) / "plugin"
git_exe = _resolve_git_executable()
if not git_exe:
@@ -387,7 +442,7 @@ def _install_plugin_core(identifier: str, *, force: bool) -> tuple[Path, dict, s
try:
result = subprocess.run(
[git_exe, "clone", "--depth", "1", git_url, str(tmp_target)],
[git_exe, "clone", "--depth", "1", git_url, str(tmp_clone)],
capture_output=True,
text=True,
timeout=60,
@@ -405,8 +460,16 @@ def _install_plugin_core(identifier: str, *, force: bool) -> tuple[Path, dict, s
err = (result.stderr or result.stdout or "").strip()
raise PluginOperationError(f"Git clone failed:\n{err}")
# Resolve the directory within the clone that holds the plugin.
if subdir:
tmp_target = _resolve_subdir_within(tmp_clone, subdir)
else:
tmp_target = tmp_clone
manifest = _read_manifest(tmp_target)
plugin_name = manifest.get("name") or _repo_name_from_url(git_url)
plugin_name = manifest.get("name") or (
subdir.rstrip("/").rsplit("/", 1)[-1] if subdir else _repo_name_from_url(git_url)
)
try:
target = _sanitize_plugin_name(plugin_name, plugins_dir)
@@ -471,7 +534,7 @@ def cmd_install(
console = Console()
try:
git_url = _resolve_git_url(identifier)
git_url, _subdir = _resolve_git_url(identifier)
except ValueError as e:
console.print(f"[red]Error:[/red] {e}")
sys.exit(1)
@@ -482,7 +545,10 @@ def cmd_install(
"Consider using https:// or git@ for production installs.",
)
console.print(f"[dim]Cloning {git_url}...[/dim]")
if _subdir:
console.print(f"[dim]Cloning {git_url} (subdir: {_subdir})...[/dim]")
else:
console.print(f"[dim]Cloning {git_url}...[/dim]")
try:
target, installed_manifest, installed_name = _install_plugin_core(
@@ -1473,7 +1539,7 @@ def dashboard_install_plugin(
"""Non-interactive install for the web dashboard. Returns a JSON-serializable dict."""
warnings: list[str] = []
try:
git_url = _resolve_git_url(identifier)
git_url, _subdir = _resolve_git_url(identifier)
if git_url.startswith(("http://", "file://")):
warnings.append(
"Insecure URL scheme; prefer https:// or git@ for production installs.",

View File

@@ -492,10 +492,7 @@ def get_label(provider_id: str) -> str:
def is_aggregator(provider: str) -> bool:
"""Return True when the provider is a multi-model aggregator."""
provider_norm = normalize_provider(provider or "")
if provider_norm.startswith("custom:"):
return True
pdef = get_provider(provider_norm)
pdef = get_provider(provider)
return pdef.is_aggregator if pdef else False

View File

@@ -29,7 +29,6 @@ Examples:
hermes debug share --expire 30 Keep paste for 30 days
hermes debug share --local Print report locally (no upload)
hermes debug share --no-redact Disable upload-time secret redaction
hermes debug share --nous Upload to Nous-internal storage (private)
hermes debug delete <url> Delete a previously uploaded paste
""",
)
@@ -65,17 +64,6 @@ Examples:
"into the public paste service."
),
)
share_parser.add_argument(
"--nous",
action="store_true",
help=(
"Upload the debug bundle to Nous-internal storage (AWS S3) instead "
"of a public paste service. The bundle is private — viewable only "
"by Nous staff (and allowlisted Discord mods) via a Google-login-"
"gated viewer — and auto-deletes after 14 days. Still force-redacts "
"secrets unless --no-redact is also passed."
),
)
delete_parser = debug_sub.add_parser(
"delete",
help="Delete a paste uploaded by 'hermes debug share'",

View File

@@ -41,19 +41,15 @@ except ImportError: # pragma: no cover - httpx is a hermes dependency
logger = logging.getLogger(__name__)
class PhotonDashboardAuthError(RuntimeError):
"""Raised when Photon rejects a device-flow token for the dashboard API."""
# ---------------------------------------------------------------------------
# Constants
# Hosted Photon allowlists registered device clients on the device-code
# endpoint — an unregistered client_id is rejected with
# `400 {"error":"invalid_client"}`. Use Photon's published CLI device
# client until the dashboard API registers Hermes as its own client_id.
DEFAULT_CLIENT_ID = "photon-cli"
DEFAULT_SCOPE = "openid profile email"
# Photon's published OAuth device-client identifier for first-party CLIs.
# We use a fixed "hermes-agent" client_id string — Photon's device endpoint
# accepts any opaque client_id and ties the bearer token to the approving
# user, not to the client. If Photon later requires registered clients,
# this is the one knob to update.
DEFAULT_CLIENT_ID = "hermes-agent"
DEFAULT_DASHBOARD_HOST = "https://app.photon.codes"
DEFAULT_SPECTRUM_HOST = "https://spectrum.photon.codes"
@@ -170,13 +166,6 @@ class DeviceCode:
interval: int
@dataclass(frozen=True)
class _DeviceTokenCandidate:
"""A token-like value extracted from the device-token response."""
source: str
token: str
def _dashboard_host() -> str:
return (os.getenv("PHOTON_DASHBOARD_HOST") or DEFAULT_DASHBOARD_HOST).rstrip("/")
@@ -186,7 +175,7 @@ def _spectrum_host() -> str:
def request_device_code(
*, client_id: str = DEFAULT_CLIENT_ID, scope: Optional[str] = DEFAULT_SCOPE,
*, client_id: str = DEFAULT_CLIENT_ID, scope: Optional[str] = None,
) -> DeviceCode:
"""POST ``/api/auth/device/code`` and return the device + user codes."""
if httpx is None:
@@ -243,22 +232,16 @@ def poll_for_token(
time.sleep(sleep)
continue
if resp.status_code == 200:
body: Dict[str, Any] = {}
try:
decoded = resp.json() or {}
body = decoded if isinstance(decoded, dict) else {}
except (TypeError, ValueError, json.JSONDecodeError):
body = {}
candidates = _device_response_token_candidates(
body, headers=getattr(resp, "headers", {}),
)
if not candidates:
token = resp.headers.get("set-auth-token")
if not token:
body = resp.json() or {}
session = body.get("session") or {}
token = session.get("access_token") or body.get("access_token")
if not token:
raise RuntimeError(
"Photon returned 200 but no token candidate in the "
"device-token response (expected access_token, "
"data.access_token, accessToken, or set-auth-token)."
"Photon returned 200 but no token in headers or body"
)
return candidates[0].token
return token
if resp.status_code == 400:
# RFC 8628 §3.5 — error codes are returned with 400.
body: Dict[str, Any] = {}
@@ -290,142 +273,6 @@ def poll_for_token(
raise TimeoutError("Photon device login timed out")
def _device_response_token_candidates(
body: Dict[str, Any],
*,
headers: Optional[Any] = None,
) -> list:
"""Extract de-duplicated token candidates from a device-token response.
Photon's device-token endpoint has returned tokens under several keys
across versions (``access_token``, ``accessToken``, ``data.*``) and the
documented ``set-auth-token`` response header. We collect every shape so
the caller can validate each against the dashboard API before trusting it.
"""
candidates: list = []
seen: set = set()
def add(source: str, value: Any) -> None:
token = _clean_bearer_token(value)
if not token or token in seen:
return
seen.add(token)
candidates.append(_DeviceTokenCandidate(source=source, token=token))
add("access_token", body.get("access_token"))
add("accessToken", body.get("accessToken"))
session = body.get("session")
if isinstance(session, dict):
add("session.access_token", session.get("access_token"))
data = body.get("data")
if isinstance(data, dict):
add("data.access_token", data.get("access_token"))
add("data.accessToken", data.get("accessToken"))
add("set-auth-token", _header_value(headers, "set-auth-token"))
return candidates
def _clean_bearer_token(value: Any) -> Optional[str]:
if not isinstance(value, str):
return None
token = value.strip()
if token.lower().startswith("bearer "):
token = token[7:].strip()
return token or None
def _header_value(headers: Optional[Any], name: str) -> Optional[str]:
if not headers:
return None
try:
value = headers.get(name)
if value:
return str(value)
except AttributeError:
pass
try:
for key, value in dict(headers).items():
if str(key).lower() == name.lower() and value:
return str(value)
except (TypeError, ValueError):
return None
return None
def _dashboard_get(path: str, token: str) -> Any:
if httpx is None:
raise RuntimeError("httpx is required for Photon device login")
url = f"{_dashboard_host()}{path}"
return httpx.get(
url,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
)
def validate_photon_token(token: str) -> Dict[str, Any]:
"""Verify a device-flow token is usable for dashboard project APIs.
The device flow can return a token that authenticates the Better Auth
session lookup but is rejected by the project APIs. Validate against
``/api/auth/get-session`` and ``/api/projects/`` so we fail loudly at
login instead of saving a token that 404s/401s downstream.
"""
resp = _dashboard_get("/api/auth/get-session", token)
if resp.status_code in (401, 403):
raise PhotonDashboardAuthError(
"Photon issued a device token, but the dashboard session lookup "
"rejected it."
)
resp.raise_for_status()
data = resp.json()
user = data.get("user") if isinstance(data, dict) else None
if not isinstance(user, dict) or not user:
raise PhotonDashboardAuthError(
"Photon issued a device token, but the dashboard session lookup "
"did not recognize it."
)
projects_resp = _dashboard_get("/api/projects/", token)
if projects_resp.status_code in (401, 403):
raise PhotonDashboardAuthError(
"Photon device token was accepted for the session lookup but "
"rejected by the project API."
)
projects_resp.raise_for_status()
return user
def _validated_dashboard_token(candidates: list) -> str:
"""Return the first candidate token that passes dashboard validation."""
if not candidates:
raise RuntimeError(
"Photon returned 200 but no token candidate in the device-token "
"response."
)
dashboard_error: Optional[PhotonDashboardAuthError] = None
last_error: Optional[BaseException] = None
for candidate in candidates:
try:
validate_photon_token(candidate.token)
return candidate.token
except PhotonDashboardAuthError as exc:
dashboard_error = exc
last_error = exc
continue
except Exception as exc:
last_error = exc
continue
if dashboard_error is not None:
sources = ", ".join(c.source for c in candidates) or "none"
raise PhotonDashboardAuthError(
f"{dashboard_error} Device login returned no project-valid "
f"dashboard token (tried: {sources})."
) from dashboard_error
if last_error is not None:
raise last_error
raise RuntimeError("Photon did not return a usable dashboard token")
def login_device_flow(
*,
client_id: str = DEFAULT_CLIENT_ID,
@@ -450,13 +297,7 @@ def login_device_flow(
webbrowser.open(target, new=2)
except Exception:
pass
# Poll once for the approved token, then collect every candidate shape so
# we can validate against the dashboard API before persisting (avoids
# saving a token that authenticates the session lookup but 404s on the
# project APIs).
first_token = poll_for_token(code, client_id=client_id)
candidates = [_DeviceTokenCandidate(source="poll", token=first_token)]
token = _validated_dashboard_token(candidates)
token = poll_for_token(code, client_id=client_id)
store_photon_token(token)
return token

View File

@@ -45,7 +45,6 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json"
# Auto-extracted from noreply emails + manual overrides
AUTHOR_MAP = {
"raysun12142006@gmail.com": "yanxue06",
"alberto.regalado@ymail.com": "ARegalado1",
"alchemistchaos@protonmail.com": "AlchemistChaos", # co-author only
"gilad@smiti.ai": "giladbau",
@@ -1242,7 +1241,6 @@ AUTHOR_MAP = {
"charliekerfoot@gmail.com": "CharlieKerfoot", # PR #18951
# Debug share upload-time redaction (May 2026)
"dhuysamen@gmail.com": "GodsBoy", # PR #19318
"github@nadyahermes.anonaddy.com": "ruangraung", # PR #42308
"mrcoferland@gmail.com": "mrcoferland", # PR #19023
"chenlinfeng@ruije.com.cn": "noOne-list", # PR #19050
"briansu@Mac-mini.attlocal.net": "likejudy", # PR #19052

View File

@@ -1,31 +1,29 @@
"""Regression tests for #30768, #32383, and #33961.
"""Regression tests for issue #30768 and #32383.
``_prompt_text_input_modal`` answers destructive-slash confirmations through a
queue-based modal driven by prompt_toolkit key bindings. When invoked from the
``process_loop`` daemon thread it sets the modal up on the app's event loop via
``call_soon_threadsafe``, so it is safe on every platform — including native
Windows (#33961), where the earlier ``sys.platform == "win32"`` → raw ``input()``
fallback deadlocked the daemon thread against prompt_toolkit's stdin ownership.
``_prompt_text_input_modal`` uses a queue-based modal that relies on
prompt_toolkit key bindings receiving keyboard events. On Windows the
prompt_toolkit input channel can deadlock when the modal is entered from
the ``process_loop`` daemon thread. The fix falls back to the simpler
``_prompt_text_input`` (stdin-based) prompt on Windows.
These tests verify:
1. Daemon-thread confirm uses the modal via the app loop on Linux AND native
Windows (#33961) — never the raw stdin fallback, never a hang.
2. Main-thread confirm with a running app uses the modal.
3. The raw stdin fallback is kept ONLY for the safe cases: no running app, and
(on win32, off-thread) a scheduling failure degrades to a clean cancel.
4. Empty choices returns None.
1. Windows detection triggers the stdin fallback
2. Non-Windows daemon threads still use the modal via the app loop
3. macOS/Linux main-thread path still uses the modal (no regression)
4. No-app path still uses the stdin fallback (existing behavior)
5. Empty choices returns None (existing behavior)
"""
import queue
import sys
import threading
import time
from unittest.mock import MagicMock, patch
import pytest
def _make_cli():
"""Minimal HermesCLI shell exposing the prompt/modal helpers."""
"""Minimal HermesCLI shell exposing prompt/modal helpers."""
import cli as cli_mod
obj = object.__new__(cli_mod.HermesCLI)
@@ -39,6 +37,9 @@ def _make_cli():
return obj
# ---------------------------------------------------------------------------
# Sample choices used across tests
# ---------------------------------------------------------------------------
_SAMPLE_CHOICES = [
("once", "Approve Once", "proceed this time only"),
("always", "Always Approve", "proceed and silence this prompt permanently"),
@@ -46,106 +47,119 @@ _SAMPLE_CHOICES = [
]
def _answer_modal_when_open(cli, response, stop=None):
"""Push ``response`` onto the modal's response_queue once it opens.
class TestModalWindowsFallback:
"""Windows dead-lock regression tests for _prompt_text_input_modal."""
Gives up after ~2s, or early when ``stop`` is set (the modal will never open,
e.g. a scheduling failure) so degraded-path tests don't wait the full budget.
"""
for _ in range(100):
if stop is not None and stop.is_set():
return
state = cli._slash_confirm_state
if state and "response_queue" in state:
state["response_queue"].put(response)
return
time.sleep(0.02)
def _run_on_daemon(call, cli, *, platform, response, schedule=None):
"""Invoke ``call`` on a daemon thread — as the process_loop does — answering
the modal with ``response`` once it opens.
Returns ``{result, stdin_called, capture, restore}``. ``schedule`` overrides
the ``call_soon_threadsafe`` side effect (default: run the callback inline);
pass a raiser to simulate a scheduling failure. Fails if the worker hangs,
which is the deadlock canary for #33961.
"""
outcome = {"capture": [], "restore": [], "result": None, "stdin_called": False}
done = threading.Event()
def _worker():
try:
with patch.object(sys, "platform", platform), \
patch.object(cli._app.loop, "call_soon_threadsafe", side_effect=schedule or (lambda cb: cb())), \
patch.object(cli, "_prompt_text_input") as mock_stdin, \
patch.object(cli, "_invalidate"), \
patch.object(cli, "_capture_modal_input_snapshot", side_effect=lambda: outcome["capture"].append(1)), \
patch.object(cli, "_restore_modal_input_snapshot", side_effect=lambda: outcome["restore"].append(1)):
outcome["result"] = call()
outcome["stdin_called"] = mock_stdin.called
finally:
done.set()
worker = threading.Thread(target=_worker, daemon=True)
answerer = threading.Thread(target=_answer_modal_when_open, args=(cli, response, done), daemon=True)
answerer.start()
worker.start()
worker.join(timeout=2.0)
answerer.join(timeout=2.0)
assert not worker.is_alive(), "daemon thread hung — modal deadlocked"
return outcome
class TestModal:
"""Behaviour of _prompt_text_input_modal across platforms and threads."""
@pytest.mark.parametrize("platform", ["linux", "win32"])
def test_daemon_thread_uses_modal_via_app_loop(self, platform):
"""Off the process_loop daemon thread, the confirm uses the modal via
call_soon_threadsafe on every platform — including native Windows, where
the old win32 early-return deadlocked on raw input() (#33961)."""
def test_windows_falls_back_to_stdin(self):
"""On Windows, _prompt_text_input_modal should use _prompt_text_input."""
cli = _make_cli()
outcome = _run_on_daemon(
lambda: cli._prompt_text_input_modal(
title="⚠️ /reset",
with patch.object(sys, "platform", "win32"), \
patch.object(cli, "_prompt_text_input", return_value="1") as mock_stdin:
result = cli._prompt_text_input_modal(
title="⚠️ /new — destroys conversation state",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
),
cli,
platform=platform,
response="once",
)
assert outcome["stdin_called"] is False, "must use the modal, not raw input()"
assert outcome["result"] == "once"
assert outcome["capture"] == [1]
assert outcome["restore"] == [1]
assert cli._slash_confirm_state is None
)
def test_main_thread_with_app_uses_modal(self):
"""On the main thread with a running app, the queue-based modal is used."""
# The stdin-based fallback was used, not the modal queue path.
mock_stdin.assert_called_once_with("Choice [1/2/3]: ")
assert result == "1"
def test_non_main_thread_uses_modal_via_app_loop(self):
"""Off the main thread on Linux, keep the modal path via app-loop setup."""
cli = _make_cli()
result_holder = {}
setup_calls = []
teardown_calls = []
def _call_soon_threadsafe(callback):
callback()
def run_on_daemon():
with patch.object(sys, "platform", "linux"), \
patch.object(cli._app.loop, "call_soon_threadsafe", side_effect=_call_soon_threadsafe), \
patch.object(cli, "_prompt_text_input") as mock_stdin, \
patch.object(cli, "_capture_modal_input_snapshot", side_effect=lambda: setup_calls.append("capture")), \
patch.object(cli, "_restore_modal_input_snapshot", side_effect=lambda: teardown_calls.append("restore")):
result_holder["result"] = cli._prompt_text_input_modal(
title="⚠️ /reset",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
)
result_holder["stdin_called"] = mock_stdin.called
def _submit_after_delay():
time.sleep(0.2)
state = cli._slash_confirm_state
if state and "response_queue" in state:
state["response_queue"].put("once")
submitter = threading.Thread(target=_submit_after_delay, daemon=True)
t = threading.Thread(target=run_on_daemon, daemon=True)
submitter.start()
t.start()
t.join(timeout=2.0)
submitter.join(timeout=2.0)
assert not t.is_alive(), "daemon thread hung — modal deadlocked"
assert result_holder["stdin_called"] is False
assert result_holder["result"] == "once"
assert setup_calls == ["capture"]
assert teardown_calls == ["restore"]
def test_main_thread_non_windows_uses_modal(self):
"""On macOS/Linux main thread, the queue-based modal is still used."""
cli = _make_cli()
# We need to simulate the modal receiving a response. We'll patch
# the response_queue to immediately return a value.
with patch.object(sys, "platform", "darwin"), \
patch.object(cli, "_capture_modal_input_snapshot"), \
patch.object(cli, "_restore_modal_input_snapshot"), \
patch.object(cli, "_invalidate"), \
patch.object(cli, "_prompt_text_input") as mock_stdin:
answerer = threading.Thread(target=_answer_modal_when_open, args=(cli, "once"), daemon=True)
answerer.start()
result = cli._prompt_text_input_modal(
title="⚠️ /new",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
)
answerer.join(timeout=2.0)
patch.object(cli, "_invalidate"):
# Start the modal in a way that it will receive a response
# immediately via the queue.
original_queue = queue.Queue
original_time = time.monotonic
mock_stdin.assert_not_called()
assert result == "once"
def _fake_modal_flow(*args, **kwargs):
"""Simulate the modal flow: set state, put response, return."""
# We'll directly test that the modal path is entered by
# checking that _slash_confirm_state was set.
pass
# Since we can't easily mock the internal queue, let's test
# that the modal path is entered by checking that
# _prompt_text_input was NOT called.
with patch.object(cli, "_prompt_text_input") as mock_stdin:
# Set up a response that will be put into the queue
# after the modal starts waiting.
def _submit_after_delay():
time.sleep(0.2)
state = cli._slash_confirm_state
if state and "response_queue" in state:
state["response_queue"].put("once")
submitter = threading.Thread(target=_submit_after_delay, daemon=True)
submitter.start()
result = cli._prompt_text_input_modal(
title="⚠️ /new",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
)
submitter.join(timeout=2.0)
# The stdin fallback should NOT have been called.
mock_stdin.assert_not_called()
# The result should be "once" from the simulated modal response.
assert result == "once"
def test_no_app_falls_back_to_stdin(self):
"""Without a running app (oneshot / non-interactive), use the stdin prompt."""
"""Without a prompt_toolkit app, always use stdin fallback."""
cli = _make_cli()
cli._app = None
@@ -159,102 +173,78 @@ class TestModal:
mock_stdin.assert_called_once_with("Choice [1/2/3]: ")
assert result == "3"
def test_windows_no_app_falls_back_to_stdin(self):
"""win32 without a running app keeps stdin — the only case where the raw
prompt is safe on Windows, since no app owns the console to deadlock."""
cli = _make_cli()
cli._app = None
with patch.object(sys, "platform", "win32"), \
patch.object(cli, "_prompt_text_input", return_value="1") as mock_stdin:
result = cli._prompt_text_input_modal(
title="⚠️ /new — destroys conversation state",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
)
mock_stdin.assert_called_once_with("Choice [1/2/3]: ")
assert result == "1"
def test_windows_scheduling_failure_clean_cancels(self):
"""win32 off the main thread: if marshaling onto the app loop fails, cancel
cleanly (None) rather than fall to raw input() (which deadlocks on native
Windows) or hang. Asserts the _stdin_fallback guard (#33961)."""
cli = _make_cli()
def _raise(_cb):
raise RuntimeError("loop closed")
outcome = _run_on_daemon(
lambda: cli._prompt_text_input_modal(
title="⚠️ /reset",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
),
cli,
platform="win32",
response="once",
schedule=_raise,
)
assert outcome["stdin_called"] is False, "win32 off-thread must NOT call raw input()"
assert outcome["result"] is None
assert cli._slash_confirm_state is None
@pytest.mark.parametrize(
"platform, expect_stdin, expect_result",
[("win32", False, None), ("linux", True, "1")],
)
def test_daemon_thread_no_app_loop_uses_fallback(self, platform, expect_stdin, expect_result):
"""Off the daemon thread with no resolvable app loop (``self._app.loop``
is None / raises), the modal can never be scheduled, so the method short-
circuits at the app_loop-is-None site (cli.py ~7260) — a distinct path
from a call_soon_threadsafe failure. win32 clean-cancels (None) instead of
deadlocking on raw input(); other platforms keep the stdin prompt."""
cli = _make_cli()
cli._app.loop = None # forces app_loop is None, off the main thread
outcome = {"result": None, "stdin_called": False}
done = threading.Event()
def _worker():
try:
with patch.object(sys, "platform", platform), \
patch.object(cli, "_prompt_text_input", return_value="1") as mock_stdin, \
patch.object(cli, "_invalidate"):
outcome["result"] = cli._prompt_text_input_modal(
title="⚠️ /reset",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
)
outcome["stdin_called"] = mock_stdin.called
finally:
done.set()
worker = threading.Thread(target=_worker, daemon=True)
worker.start()
worker.join(timeout=2.0)
assert not worker.is_alive(), "daemon thread hung — modal deadlocked"
assert outcome["stdin_called"] is expect_stdin
assert outcome["result"] == expect_result
assert cli._slash_confirm_state is None
def test_empty_choices_returns_none(self):
"""Empty choices returns None without prompting."""
"""Empty choices list should return None without prompting."""
cli = _make_cli()
with patch.object(cli, "_prompt_text_input") as mock_stdin:
result = cli._prompt_text_input_modal(title="Test", detail="Test", choices=[])
result = cli._prompt_text_input_modal(
title="Test",
detail="Test",
choices=[],
)
mock_stdin.assert_not_called()
assert result is None
def test_windows_fallback_does_not_set_modal_state(self):
"""Verify Windows fallback doesn't leave _slash_confirm_state set."""
cli = _make_cli()
with patch.object(sys, "platform", "win32"), \
patch.object(cli, "_prompt_text_input", return_value="1"):
cli._prompt_text_input_modal(
title="⚠️ /reset",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
)
assert cli._slash_confirm_state is None
def test_non_main_thread_modal_clears_state(self):
"""Verify daemon-thread modal teardown does not leave state behind."""
cli = _make_cli()
errors = []
def _call_soon_threadsafe(callback):
callback()
def run_on_daemon():
try:
with patch.object(sys, "platform", "linux"), \
patch.object(cli._app.loop, "call_soon_threadsafe", side_effect=_call_soon_threadsafe):
def _submit_after_delay():
time.sleep(0.2)
state = cli._slash_confirm_state
if state and "response_queue" in state:
state["response_queue"].put("cancel")
submitter = threading.Thread(target=_submit_after_delay, daemon=True)
submitter.start()
cli._prompt_text_input_modal(
title="⚠️ /new",
detail="This starts a fresh session.",
choices=_SAMPLE_CHOICES,
timeout=5,
)
submitter.join(timeout=2.0)
if cli._slash_confirm_state is not None:
errors.append("_slash_confirm_state should be None")
except Exception as exc:
errors.append(str(exc))
t = threading.Thread(target=run_on_daemon, daemon=True)
t.start()
t.join(timeout=2.0)
assert not errors, f"unexpected errors: {errors}"
assert cli._slash_confirm_state is None
class TestConfirmDestructiveSlashWindows:
"""End-to-end _confirm_destructive_slash on the native-Windows daemon thread."""
"""Integration-level tests for _confirm_destructive_slash on Windows."""
def _make_interactive_cli(self):
def test_confirm_destructive_slash_bypasses_modal_on_windows(self):
"""_confirm_destructive_slash should work on Windows via stdin fallback."""
cli = _make_cli()
cli.model = "test-model"
cli._agent_running = False
@@ -265,140 +255,37 @@ class TestConfirmDestructiveSlashWindows:
cli._pending_tool_info = {}
cli._tool_start_time = 0.0
cli._last_scrollback_tool = ""
return cli
@pytest.mark.parametrize(
"response, expected",
[("once", "once"), ("cancel", None)],
)
def test_confirm_destructive_slash_uses_modal_on_windows(self, response, expected):
"""On native Windows, the bare /new confirm drives the modal (not stdin)
and returns the chosen outcome — the bug #33961 froze this path."""
cli = self._make_interactive_cli()
with patch("cli.load_cli_config", return_value={"approvals": {"destructive_slash_confirm": True}}):
outcome = _run_on_daemon(
lambda: cli._confirm_destructive_slash(
"new",
"This starts a fresh session.\nThe current conversation history will be discarded.",
),
cli,
platform="win32",
response=response,
with patch.object(sys, "platform", "win32"), \
patch.object(cli, "_prompt_text_input", return_value="1"), \
patch("cli.load_cli_config", return_value={"approvals": {"destructive_slash_confirm": True}}):
result = cli._confirm_destructive_slash(
"new",
"This starts a fresh session.\nThe current conversation history will be discarded.",
)
assert outcome["stdin_called"] is False
assert outcome["result"] == expected
assert result == "once"
class TestNativeWindowsNoRawInputDeadlock:
"""Anti-regression guard exercising the REAL ``_prompt_text_input``.
Every other test here mocks ``_prompt_text_input`` away, so they only
assert *routing* (modal vs. stdin) — they cannot observe the actual hang
that #33961 was. The historical regression was precisely that
``_prompt_text_input_modal`` delegated to the *real* ``_prompt_text_input``
on native Windows, which on a non-main thread runs a bare ``input()`` that
blocks forever against prompt_toolkit's stdin ownership.
These tests let the real ``_prompt_text_input`` run with a blocking
``input()`` and assert the worker thread never hangs. They fail on the
pre-#33961 code (win32 → ``_prompt_text_input`` → off-main ``input()``)
and pass once the modal path / clean-cancel fallback is in place.
"""
def test_win32_daemon_thread_never_blocks_on_real_input(self):
"""A blocking input() must NOT hang the daemon thread on win32.
Drives the genuine helper chain (no mock of ``_prompt_text_input``)
with ``builtins.input`` patched to block forever. The confirm must
resolve via the app-loop modal (answered on a background thread, as
the real key bindings would) and never sit in ``input()``. On the
pre-#33961 code the win32 early-return routed to the real
``_prompt_text_input`` → off-main ``input()`` → permanent hang.
"""
def test_confirm_destructive_slash_cancelled_on_windows(self):
"""Cancellation via stdin fallback works on Windows."""
cli = _make_cli()
cli._app.loop.call_soon_threadsafe = lambda cb: cb()
cli.model = "test-model"
cli._agent_running = False
cli._spinner_text = ""
cli._should_exit = False
cli._command_running = False
cli.session_id = "test-session"
cli._pending_tool_info = {}
cli._tool_start_time = 0.0
cli._last_scrollback_tool = ""
def _blocking_input(prompt=""): # stands in for "no line ever arrives"
time.sleep(30)
return "1"
with patch.object(sys, "platform", "win32"), \
patch.object(cli, "_prompt_text_input", return_value="3"), \
patch("cli.load_cli_config", return_value={"approvals": {"destructive_slash_confirm": True}}):
result = cli._confirm_destructive_slash(
"reset",
"This starts a fresh session.\nThe current conversation history will be discarded.",
)
outcome = {}
done = threading.Event()
def _worker():
try:
with patch.object(sys, "platform", "win32"), \
patch("builtins.input", side_effect=_blocking_input), \
patch.object(cli, "_capture_modal_input_snapshot"), \
patch.object(cli, "_restore_modal_input_snapshot"), \
patch.object(cli, "_invalidate"):
outcome["result"] = cli._prompt_text_input_modal(
title="/new",
detail="destroys conversation state",
choices=_SAMPLE_CHOICES,
timeout=3,
)
finally:
done.set()
worker = threading.Thread(target=_worker, daemon=True)
answerer = threading.Thread(
target=_answer_modal_when_open, args=(cli, "cancel", done), daemon=True
)
answerer.start()
worker.start()
worker.join(timeout=5.0)
answerer.join(timeout=5.0)
assert not worker.is_alive(), (
"daemon thread hung in real input() — native-Windows confirm "
"deadlock regressed (#33961)"
)
# cancel → None; the point is it RETURNED rather than blocking forever.
assert outcome.get("result") in (None, "cancel")
def test_win32_scheduling_failure_cleanly_cancels_no_input(self):
"""If the modal can't be marshaled onto the app loop on native Windows
(scheduling failure) the off-main-thread path must cancel cleanly —
NOT fall through to a blocking raw ``input()``.
This is the degraded branch the pre-#33961 code handled with
``return self._prompt_text_input(...)`` (which deadlocks); the fix
returns ``None`` instead.
"""
cli = _make_cli()
def _raise(cb): # call_soon_threadsafe scheduling failure
raise RuntimeError("event loop closed")
cli._app.loop.call_soon_threadsafe = _raise
input_called = {"n": 0}
def _tracking_input(prompt=""):
input_called["n"] += 1
time.sleep(30)
return "1"
outcome = {}
def _worker():
with patch.object(sys, "platform", "win32"), \
patch("builtins.input", side_effect=_tracking_input), \
patch.object(cli, "_invalidate"):
outcome["result"] = cli._prompt_text_input_modal(
title="/new",
detail="destroys conversation state",
choices=_SAMPLE_CHOICES,
timeout=3,
)
worker = threading.Thread(target=_worker, daemon=True)
worker.start()
worker.join(timeout=5.0)
assert not worker.is_alive(), (
"daemon thread hung — win32 scheduling-failure fallback used raw "
"input() instead of cleanly cancelling (#33961)"
)
assert input_called["n"] == 0, "win32 off-thread fallback must not call input()"
assert outcome.get("result") is None
# Choice "3" normalizes to "cancel", which returns None.
assert result is None

View File

@@ -1264,123 +1264,3 @@ async def test_verbose_mode_respects_explicit_tool_preview_length(monkeypatch, t
assert VerboseAgent.LONG_CODE not in all_content
# But should still contain the truncated portion with "..."
assert "..." in all_content
class CodeBlockProgressAdapter(ProgressCaptureAdapter):
"""A markdown-capable progress adapter (declares supports_code_blocks)."""
supports_code_blocks = True
class TerminalCommandAgent:
"""Emits a terminal tool.started with a real, multi-line command arg."""
CMD = (
"set -euo pipefail\n"
"printf 'node: '; node --version\n"
"npm install -g hyperframes@latest"
)
def __init__(self, **kwargs):
self.tool_progress_callback = kwargs.get("tool_progress_callback")
self.tools = []
def run_conversation(self, message, conversation_history=None, task_id=None):
self.tool_progress_callback(
"tool.started", "terminal", self.CMD, {"command": self.CMD}
)
# Let the async progress task drain the queue and send before returning.
time.sleep(0.35)
return {"final_response": "done", "messages": [], "api_calls": 1}
@pytest.mark.asyncio
async def test_terminal_progress_is_truncated_preview_not_bash_block(monkeypatch, tmp_path):
"""Regression for #41215: terminal progress must render as a short truncated
preview, never the full command in a fenced ```bash block, even on a
markdown-capable (supports_code_blocks) gateway."""
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = TerminalCommandAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
import tools.terminal_tool # noqa: F401 - register terminal emoji
adapter = CodeBlockProgressAdapter(platform=Platform.TELEGRAM)
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="12345",
chat_type="dm",
thread_id=None,
)
result = await runner._run_agent(
message="hello",
context_prompt="",
history=[],
source=source,
session_id="sess-terminal-no-bash-block",
session_key="agent:main:telegram:dm:12345",
)
assert result["final_response"] == "done"
all_content = " ".join(call["content"] for call in adapter.sent)
all_content += " ".join(call["content"] for call in adapter.edits)
# Compact truncated preview, not a fenced bash block.
assert "```bash" not in all_content
assert 'terminal: "' in all_content
# The full multi-line command body must not reach the chat.
assert "npm install -g hyperframes@latest" not in all_content
@pytest.mark.asyncio
async def test_terminal_progress_no_bash_block_in_verbose_mode(monkeypatch, tmp_path):
"""#41215 also rendered the bash block in verbose mode. The revert removed it
from both branches, so verbose progress must not emit a fenced ```bash block
either (verbose still shows args by opt-in, just not as a code block)."""
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "verbose")
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = TerminalCommandAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
import tools.terminal_tool # noqa: F401 - register terminal emoji
adapter = CodeBlockProgressAdapter(platform=Platform.TELEGRAM)
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="12345",
chat_type="dm",
thread_id=None,
)
result = await runner._run_agent(
message="hello",
context_prompt="",
history=[],
source=source,
session_id="sess-terminal-verbose-no-bash",
session_key="agent:main:telegram:dm:12345",
)
assert result["final_response"] == "done"
all_content = " ".join(call["content"] for call in adapter.sent)
all_content += " ".join(call["content"] for call in adapter.edits)
assert "```bash" not in all_content

View File

@@ -835,7 +835,7 @@ class TestEditMessageStreamingSafety:
assert second_call == {
"chat_id": 123,
"message_id": 456,
"text": "final bold",
"text": "final **bold**",
}
@pytest.mark.asyncio

View File

@@ -491,7 +491,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
with patch("hermes_cli.dump.run_dump"), \
patch("hermes_cli.debug._sweep_expired_pastes", return_value=(0, 0)) as mock_sweep, \
@@ -510,7 +509,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
with patch("hermes_cli.dump.run_dump"), \
patch(
@@ -531,7 +529,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = True
args.nous = False
with patch("hermes_cli.dump.run_dump"):
run_debug_share(args)
@@ -549,7 +546,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
call_count = [0]
uploaded_content = []
@@ -603,7 +599,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
uploaded_content = []
@@ -649,7 +644,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
call_count = [0]
def _mock_upload(content, expiry_days=7):
@@ -674,7 +668,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
call_count = [0]
def _mock_upload(content, expiry_days=7):
@@ -701,7 +694,6 @@ class TestRunDebugShare:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
with patch("hermes_cli.dump.run_dump"), \
patch("hermes_cli.debug.upload_to_pastebin",
@@ -750,7 +742,6 @@ class TestRunDebugShareRedaction:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
args.no_redact = False
captured: list[str] = []
@@ -781,7 +772,6 @@ class TestRunDebugShareRedaction:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
args.no_redact = False
captured: list[str] = []
@@ -810,7 +800,6 @@ class TestRunDebugShareRedaction:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
args.no_redact = True
captured: list[str] = []
@@ -861,7 +850,6 @@ class TestRunDebug:
args.lines = 200
args.expire = 7
args.local = True
args.nous = False
with patch("hermes_cli.dump.run_dump"):
run_debug(args)
@@ -1240,7 +1228,6 @@ class TestShareIncludesAutoDelete:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
with patch("hermes_cli.dump.run_dump"), \
patch("hermes_cli.debug.upload_to_pastebin",
@@ -1263,7 +1250,6 @@ class TestShareIncludesAutoDelete:
args.lines = 50
args.expire = 7
args.local = False
args.nous = False
with patch("hermes_cli.dump.run_dump"), \
patch("hermes_cli.debug.upload_to_pastebin",
@@ -1281,7 +1267,6 @@ class TestShareIncludesAutoDelete:
args.lines = 50
args.expire = 7
args.local = True
args.nous = False
with patch("hermes_cli.dump.run_dump"):
run_debug_share(args)
@@ -1395,220 +1380,3 @@ class TestBuildDebugShare:
), patch("hermes_cli.debug._schedule_auto_delete"):
with pytest.raises(RuntimeError, match="all paste services down"):
build_debug_share(log_lines=50, redact=True)
# ---------------------------------------------------------------------------
# Shared bundle collection + Nous-S3 path
# ---------------------------------------------------------------------------
class TestCollectShareBundle:
def test_returns_report_and_logs(self, hermes_home):
from hermes_cli.debug import collect_share_bundle
with patch("hermes_cli.dump.run_dump"):
bundle = collect_share_bundle(log_lines=50, redact=True)
assert "report" in bundle
assert "agent.log" in bundle
assert "gateway.log" in bundle
assert "desktop.log" in bundle
# Banner is prepended under redact=True.
assert "redacted at upload time" in bundle["report"]
assert "session started" in bundle["agent.log"]
def test_no_redact_omits_banner(self, hermes_home):
from hermes_cli.debug import collect_share_bundle
with patch("hermes_cli.dump.run_dump"):
bundle = collect_share_bundle(log_lines=50, redact=False)
assert "redacted at upload time" not in bundle["report"]
def test_redaction_keeps_secrets_out(self, hermes_home):
from hermes_cli.debug import collect_share_bundle
secret = "sk-proj-abcdefghijklmnopqrstuvwxyz1234567890"
(hermes_home / "logs" / "agent.log").write_text(
f"line one\nOPENAI_API_KEY={secret}\nline three\n"
)
with patch("hermes_cli.dump.run_dump"):
redacted = collect_share_bundle(log_lines=50, redact=True)
unredacted = collect_share_bundle(log_lines=50, redact=False)
# Sanity: without redaction the secret is present in the bundle.
assert secret in "\n".join(unredacted.values())
# With redaction it must be scrubbed everywhere.
assert secret not in "\n".join(redacted.values())
def test_build_debug_share_uses_collector(self, hermes_home):
# build_debug_share must produce the same report text the collector does
# (i.e. the refactor preserved paste.rs behaviour).
from hermes_cli.debug import build_debug_share, collect_share_bundle
with patch("hermes_cli.dump.run_dump"):
expected = collect_share_bundle(log_lines=50, redact=True)["report"]
uploaded = []
def _upload(content, expiry_days=7):
uploaded.append(content)
return "https://paste.rs/x"
with patch("hermes_cli.dump.run_dump"), patch(
"hermes_cli.debug.upload_to_pastebin", side_effect=_upload
), patch("hermes_cli.debug._schedule_auto_delete"):
result = build_debug_share(log_lines=50, redact=True)
assert result.urls["Report"] == "https://paste.rs/x"
# The report uploaded should match the collector's report.
assert uploaded[0] == expected
class TestBuildNousBundle:
def test_envelope_shape_and_gzip(self, hermes_home):
import gzip
import json as _json
from hermes_cli.debug import build_nous_bundle
files = {"report": "hello", "agent.log": "log line"}
blob = build_nous_bundle(files, redact=True)
# It's gzip — magic bytes.
assert blob[:2] == b"\x1f\x8b"
envelope = _json.loads(gzip.decompress(blob).decode())
assert envelope["format"] == "hermes-debug-share/1"
assert envelope["redacted"] is True
assert envelope["files"] == files
assert "created" in envelope
def test_redacted_false_recorded(self):
import gzip
import json as _json
from hermes_cli.debug import build_nous_bundle
blob = build_nous_bundle({"report": "x"}, redact=False)
envelope = _json.loads(gzip.decompress(blob).decode())
assert envelope["redacted"] is False
class TestRunDebugShareNous:
def _args(self, **over):
class _A:
lines = 50
expire = 7
local = False
nous = True
no_redact = False
a = _A()
for k, v in over.items():
setattr(a, k, v)
return a
def test_nous_success_prints_view_url(self, hermes_home, capsys):
from hermes_cli.debug import run_debug_share
res = {
"id": "id-1",
"viewUrl": "https://support.example.com/diagnostics/id-1",
"expiresAt": "2026-06-20T00:00:00Z",
}
with patch("hermes_cli.dump.run_dump"), patch(
"hermes_cli.diagnostics_upload.share_to_nous", return_value=res
) as share:
run_debug_share(self._args())
out = capsys.readouterr().out
assert "Nous-INTERNAL" in out
assert "https://support.example.com/diagnostics/id-1" in out
assert "2026-06-20T00:00:00Z" in out
# The blob passed to share_to_nous must be gzip bytes.
blob = share.call_args[0][0]
assert isinstance(blob, (bytes, bytearray)) and blob[:2] == b"\x1f\x8b"
def test_nous_failure_suggests_local(self, hermes_home, capsys):
from hermes_cli.debug import run_debug_share
with patch("hermes_cli.dump.run_dump"), patch(
"hermes_cli.diagnostics_upload.share_to_nous",
side_effect=RuntimeError("service down"),
):
with pytest.raises(SystemExit) as exc:
run_debug_share(self._args())
assert exc.value.code == 1
err = capsys.readouterr().err
assert "Nous upload failed" in err
assert "--local" in err
def test_nous_does_not_touch_pastebin(self, hermes_home):
from hermes_cli.debug import run_debug_share
res = {"id": "id-1", "viewUrl": "https://v"}
with patch("hermes_cli.dump.run_dump"), patch(
"hermes_cli.diagnostics_upload.share_to_nous", return_value=res
), patch("hermes_cli.debug.upload_to_pastebin") as paste:
run_debug_share(self._args())
paste.assert_not_called()
class TestDebugSlashCommand:
"""`/debug [nous|local]` parsing in the CLI/TUI handler.
The classic CLI and the TUI slash worker both dispatch through
``HermesCLI.process_command`` → ``_handle_debug_command(cmd_original)``,
which parses an optional destination word and builds the args namespace
handed to ``run_debug_share``.
"""
def _handler(self):
from hermes_cli.cli_commands_mixin import CLICommandsMixin
class _Stub(CLICommandsMixin):
pass
return _Stub()._handle_debug_command
def _captured(self, cmd_original):
captured = {}
def _fake_run(args):
captured.update(vars(args))
with patch("hermes_cli.debug.run_debug_share", _fake_run):
self._handler()(cmd_original)
return captured
def test_bare_debug_defaults_to_paste(self):
c = self._captured("/debug")
assert c["nous"] is False and c["local"] is False
assert c["lines"] == 200 and c["expire"] == 7
def test_nous_word_sets_nous(self):
c = self._captured("/debug nous")
assert c["nous"] is True and c["local"] is False
def test_local_word_sets_local(self):
c = self._captured("/debug local")
assert c["local"] is True and c["nous"] is False
def test_word_parsing_is_case_insensitive(self):
c = self._captured("/debug NOUS")
assert c["nous"] is True
def test_local_wins_over_nous(self):
# local never touches the network, so it takes precedence.
c = self._captured("/debug nous local")
assert c["local"] is True and c["nous"] is False
def test_unknown_word_falls_back_to_default(self):
c = self._captured("/debug paste")
assert c["nous"] is False and c["local"] is False
def test_no_arg_default_keyword(self):
# Calling with no cmd_original (legacy callers) must still work.
c = self._captured("")
assert c["nous"] is False and c["local"] is False

View File

@@ -1,227 +0,0 @@
"""Tests for ``hermes_cli.diagnostics_upload`` — the Nous-S3 upload client.
All network I/O is mocked at ``urllib.request.urlopen``; no real requests
are made.
"""
import io
import json
import urllib.error
from unittest.mock import MagicMock, patch
import pytest
def _resp(*, status=200, body=b""):
"""Build a context-manager mock mimicking ``urllib.request.urlopen``."""
m = MagicMock()
m.status = status
m.getcode.return_value = status
m.read.return_value = body
m.__enter__ = lambda s: s
m.__exit__ = MagicMock(return_value=False)
return m
# ---------------------------------------------------------------------------
# request_upload_url
# ---------------------------------------------------------------------------
class TestRequestUploadUrl:
def test_happy_path_posts_json_and_returns_dict(self):
from hermes_cli.diagnostics_upload import request_upload_url
payload = {
"success": True,
"id": "abc-123",
"uploadUrl": "https://bucket.s3.amazonaws.com/uploads/abc-123.json.gz?sig",
"viewUrl": "https://support.example.com/diagnostics/abc-123",
"uploadExpiresInSeconds": 900,
}
resp = _resp(status=200, body=json.dumps(payload).encode())
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
) as urlopen:
result = request_upload_url(content_type="application/gzip", size_bytes=512)
assert result == payload
# The request object passed to urlopen carries our JSON body + headers.
req = urlopen.call_args[0][0]
assert req.method == "POST"
assert req.full_url.endswith("/api/diagnostics/upload-url")
sent = json.loads(req.data.decode())
assert sent["contentType"] == "application/gzip"
assert sent["sizeBytes"] == 512
# urllib lower-cases header keys.
assert req.headers["Content-type"] == "application/json"
def test_non_2xx_raises(self):
from hermes_cli.diagnostics_upload import request_upload_url
resp = _resp(status=500, body=b"boom")
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
):
with pytest.raises(RuntimeError):
request_upload_url()
def test_missing_upload_url_raises(self):
from hermes_cli.diagnostics_upload import request_upload_url
resp = _resp(status=200, body=json.dumps({"id": "x"}).encode())
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
):
with pytest.raises(RuntimeError):
request_upload_url()
def test_non_json_raises(self):
from hermes_cli.diagnostics_upload import request_upload_url
resp = _resp(status=200, body=b"<html>not json</html>")
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
):
with pytest.raises(RuntimeError):
request_upload_url()
def test_base_url_env_override(self, monkeypatch):
# NAS_BASE is read at import time; re-import the module under the
# patched env to confirm the override is honoured.
import importlib
monkeypatch.setenv("HERMES_DIAGNOSTICS_BASE_URL", "https://staging.example.com")
import hermes_cli.diagnostics_upload as mod
mod = importlib.reload(mod)
try:
assert mod.NAS_BASE == "https://staging.example.com"
resp = _resp(
status=200,
body=json.dumps({"uploadUrl": "u", "id": "i", "viewUrl": "v"}).encode(),
)
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
) as urlopen:
mod.request_upload_url()
req = urlopen.call_args[0][0]
assert req.full_url == "https://staging.example.com/api/diagnostics/upload-url"
finally:
monkeypatch.delenv("HERMES_DIAGNOSTICS_BASE_URL", raising=False)
importlib.reload(mod)
# ---------------------------------------------------------------------------
# put_bundle
# ---------------------------------------------------------------------------
class TestPutBundle:
def test_put_sends_exact_body_and_content_type(self):
from hermes_cli.diagnostics_upload import put_bundle
data = b"\x1f\x8b\x08gzipped-bytes"
resp = _resp(status=200, body=b"")
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
) as urlopen:
put_bundle("https://bucket.s3.amazonaws.com/uploads/x.json.gz?sig", data)
req = urlopen.call_args[0][0]
assert req.method == "PUT"
# PUT body must be the bundle bytes, unchanged.
assert req.data == data
assert req.headers["Content-type"] == "application/gzip"
def test_custom_content_type(self):
from hermes_cli.diagnostics_upload import put_bundle
resp = _resp(status=204, body=b"")
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
) as urlopen:
put_bundle("https://u", b"data", content_type="application/json")
req = urlopen.call_args[0][0]
assert req.headers["Content-type"] == "application/json"
def test_non_2xx_raises(self):
from hermes_cli.diagnostics_upload import put_bundle
resp = _resp(status=403, body=b"AccessDenied")
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
return_value=resp,
):
with pytest.raises(RuntimeError):
put_bundle("https://u", b"data")
def test_http_error_propagates(self):
from hermes_cli.diagnostics_upload import put_bundle
err = urllib.error.HTTPError("https://u", 500, "err", {}, io.BytesIO(b""))
with patch(
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
side_effect=err,
):
with pytest.raises(urllib.error.HTTPError):
put_bundle("https://u", b"data")
# ---------------------------------------------------------------------------
# share_to_nous (orchestration)
# ---------------------------------------------------------------------------
class TestShareToNous:
def test_orchestrates_request_then_put(self):
from hermes_cli import diagnostics_upload as mod
info = {
"id": "id-9",
"uploadUrl": "https://bucket/uploads/id-9.json.gz?sig",
"viewUrl": "https://support/diagnostics/id-9",
"expiresAt": "2026-06-20T00:00:00Z",
}
blob = b"gzipped-bundle"
with patch.object(mod, "request_upload_url", return_value=info) as req, \
patch.object(mod, "put_bundle") as put:
result = mod.share_to_nous(blob)
assert result == info
req.assert_called_once()
# request was told the real byte size (NAS signs it into ContentLength)
assert req.call_args.kwargs["size_bytes"] == len(blob)
# PUT got the signed URL + the exact blob
put.assert_called_once_with(
info["uploadUrl"], blob, content_type="application/gzip"
)
def test_put_failure_propagates(self):
from hermes_cli import diagnostics_upload as mod
info = {"id": "id-9", "uploadUrl": "https://u", "viewUrl": "v"}
with patch.object(mod, "request_upload_url", return_value=info), \
patch.object(mod, "put_bundle", side_effect=RuntimeError("PUT failed")):
with pytest.raises(RuntimeError):
mod.share_to_nous(b"data")
def test_share_succeeds_without_id_in_response(self):
from hermes_cli import diagnostics_upload as mod
# NAS is stateless and there is no confirm step, so the share must
# succeed regardless of whether the response carries an ``id``.
info = {"uploadUrl": "https://u", "viewUrl": "v"} # no id
with patch.object(mod, "request_upload_url", return_value=info), \
patch.object(mod, "put_bundle") as put:
result = mod.share_to_nous(b"data")
assert result == info
put.assert_called_once()

View File

@@ -540,54 +540,6 @@ def test_run_doctor_accepts_hermes_provider_ids_that_catalog_aliases(
)
def test_run_doctor_accepts_vendor_slugs_for_named_custom_provider(monkeypatch, tmp_path):
home = tmp_path / ".hermes"
home.mkdir(parents=True, exist_ok=True)
(home / "config.yaml").write_text(
"model:\n"
" provider: custom:hpc-ai\n"
" default: deepseek/deepseek-v4-flash\n"
"custom_providers:\n"
" - name: hpc-ai\n"
" base_url: https://hpc-ai.example/v1\n"
" api_key: test-key\n",
encoding="utf-8",
)
monkeypatch.setattr(doctor_mod, "HERMES_HOME", home)
monkeypatch.setattr(doctor_mod, "PROJECT_ROOT", tmp_path / "project")
monkeypatch.setattr(doctor_mod, "_DHH", str(home))
(tmp_path / "project").mkdir(exist_ok=True)
fake_model_tools = types.SimpleNamespace(
check_tool_availability=lambda *a, **kw: ([], []),
TOOLSET_REQUIREMENTS={},
)
monkeypatch.setitem(sys.modules, "model_tools", fake_model_tools)
try:
from hermes_cli import auth as _auth_mod
monkeypatch.setattr(_auth_mod, "get_nous_auth_status", lambda: {})
monkeypatch.setattr(_auth_mod, "get_codex_auth_status", lambda: {})
monkeypatch.setattr(_auth_mod, "get_xai_oauth_auth_status", lambda: {})
except Exception:
pass
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
doctor_mod.run_doctor(Namespace(fix=False))
out = buf.getvalue()
assert "model.provider 'custom:hpc-ai' is not a recognised provider" not in out
assert "model.provider 'custom:hpc-ai' is unknown" not in out
assert (
"model.default 'deepseek/deepseek-v4-flash' uses a vendor/model slug but provider is "
"'custom:hpc-ai'"
not in out
)
assert "Either set model.provider to 'openrouter', or drop the vendor prefix." not in out
def test_run_doctor_accepts_kimi_coding_cn_provider(monkeypatch, tmp_path):

View File

@@ -65,15 +65,6 @@ def test_resolve_provider_full_finds_named_custom_provider():
assert resolved.source == "user-config"
def test_is_aggregator_recognizes_named_custom_provider():
assert providers_mod.is_aggregator("custom:hpc-ai") is True
assert providers_mod.is_aggregator("custom:litellm") is True
def test_is_aggregator_leaves_unknown_provider_non_aggregator():
assert providers_mod.is_aggregator("not-a-provider") is False
def test_switch_model_accepts_explicit_named_custom_provider(monkeypatch):
"""Shared /model switch pipeline should accept --provider for custom_providers."""
monkeypatch.setattr(

View File

@@ -3,6 +3,8 @@
from __future__ import annotations
import logging
import os
import shutil
from pathlib import Path
from unittest.mock import MagicMock, patch
@@ -16,6 +18,7 @@ from hermes_cli.plugins_cmd import (
_repo_name_from_url,
_resolve_git_executable,
_resolve_git_url,
_resolve_subdir_within,
_sanitize_plugin_name,
)
@@ -97,35 +100,127 @@ class TestSanitizePluginName:
class TestResolveGitUrl:
"""Shorthand and full-URL resolution."""
"""Shorthand and full-URL resolution, with optional subdirectory."""
def test_owner_repo_shorthand(self):
url = _resolve_git_url("owner/repo")
url, subdir = _resolve_git_url("owner/repo")
assert url == "https://github.com/owner/repo.git"
assert subdir is None
def test_https_url_passthrough(self):
url = _resolve_git_url("https://github.com/x/y.git")
url, subdir = _resolve_git_url("https://github.com/x/y.git")
assert url == "https://github.com/x/y.git"
assert subdir is None
def test_ssh_url_passthrough(self):
url = _resolve_git_url("git@github.com:x/y.git")
url, subdir = _resolve_git_url("git@github.com:x/y.git")
assert url == "git@github.com:x/y.git"
assert subdir is None
def test_http_url_passthrough(self):
url = _resolve_git_url("http://example.com/repo.git")
url, subdir = _resolve_git_url("http://example.com/repo.git")
assert url == "http://example.com/repo.git"
assert subdir is None
def test_file_url_passthrough(self):
url = _resolve_git_url("file:///tmp/repo")
url, subdir = _resolve_git_url("file:///tmp/repo")
assert url == "file:///tmp/repo"
assert subdir is None
def test_invalid_single_word_raises(self):
with pytest.raises(ValueError, match="Invalid plugin identifier"):
_resolve_git_url("justoneword")
def test_invalid_three_parts_raises(self):
with pytest.raises(ValueError, match="Invalid plugin identifier"):
_resolve_git_url("a/b/c")
def test_shorthand_with_subdir(self):
url, subdir = _resolve_git_url("owner/repo/my-plugin")
assert url == "https://github.com/owner/repo.git"
assert subdir == "my-plugin"
def test_shorthand_with_nested_subdir(self):
url, subdir = _resolve_git_url("owner/repo/path/to/plugin")
assert url == "https://github.com/owner/repo.git"
assert subdir == "path/to/plugin"
def test_shorthand_with_subdir_trailing_slash(self):
url, subdir = _resolve_git_url("owner/repo/my-plugin/")
assert url == "https://github.com/owner/repo.git"
assert subdir == "my-plugin"
def test_https_url_with_subdir(self):
url, subdir = _resolve_git_url("https://github.com/owner/repo.git/my-plugin")
assert url == "https://github.com/owner/repo.git"
assert subdir == "my-plugin"
def test_https_url_with_nested_subdir(self):
url, subdir = _resolve_git_url(
"https://github.com/owner/repo.git/path/to/plugin"
)
assert url == "https://github.com/owner/repo.git"
assert subdir == "path/to/plugin"
def test_url_with_fragment_subdir(self):
url, subdir = _resolve_git_url("https://github.com/owner/repo.git#my-plugin")
assert url == "https://github.com/owner/repo.git"
assert subdir == "my-plugin"
def test_file_url_with_fragment_subdir(self):
url, subdir = _resolve_git_url("file:///tmp/repo#path/to/plugin")
assert url == "file:///tmp/repo"
assert subdir == "path/to/plugin"
def test_ssh_url_with_fragment_subdir(self):
url, subdir = _resolve_git_url("git@github.com:owner/repo.git#sub")
assert url == "git@github.com:owner/repo.git"
assert subdir == "sub"
# ── _resolve_subdir_within ──────────────────────────────────────────────────
class TestResolveSubdirWithin:
"""Subdirectory resolution stays within the clone and rejects traversal."""
def test_valid_subdir(self, tmp_path):
(tmp_path / "my-plugin").mkdir()
result = _resolve_subdir_within(tmp_path, "my-plugin")
assert result == (tmp_path / "my-plugin").resolve()
def test_valid_nested_subdir(self, tmp_path):
(tmp_path / "a" / "b" / "c").mkdir(parents=True)
result = _resolve_subdir_within(tmp_path, "a/b/c")
assert result == (tmp_path / "a" / "b" / "c").resolve()
def test_rejects_dot_dot_escape(self, tmp_path):
clone = tmp_path / "clone"
clone.mkdir()
(tmp_path / "secret").mkdir()
with pytest.raises(PluginOperationError, match="escapes the repository"):
_resolve_subdir_within(clone, "../secret")
def test_rejects_absolute_path_escape(self, tmp_path):
clone = tmp_path / "clone"
clone.mkdir()
# An absolute path resolves outside the clone root.
with pytest.raises(PluginOperationError, match="escapes the repository"):
_resolve_subdir_within(clone, "/etc")
def test_rejects_symlink_escape(self, tmp_path):
clone = tmp_path / "clone"
clone.mkdir()
outside = tmp_path / "outside"
outside.mkdir()
(clone / "link").symlink_to(outside)
with pytest.raises(PluginOperationError, match="escapes the repository"):
_resolve_subdir_within(clone, "link")
def test_rejects_missing_subdir(self, tmp_path):
with pytest.raises(PluginOperationError, match="does not exist"):
_resolve_subdir_within(tmp_path, "nope")
def test_rejects_file_not_dir(self, tmp_path):
(tmp_path / "afile").write_text("x")
with pytest.raises(PluginOperationError, match="not a directory"):
_resolve_subdir_within(tmp_path, "afile")
# ── _resolve_git_executable ─────────────────────────────────────────────────
@@ -698,3 +793,90 @@ class TestNoAutoActivation:
# The old code had: "Even with default config, check if a plugin registered one"
# The fix removes this. Verify it's gone.
assert "Even with default config, check if a plugin registered one" not in source
# ── End-to-end subdirectory install ──────────────────────────────────────────
class TestSubdirInstallE2E:
"""Install a plugin that lives in a subdirectory of a real local git repo."""
@staticmethod
def _make_repo_with_subdir_plugin(repo_root: Path) -> None:
"""Create a git repo where the plugin lives in ``./my-plugin/`` and the
repo root holds unrelated docs/tests."""
import subprocess as sp
repo_root.mkdir(parents=True, exist_ok=True)
# Root-level noise: docs + tests that should NOT be installed.
(repo_root / "README.md").write_text("# Monorepo docs\n")
(repo_root / "tests").mkdir()
(repo_root / "tests" / "test_x.py").write_text("def test_x():\n pass\n")
# The actual plugin in a subdirectory.
plugin_dir = repo_root / "my-plugin"
plugin_dir.mkdir()
(plugin_dir / "plugin.yaml").write_text(
"name: my-plugin\nmanifest_version: 1\ndescription: A subdir plugin\n"
)
(plugin_dir / "__init__.py").write_text("# plugin entry\n")
env = {
**os.environ,
"GIT_AUTHOR_NAME": "t",
"GIT_AUTHOR_EMAIL": "t@t",
"GIT_COMMITTER_NAME": "t",
"GIT_COMMITTER_EMAIL": "t@t",
}
sp.run(["git", "init", "-q"], cwd=repo_root, check=True, env=env)
sp.run(["git", "add", "-A"], cwd=repo_root, check=True, env=env)
sp.run(
["git", "commit", "-q", "-m", "init"],
cwd=repo_root,
check=True,
env=env,
)
def test_installs_only_the_subdir_plugin(self, tmp_path, monkeypatch):
if shutil.which("git") is None:
pytest.skip("git not available")
from hermes_cli import plugins_cmd as pc
repo_root = tmp_path / "monorepo"
self._make_repo_with_subdir_plugin(repo_root)
plugins_dir = tmp_path / "installed"
plugins_dir.mkdir()
monkeypatch.setattr(pc, "_plugins_dir", lambda: plugins_dir)
identifier = f"file://{repo_root}#my-plugin"
target, manifest, name = pc._install_plugin_core(identifier, force=False)
# Installed under the plugin's own name, not the repo name.
assert name == "my-plugin"
assert manifest.get("name") == "my-plugin"
assert target == (plugins_dir / "my-plugin").resolve()
# The plugin's files are present...
assert (target / "plugin.yaml").exists()
assert (target / "__init__.py").exists()
# ...and the repo-root noise is NOT.
assert not (target / "README.md").exists()
assert not (target / "tests").exists()
def test_missing_subdir_raises(self, tmp_path, monkeypatch):
if shutil.which("git") is None:
pytest.skip("git not available")
from hermes_cli import plugins_cmd as pc
repo_root = tmp_path / "monorepo"
self._make_repo_with_subdir_plugin(repo_root)
plugins_dir = tmp_path / "installed"
plugins_dir.mkdir()
monkeypatch.setattr(pc, "_plugins_dir", lambda: plugins_dir)
identifier = f"file://{repo_root}#does-not-exist"
with pytest.raises(PluginOperationError, match="does not exist"):
pc._install_plugin_core(identifier, force=False)

View File

@@ -97,11 +97,7 @@ def test_request_device_code(monkeypatch: pytest.MonkeyPatch) -> None:
assert code.user_code == "ABCD-1234"
assert code.expires_in == 600
assert "/api/auth/device/code" in captured["url"]
# Hosted Photon allowlists registered device clients — an unregistered
# client_id is rejected with 400 invalid_client. We use Photon's published
# CLI device client and send the standard scope.
assert captured["body"]["client_id"] == "photon-cli"
assert captured["body"]["scope"] == "openid profile email"
assert captured["body"]["client_id"] == "hermes-agent"
def test_poll_for_token_via_header(monkeypatch: pytest.MonkeyPatch) -> None:
@@ -285,108 +281,3 @@ def test_print_credential_summary_emits_only_display_strings(
assert "proj-uuid" in blob # project id is intentionally surfaced
# Header is always emitted
assert any("Photon iMessage status" in line for line in lines)
# ---------------------------------------------------------------------------
# Device-token candidate extraction + dashboard validation.
def test_device_response_candidates_covers_known_shapes() -> None:
candidates = photon_auth._device_response_token_candidates(
{
"access_token": "tok-snake",
"accessToken": "tok-camel",
"data": {"access_token": "tok-data"},
},
headers={"set-auth-token": "Bearer tok-header"},
)
by_source = {c.source: c.token for c in candidates}
assert by_source["access_token"] == "tok-snake"
assert by_source["accessToken"] == "tok-camel"
assert by_source["data.access_token"] == "tok-data"
# "Bearer " prefix is stripped from the header value.
assert by_source["set-auth-token"] == "tok-header"
def test_device_response_candidates_dedupes() -> None:
candidates = photon_auth._device_response_token_candidates(
{"access_token": "same", "accessToken": "same"},
)
assert [c.token for c in candidates] == ["same"]
def test_validate_photon_token_rejects_unrecognized_session(
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
if url.endswith("/api/auth/get-session"):
return _FakeResponse(json_body={}) # no "user" key
return _FakeResponse(json_body=[])
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
with pytest.raises(photon_auth.PhotonDashboardAuthError):
photon_auth.validate_photon_token("some-token")
def test_validate_photon_token_rejects_project_api_denial(
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
if url.endswith("/api/auth/get-session"):
return _FakeResponse(json_body={"user": {"id": "u1"}})
return _FakeResponse(status=403) # project API rejects
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
with pytest.raises(photon_auth.PhotonDashboardAuthError):
photon_auth.validate_photon_token("some-token")
def test_login_device_flow_validates_before_persisting(
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
if url.endswith("/api/auth/device/code"):
return _FakeResponse(json_body={
"device_code": "dev", "user_code": "AAAA",
"verification_uri": "https://app.photon.codes/device",
"verification_uri_complete": None,
"expires_in": 600, "interval": 0,
})
# device/token approval
return _FakeResponse(json_body={"access_token": "good-token"})
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
if url.endswith("/api/auth/get-session"):
return _FakeResponse(json_body={"user": {"id": "u1"}})
return _FakeResponse(json_body=[]) # projects OK
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
token = photon_auth.login_device_flow(open_browser=False)
assert token == "good-token"
assert photon_auth.load_photon_token() == "good-token"
def test_login_device_flow_raises_when_token_invalid(
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
if url.endswith("/api/auth/device/code"):
return _FakeResponse(json_body={
"device_code": "dev", "user_code": "AAAA",
"verification_uri": "https://app.photon.codes/device",
"verification_uri_complete": None,
"expires_in": 600, "interval": 0,
})
return _FakeResponse(json_body={"access_token": "bad-token"})
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
return _FakeResponse(status=401) # session lookup rejects
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
with pytest.raises(photon_auth.PhotonDashboardAuthError):
photon_auth.login_device_flow(open_browser=False)
# A token that failed validation must never be persisted.
assert photon_auth.load_photon_token() is None

View File

@@ -362,7 +362,7 @@ export const en: Translations = {
inactive: "inactive",
installBtn: "Install",
installHeading: "Install from GitHub / Git URL",
installHint: "Use owner/repo shorthand or a full https:// or git@ clone URL.",
installHint: "Use owner/repo shorthand or a full https:// or git@ clone URL. For a plugin in a subdirectory, append the path: owner/repo/path/to/plugin (or <url>#path/to/plugin).",
memoryProviderLabel: "Memory provider",
missingEnvWarn: "Set these in Keys before the plugin can run:",
noDashboardTab: "No dashboard tab",

View File

@@ -240,7 +240,7 @@ export default function PluginsPage() {
<Input
className="font-mono-ui lowercase"
id="install-url"
placeholder="owner/repo or https://..."
placeholder="owner/repo, owner/repo/subdir, or https://..."
spellCheck={false}
value={installId}
onChange={(e) => setInstallId(e.target.value)}