mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-14 14:19:29 +08:00
Compare commits
1 Commits
feat/debug
...
feat/plugi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cc60cbfeb5 |
70
cli.py
70
cli.py
@@ -6218,20 +6218,27 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
choices visible and lets the normal Enter key binding submit the typed
|
||||
or highlighted choice.
|
||||
|
||||
**Platform note (Windows — issue #33961):**
|
||||
Earlier code bypassed the modal on ``sys.platform == "win32"`` and fell
|
||||
back to a raw ``input()`` prompt. When the confirm was triggered from the
|
||||
``process_loop`` daemon thread (the normal case) that ``input()`` ran off
|
||||
the main thread and deadlocked against prompt_toolkit's stdin ownership —
|
||||
the user saw a frozen cursor and Ctrl-C was swallowed (bare ``/reset``
|
||||
froze; ``/reset now`` worked only because it skips the prompt entirely).
|
||||
**Platform note (Windows dead-lock — issue #30768):**
|
||||
The queue-based modal relies on prompt_toolkit key bindings receiving
|
||||
keyboard events and calling ``_submit_slash_confirm_response``. On
|
||||
Windows (PowerShell / Windows Terminal) the prompt_toolkit input
|
||||
channel can become unresponsive when the modal is entered from the
|
||||
``process_loop`` daemon thread, causing a dead-lock: the user sees the
|
||||
confirmation panel but keystrokes never reach the key bindings and the
|
||||
``response_queue.get()`` blocks until the 120-second timeout expires.
|
||||
|
||||
Native Windows now uses the same path as Linux/macOS: the modal is set up
|
||||
on ``self._app.loop`` via ``call_soon_threadsafe`` and answered by the
|
||||
normal prompt_toolkit key bindings (the same input channel that already
|
||||
handles ordinary typing on Windows). The raw ``input()`` fallback is kept
|
||||
only for the genuinely safe cases: no running app (unit tests /
|
||||
non-interactive), no resolvable event loop, or a scheduling failure.
|
||||
To avoid this, we fall back to ``_prompt_text_input`` (a simple
|
||||
``input()``-based prompt) when any of these conditions hold:
|
||||
|
||||
* ``sys.platform == "win32"`` — native Windows console (ConPTY /
|
||||
win32_input) does not support the modal reliably.
|
||||
* ``self._app`` is not set — unit tests / non-interactive contexts.
|
||||
|
||||
On non-Windows platforms the modal itself is still safe from the
|
||||
``process_loop`` daemon thread as long as the main-thread event loop
|
||||
owns the prompt_toolkit buffer mutations. When we are off the main
|
||||
thread, schedule the modal snapshot / restore work on ``self._app.loop``
|
||||
via ``call_soon_threadsafe`` and keep the queue-based response path.
|
||||
"""
|
||||
import threading
|
||||
import time as _time
|
||||
@@ -6244,25 +6251,22 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
if not getattr(self, "_app", None):
|
||||
return self._prompt_text_input("Choice [1/2/3]: ")
|
||||
|
||||
# On Windows the prompt_toolkit input channel can deadlock when the
|
||||
# modal is entered from the process_loop daemon thread — keystrokes
|
||||
# never reach the key bindings, so response_queue.get() blocks for
|
||||
# the full timeout (issue #30768). Fall back to the simpler
|
||||
# stdin-based prompt which works reliably on Windows.
|
||||
if sys.platform == "win32":
|
||||
return self._prompt_text_input("Choice [1/2/3]: ")
|
||||
|
||||
try:
|
||||
app_loop = self._app.loop
|
||||
except Exception:
|
||||
app_loop = None
|
||||
|
||||
in_main_thread = threading.current_thread() is threading.main_thread()
|
||||
|
||||
def _stdin_fallback() -> str | None:
|
||||
# On native Windows a raw input() from a non-main thread deadlocks
|
||||
# against prompt_toolkit's stdin ownership (#33961). With an app
|
||||
# running we cannot safely prompt off the main thread, so cancel
|
||||
# cleanly (None) rather than hang the terminal.
|
||||
if sys.platform == "win32" and not in_main_thread:
|
||||
self._invalidate()
|
||||
return None
|
||||
return self._prompt_text_input("Choice [1/2/3]: ")
|
||||
|
||||
if not in_main_thread and app_loop is None:
|
||||
return _stdin_fallback()
|
||||
return self._prompt_text_input("Choice [1/2/3]: ")
|
||||
|
||||
response_queue = queue.Queue()
|
||||
|
||||
@@ -6303,7 +6307,7 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
return ready.wait(timeout=5)
|
||||
|
||||
if not _run_on_app_loop(_setup_modal):
|
||||
return _stdin_fallback()
|
||||
return self._prompt_text_input("Choice [1/2/3]: ")
|
||||
|
||||
_last_countdown_refresh = _time.monotonic()
|
||||
try:
|
||||
@@ -7272,7 +7276,7 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
elif canonical == "copy":
|
||||
self._handle_copy_command(cmd_original)
|
||||
elif canonical == "debug":
|
||||
self._handle_debug_command(cmd_original)
|
||||
self._handle_debug_command()
|
||||
elif canonical == "update":
|
||||
if self._handle_update_command():
|
||||
return False
|
||||
@@ -8219,10 +8223,9 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
print(" ⚠️ MCP reload timed out (30s). Some servers may not have reconnected.")
|
||||
|
||||
# Inline-skip tokens that bypass the destructive-slash confirmation modal.
|
||||
# A general escape hatch for non-interactive use (scripting/automation) and
|
||||
# for the degraded path where the modal can't be marshaled onto the app loop
|
||||
# — lets users self-serve without flipping approvals.destructive_slash_confirm
|
||||
# in config. (Native Windows now drives the modal normally — see #33961.)
|
||||
# Matches the escape-hatch pattern users on broken modal platforms
|
||||
# (currently native Windows PowerShell — issue #30768) need to self-serve
|
||||
# without having to flip approvals.destructive_slash_confirm in config.
|
||||
_DESTRUCTIVE_SKIP_TOKENS = frozenset({"now", "--yes", "-y"})
|
||||
|
||||
@classmethod
|
||||
@@ -8280,9 +8283,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
Inline-skip: if ``cmd_original`` contains ``now``, ``--yes``, or
|
||||
``-y`` as an argument (e.g. ``/reset now``, ``/new --yes My title``),
|
||||
the modal is bypassed and ``"once"`` is returned immediately. This is
|
||||
an escape hatch for non-interactive use and for the degraded path where
|
||||
the modal can't be marshaled onto the app loop (native Windows itself now
|
||||
drives the modal normally — see #33961). Callers are responsible
|
||||
an escape hatch for platforms where the prompt_toolkit modal hangs
|
||||
(issue #30768 — native Windows PowerShell). Callers are responsible
|
||||
for stripping the skip tokens from any remaining argument parsing
|
||||
(see :meth:`_split_destructive_skip`).
|
||||
|
||||
|
||||
@@ -1795,11 +1795,9 @@ class BasePlatformAdapter(ABC):
|
||||
|
||||
# Whether this platform renders triple-backtick fenced code blocks (i.e.
|
||||
# ``format_message`` translates/preserves markdown fences into a real code
|
||||
# block). Capability flag for markdown-aware presentation choices.
|
||||
# block). Drives presentation choices like rendering a ``terminal`` tool
|
||||
# call's command as a ```bash block instead of a flat preview line.
|
||||
# Default False (plain-text platforms); markdown-rendering adapters set True.
|
||||
# Note: tool-progress deliberately does NOT use this to render a terminal
|
||||
# command as a ```bash block — that exposed full commands in chat. Progress
|
||||
# shows a short truncated preview only (see gateway/run.py progress_callback).
|
||||
supports_code_blocks: bool = False
|
||||
|
||||
def __init__(self, config: PlatformConfig, platform: Platform):
|
||||
|
||||
@@ -181,8 +181,6 @@ def _strip_mdv2(text: str) -> str:
|
||||
"""
|
||||
# Remove escape backslashes before special characters
|
||||
cleaned = re.sub(r'\\([_*\[\]()~`>#\+\-=|{}.!\\])', r'\1', text)
|
||||
# Remove standard markdown bold (**text** → text) BEFORE MarkdownV2 bold
|
||||
cleaned = re.sub(r'\*\*([^*]+)\*\*', r'\1', cleaned)
|
||||
# Remove MarkdownV2 bold markers that format_message converted from **bold**
|
||||
cleaned = re.sub(r'\*([^*]+)\*', r'\1', cleaned)
|
||||
# Remove MarkdownV2 italic markers that format_message converted from *italic*
|
||||
@@ -2210,17 +2208,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
# "Message is not modified" is a no-op, not an error
|
||||
if "not modified" in str(fmt_err).lower():
|
||||
return SendResult(success=True, message_id=message_id)
|
||||
# Fallback: strip MarkdownV2 escapes and retry as clean plain text
|
||||
logger.warning(
|
||||
"[%s] MarkdownV2 edit failed, falling back to plain text: %s",
|
||||
self.name,
|
||||
fmt_err,
|
||||
)
|
||||
_plain = _strip_mdv2(content) if content else content
|
||||
# Fallback: retry without markdown formatting
|
||||
await self._bot.edit_message_text(
|
||||
chat_id=int(chat_id),
|
||||
message_id=int(message_id),
|
||||
text=_plain,
|
||||
text=content,
|
||||
)
|
||||
return SendResult(success=True, message_id=message_id)
|
||||
except Exception as e:
|
||||
|
||||
@@ -12971,10 +12971,32 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
# Build progress message with primary argument preview
|
||||
from agent.display import get_tool_emoji
|
||||
emoji = get_tool_emoji(tool_name, default="⚙️")
|
||||
|
||||
# Markdown-capable platforms render a terminal command as a native
|
||||
# ```bash fenced block (full command, no quotes, no label, no
|
||||
# truncation) instead of the noisy `terminal: "cmd…"` line. Gated
|
||||
# on the adapter's ``supports_code_blocks`` capability so every
|
||||
# markdown-rendering platform (and plugin adapters that opt in) gets
|
||||
# it, while plain-text platforms keep the compact line.
|
||||
_bash_block = None
|
||||
try:
|
||||
_progress_adapter = self.adapters.get(source.platform)
|
||||
except Exception:
|
||||
_progress_adapter = None
|
||||
if (
|
||||
getattr(_progress_adapter, "supports_code_blocks", False)
|
||||
and tool_name == "terminal"
|
||||
and isinstance(args, dict)
|
||||
and isinstance(args.get("command"), str)
|
||||
and args["command"].strip()
|
||||
):
|
||||
_bash_block = f"```bash\n{args['command'].rstrip()}\n```"
|
||||
|
||||
# Verbose mode: show detailed arguments, respects tool_preview_length
|
||||
if progress_mode == "verbose":
|
||||
if args:
|
||||
if _bash_block is not None:
|
||||
msg = _bash_block
|
||||
elif args:
|
||||
from agent.display import get_tool_preview_max_len
|
||||
_pl = get_tool_preview_max_len()
|
||||
args_str = json.dumps(args, ensure_ascii=False, default=str)
|
||||
@@ -12994,7 +13016,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
# "all" / "new" modes: short preview, respects tool_preview_length
|
||||
# config (defaults to 40 chars when unset to keep gateway messages
|
||||
# compact — unlike CLI spinners, these persist as permanent messages).
|
||||
if preview:
|
||||
if _bash_block is not None:
|
||||
msg = _bash_block
|
||||
elif preview:
|
||||
from agent.display import get_tool_preview_max_len
|
||||
_pl = get_tool_preview_max_len()
|
||||
_cap = _pl if _pl > 0 else 40
|
||||
@@ -13106,8 +13130,6 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
"message_id": message_id,
|
||||
"content": content,
|
||||
}
|
||||
if getattr(adapter, "REQUIRES_EDIT_FINALIZE", False):
|
||||
kwargs["finalize"] = True
|
||||
if _edit_accepts_metadata:
|
||||
kwargs["metadata"] = _progress_metadata
|
||||
return await adapter.edit_message(**kwargs)
|
||||
|
||||
@@ -2090,25 +2090,12 @@ class CLICommandsMixin:
|
||||
else:
|
||||
_cprint(f" {_ACCENT}✓ {feature_name} set to {label} (session only){_RST}")
|
||||
|
||||
def _handle_debug_command(self, cmd_original: str = ""):
|
||||
"""Handle /debug — upload debug report + logs and print share URLs.
|
||||
|
||||
Accepts optional destination words after the command:
|
||||
|
||||
- ``/debug`` → upload to the public paste service (default)
|
||||
- ``/debug nous`` → upload to Nous-internal storage (private, staff-only)
|
||||
- ``/debug local`` → render the report to stdout, no upload
|
||||
|
||||
``nous`` and ``local`` are mutually exclusive; if both are given,
|
||||
``local`` wins (it never touches the network).
|
||||
"""
|
||||
def _handle_debug_command(self):
|
||||
"""Handle /debug — upload debug report + logs and print paste URLs."""
|
||||
from hermes_cli.debug import run_debug_share
|
||||
from types import SimpleNamespace
|
||||
|
||||
words = {w.lower() for w in cmd_original.split()[1:]}
|
||||
local = "local" in words
|
||||
nous = "nous" in words and not local
|
||||
args = SimpleNamespace(lines=200, expire=7, local=local, nous=nous)
|
||||
args = SimpleNamespace(lines=200, expire=7, local=False)
|
||||
run_debug_share(args)
|
||||
|
||||
def _handle_update_command(self) -> bool:
|
||||
|
||||
@@ -217,8 +217,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
|
||||
cli_only=True, args_hint="<path>"),
|
||||
CommandDef("update", "Update Hermes Agent to the latest version", "Info"),
|
||||
CommandDef("version", "Show Hermes Agent version", "Info", aliases=("v",)),
|
||||
CommandDef("debug", "Upload debug report (system info + logs) and get shareable links", "Info",
|
||||
args_hint="[nous|local]"),
|
||||
CommandDef("debug", "Upload debug report (system info + logs) and get shareable links", "Info"),
|
||||
|
||||
# Exit
|
||||
CommandDef("quit", "Exit the CLI (use --delete to also remove session history)", "Exit",
|
||||
|
||||
@@ -9,16 +9,8 @@ Currently supports:
|
||||
``~/.hermes/logs/*.log`` are not leaked into
|
||||
the public paste service. Pass ``--no-redact``
|
||||
to disable.
|
||||
Pass ``--nous`` to upload instead to Nous-internal
|
||||
storage (AWS S3) via a signed URL minted by the
|
||||
Nous account service: the bundle is private
|
||||
(viewable only by Nous staff / allowlisted mods via
|
||||
a Google-login-gated viewer) and auto-deletes after
|
||||
14 days, rather than going to a public paste.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import gzip
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
@@ -589,97 +581,6 @@ def collect_debug_report(
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared bundle collection (used by both the paste.rs and Nous-S3 paths)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Bundle format identifier embedded in the Nous-S3 JSON envelope. The
|
||||
# discord-support viewer keys off this string to parse the bundle.
|
||||
_NOUS_BUNDLE_FORMAT = "hermes-debug-share/1"
|
||||
|
||||
|
||||
def collect_share_bundle(
|
||||
log_lines: int = 200,
|
||||
redact: bool = True,
|
||||
) -> dict[str, str]:
|
||||
"""Collect the debug report + full logs as a label→text mapping.
|
||||
|
||||
Returns ``{"report": ..., "agent.log": ..., "gateway.log": ...,
|
||||
"desktop.log": ...}`` where each value is the already-redacted (when
|
||||
``redact`` is True) text that would be uploaded. Keys for logs that are
|
||||
absent/empty are simply omitted.
|
||||
|
||||
This is the single source of collection + redaction shared by both
|
||||
destinations: the paste.rs path (:func:`build_debug_share`) and the
|
||||
Nous-S3 path (``--nous``). Centralising it guarantees the Nous bundle is
|
||||
built from the *same* force-redacted snapshots as the public paste path —
|
||||
redaction is the safety boundary, so the Nous path must never see raw
|
||||
logs.
|
||||
|
||||
The dump header is prepended to each full log (mirroring the historical
|
||||
paste behaviour) so every file is self-contained, and the redaction
|
||||
banner is prepended when ``redact`` is True.
|
||||
"""
|
||||
dump_text = _capture_dump()
|
||||
log_snapshots = _capture_default_log_snapshots(log_lines, redact=redact)
|
||||
|
||||
report = collect_debug_report(
|
||||
log_lines=log_lines,
|
||||
dump_text=dump_text,
|
||||
log_snapshots=log_snapshots,
|
||||
)
|
||||
agent_log = log_snapshots["agent"].full_text
|
||||
gateway_log = log_snapshots["gateway"].full_text
|
||||
desktop_log = log_snapshots["desktop"].full_text
|
||||
|
||||
# Prepend dump header to each full log so every file is self-contained.
|
||||
if agent_log:
|
||||
agent_log = dump_text + "\n\n--- full agent.log ---\n" + agent_log
|
||||
if gateway_log:
|
||||
gateway_log = dump_text + "\n\n--- full gateway.log ---\n" + gateway_log
|
||||
if desktop_log:
|
||||
desktop_log = dump_text + "\n\n--- full desktop.log ---\n" + desktop_log
|
||||
|
||||
# Visible banner so reviewers know redaction was applied at upload time.
|
||||
if redact:
|
||||
report = _REDACTION_BANNER + report
|
||||
if agent_log:
|
||||
agent_log = _REDACTION_BANNER + agent_log
|
||||
if gateway_log:
|
||||
gateway_log = _REDACTION_BANNER + gateway_log
|
||||
if desktop_log:
|
||||
desktop_log = _REDACTION_BANNER + desktop_log
|
||||
|
||||
bundle: dict[str, str] = {"report": report}
|
||||
if agent_log:
|
||||
bundle["agent.log"] = agent_log
|
||||
if gateway_log:
|
||||
bundle["gateway.log"] = gateway_log
|
||||
if desktop_log:
|
||||
bundle["desktop.log"] = desktop_log
|
||||
return bundle
|
||||
|
||||
|
||||
def build_nous_bundle(bundle: dict[str, str], redact: bool = True) -> bytes:
|
||||
"""Gzip-compress a :func:`collect_share_bundle` mapping into the Nous envelope.
|
||||
|
||||
The JSON shape is what the discord-support viewer (Repo 3) parses::
|
||||
|
||||
{"format": "hermes-debug-share/1",
|
||||
"redacted": <bool>,
|
||||
"created": <iso8601>,
|
||||
"files": {"report": ..., "agent.log": ..., ...}}
|
||||
"""
|
||||
created = datetime.datetime.now(datetime.timezone.utc).isoformat()
|
||||
envelope = {
|
||||
"format": _NOUS_BUNDLE_FORMAT,
|
||||
"redacted": bool(redact),
|
||||
"created": created,
|
||||
"files": bundle,
|
||||
}
|
||||
return gzip.compress(json.dumps(envelope).encode("utf-8"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI entry points
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -719,18 +620,45 @@ def build_debug_share(
|
||||
"""
|
||||
_best_effort_sweep_expired_pastes()
|
||||
|
||||
# Collect the report + full logs (force-redacted when redact=True) via the
|
||||
# shared collector so the paste.rs and Nous-S3 paths build identical,
|
||||
# identically-redacted bundles. The dump header + redaction banner are
|
||||
# applied inside collect_share_bundle.
|
||||
bundle = collect_share_bundle(log_lines=log_lines, redact=redact)
|
||||
# Capture dump once — prepended to every paste for context.
|
||||
# The dump is already redacted at extract time via dump.py:_redact;
|
||||
# log_snapshots are redacted by _capture_default_log_snapshots when
|
||||
# redact=True so credentials never reach the public paste service.
|
||||
dump_text = _capture_dump()
|
||||
log_snapshots = _capture_default_log_snapshots(log_lines, redact=redact)
|
||||
|
||||
if redact:
|
||||
logger.info(
|
||||
"hermes debug share: applied force-mode redaction to log snapshots before upload"
|
||||
)
|
||||
|
||||
report = bundle["report"]
|
||||
report = collect_debug_report(
|
||||
log_lines=log_lines,
|
||||
dump_text=dump_text,
|
||||
log_snapshots=log_snapshots,
|
||||
)
|
||||
agent_log = log_snapshots["agent"].full_text
|
||||
gateway_log = log_snapshots["gateway"].full_text
|
||||
desktop_log = log_snapshots["desktop"].full_text
|
||||
|
||||
# Prepend dump header to each full log so every paste is self-contained.
|
||||
if agent_log:
|
||||
agent_log = dump_text + "\n\n--- full agent.log ---\n" + agent_log
|
||||
if gateway_log:
|
||||
gateway_log = dump_text + "\n\n--- full gateway.log ---\n" + gateway_log
|
||||
if desktop_log:
|
||||
desktop_log = dump_text + "\n\n--- full desktop.log ---\n" + desktop_log
|
||||
|
||||
# Visible banner so reviewers reading the public paste know redaction
|
||||
# was applied at upload time. Banner is omitted under --no-redact.
|
||||
if redact:
|
||||
report = _REDACTION_BANNER + report
|
||||
if agent_log:
|
||||
agent_log = _REDACTION_BANNER + agent_log
|
||||
if gateway_log:
|
||||
gateway_log = _REDACTION_BANNER + gateway_log
|
||||
if desktop_log:
|
||||
desktop_log = _REDACTION_BANNER + desktop_log
|
||||
|
||||
urls: dict[str, str] = {}
|
||||
failures: list[str] = []
|
||||
@@ -739,8 +667,11 @@ def build_debug_share(
|
||||
urls["Report"] = upload_to_pastebin(report, expiry_days=expiry)
|
||||
|
||||
# 2-4. Full logs (optional — failures are collected, not raised)
|
||||
for label in ("agent.log", "gateway.log", "desktop.log"):
|
||||
content = bundle.get(label)
|
||||
for label, content in (
|
||||
("agent.log", agent_log),
|
||||
("gateway.log", gateway_log),
|
||||
("desktop.log", desktop_log),
|
||||
):
|
||||
if not content:
|
||||
continue
|
||||
try:
|
||||
@@ -765,23 +696,43 @@ def run_debug_share(args):
|
||||
log_lines = getattr(args, "lines", 200)
|
||||
expiry = getattr(args, "expire", 7)
|
||||
local_only = getattr(args, "local", False)
|
||||
nous = getattr(args, "nous", False)
|
||||
redact = not getattr(args, "no_redact", False)
|
||||
|
||||
if local_only:
|
||||
# Local-only path never uploads — render the report to stdout and bail
|
||||
# before any network I/O. Reuses the shared collector so the rendered
|
||||
# output matches exactly what would be uploaded.
|
||||
# before any network I/O. Mirrors the upload path's collection logic.
|
||||
_best_effort_sweep_expired_pastes()
|
||||
print("Collecting debug report...")
|
||||
bundle = collect_share_bundle(log_lines=log_lines, redact=redact)
|
||||
print(bundle["report"])
|
||||
for title, label in (
|
||||
("FULL agent.log", "agent.log"),
|
||||
("FULL gateway.log", "gateway.log"),
|
||||
("FULL desktop.log", "desktop.log"),
|
||||
dump_text = _capture_dump()
|
||||
log_snapshots = _capture_default_log_snapshots(log_lines, redact=redact)
|
||||
report = collect_debug_report(
|
||||
log_lines=log_lines,
|
||||
dump_text=dump_text,
|
||||
log_snapshots=log_snapshots,
|
||||
)
|
||||
agent_log = log_snapshots["agent"].full_text
|
||||
gateway_log = log_snapshots["gateway"].full_text
|
||||
desktop_log = log_snapshots["desktop"].full_text
|
||||
if agent_log:
|
||||
agent_log = dump_text + "\n\n--- full agent.log ---\n" + agent_log
|
||||
if gateway_log:
|
||||
gateway_log = dump_text + "\n\n--- full gateway.log ---\n" + gateway_log
|
||||
if desktop_log:
|
||||
desktop_log = dump_text + "\n\n--- full desktop.log ---\n" + desktop_log
|
||||
if redact:
|
||||
report = _REDACTION_BANNER + report
|
||||
if agent_log:
|
||||
agent_log = _REDACTION_BANNER + agent_log
|
||||
if gateway_log:
|
||||
gateway_log = _REDACTION_BANNER + gateway_log
|
||||
if desktop_log:
|
||||
desktop_log = _REDACTION_BANNER + desktop_log
|
||||
print(report)
|
||||
for title, body in (
|
||||
("FULL agent.log", agent_log),
|
||||
("FULL gateway.log", gateway_log),
|
||||
("FULL desktop.log", desktop_log),
|
||||
):
|
||||
body = bundle.get(label)
|
||||
if body:
|
||||
print(f"\n\n{'=' * 60}")
|
||||
print(title)
|
||||
@@ -789,10 +740,6 @@ def run_debug_share(args):
|
||||
print(body)
|
||||
return
|
||||
|
||||
if nous:
|
||||
_run_debug_share_nous(log_lines=log_lines, redact=redact)
|
||||
return
|
||||
|
||||
print(_PRIVACY_NOTICE)
|
||||
print("Collecting debug report...")
|
||||
print("Uploading...")
|
||||
@@ -826,80 +773,6 @@ def run_debug_share(args):
|
||||
print(f"\nShare these links with the Hermes team for support.")
|
||||
|
||||
|
||||
_NOUS_PRIVACY_NOTICE = """\
|
||||
⚠️ --nous: This uploads your debug bundle to Nous-INTERNAL storage (AWS S3),
|
||||
NOT a public paste service. The following is included:
|
||||
• System info (OS, Python/Hermes version, provider, which API keys are
|
||||
configured — NOT the actual keys)
|
||||
• Full agent.log, gateway.log, and desktop.log (up to 512 KB each — likely
|
||||
contains conversation content, tool outputs, and file paths)
|
||||
|
||||
• The bundle is viewable only by Nous staff (and allowlisted Discord mods)
|
||||
via a Google-login-gated viewer.
|
||||
• It is NOT a public paste — there is no public URL to the contents.
|
||||
• It auto-deletes after 14 days.
|
||||
"""
|
||||
|
||||
|
||||
def _run_debug_share_nous(*, log_lines: int, redact: bool) -> None:
|
||||
"""Handle ``hermes debug share --nous``: upload the bundle to Nous-S3.
|
||||
|
||||
Collects the same force-redacted bundle as the paste path, gzips it into
|
||||
the Nous envelope, requests a signed URL from NAS, uploads, and prints the
|
||||
private viewer link. On any failure falls back to a clear error that
|
||||
suggests ``--local``.
|
||||
"""
|
||||
from hermes_cli.diagnostics_upload import share_to_nous
|
||||
|
||||
print(_NOUS_PRIVACY_NOTICE)
|
||||
if not redact:
|
||||
print(
|
||||
"⚠️ --no-redact is set: secrets in your logs will NOT be redacted "
|
||||
"before upload.\n"
|
||||
)
|
||||
print("Collecting debug report...")
|
||||
_best_effort_sweep_expired_pastes()
|
||||
|
||||
bundle = collect_share_bundle(log_lines=log_lines, redact=redact)
|
||||
if redact:
|
||||
logger.info(
|
||||
"hermes debug share --nous: applied force-mode redaction before upload"
|
||||
)
|
||||
blob = build_nous_bundle(bundle, redact=redact)
|
||||
|
||||
print("Uploading to Nous diagnostics storage...")
|
||||
try:
|
||||
res = share_to_nous(blob)
|
||||
except Exception as exc:
|
||||
print(
|
||||
f"\nNous upload failed: {exc}\n"
|
||||
"\nThe Nous diagnostics service may be unavailable or not yet "
|
||||
"provisioned.\n"
|
||||
"Run `hermes debug share --local` to print the report instead, "
|
||||
"or `hermes debug share` to upload to a public paste service.\n",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
view_url = res.get("viewUrl") or res.get("view_url")
|
||||
print("\nDebug bundle uploaded to Nous (private):")
|
||||
if view_url:
|
||||
print(f" View URL {view_url}")
|
||||
else:
|
||||
print(f" (no view URL returned; upload id: {res.get('id', '?')})")
|
||||
|
||||
expires_at = res.get("expiresAt") or res.get("expires_at")
|
||||
if expires_at:
|
||||
print(f"\n⏱ Auto-deletes at {expires_at} (14-day retention).")
|
||||
else:
|
||||
print("\n⏱ Auto-deletes after 14 days.")
|
||||
|
||||
print(
|
||||
"\nShare this private link with the Nous team — only Nous staff "
|
||||
"(via Google login) can open it."
|
||||
)
|
||||
|
||||
|
||||
def run_debug_delete(args):
|
||||
"""Delete one or more paste URLs uploaded by /debug."""
|
||||
urls = getattr(args, "urls", [])
|
||||
@@ -950,8 +823,6 @@ def run_debug(args):
|
||||
print(" --lines N Number of log lines to include (default: 200)")
|
||||
print(" --expire N Paste expiry in days (default: 7)")
|
||||
print(" --local Print report locally instead of uploading")
|
||||
print(" --nous Upload to Nous-internal storage (private, staff-only,")
|
||||
print(" auto-deletes in 14 days) instead of a public paste")
|
||||
print(" --no-redact Disable upload-time secret redaction (default: redact)")
|
||||
print()
|
||||
print("Options (delete):")
|
||||
|
||||
@@ -1,138 +0,0 @@
|
||||
"""Client for uploading ``hermes debug share`` bundles to Nous-internal S3.
|
||||
|
||||
This is the opt-in (``--nous``) destination for ``hermes debug share``.
|
||||
Unlike the public paste.rs path, bundles uploaded here go to a Nous-owned
|
||||
S3 bucket via a short-lived signed URL minted by the Nous account service
|
||||
(NAS). The bucket auto-expires objects after 14 days, and the contents are
|
||||
only viewable by Nous staff (and allowlisted Discord mods) through a
|
||||
Google-OAuth-gated viewer.
|
||||
|
||||
Flow:
|
||||
|
||||
1. POST {NAS_BASE}/api/diagnostics/upload-url → {uploadUrl, viewUrl, id, ...}
|
||||
(the request body carries ``sizeBytes``; NAS signs it into the presigned
|
||||
URL's ``ContentLength``, so the PUT must send exactly that many bytes)
|
||||
2. PUT <uploadUrl> (the gzipped bundle, Content-Type application/gzip)
|
||||
|
||||
NAS is stateless — the object's existence in S3 is the only state, so there is
|
||||
no confirm/callback step.
|
||||
|
||||
Uses stdlib ``urllib`` only, matching ``debug.py`` style — no third-party deps.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import urllib.request
|
||||
|
||||
# Base URL of the Nous account service that mints the signed upload URL.
|
||||
# Overridable via env so the feature can be pointed at staging / a local dev
|
||||
# NAS instance during testing.
|
||||
NAS_BASE = os.environ.get(
|
||||
"HERMES_DIAGNOSTICS_BASE_URL", "https://portal.nousresearch.com"
|
||||
)
|
||||
|
||||
# Network timeout for each request (seconds). The upload itself can be larger
|
||||
# (a gzipped log bundle), so the PUT gets a more generous window.
|
||||
_REQUEST_TIMEOUT = 30
|
||||
_UPLOAD_TIMEOUT = 120
|
||||
|
||||
_USER_AGENT = "hermes-agent/debug-share"
|
||||
|
||||
|
||||
def request_upload_url(
|
||||
content_type: str = "application/gzip",
|
||||
size_bytes: int | None = None,
|
||||
) -> dict:
|
||||
"""Ask NAS to mint a presigned PUT URL for a diagnostics bundle.
|
||||
|
||||
POSTs a small JSON body to ``{NAS_BASE}/api/diagnostics/upload-url`` and
|
||||
returns the parsed JSON response, expected to contain at least
|
||||
``uploadUrl``, ``viewUrl`` and ``id`` (plus optional ``expiresAt`` /
|
||||
``uploadExpiresInSeconds``).
|
||||
|
||||
Raises on non-2xx responses or unparseable JSON.
|
||||
"""
|
||||
payload: dict = {"contentType": content_type}
|
||||
if size_bytes is not None:
|
||||
payload["sizeBytes"] = int(size_bytes)
|
||||
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{NAS_BASE}/api/diagnostics/upload-url",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"User-Agent": _USER_AGENT,
|
||||
},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as resp:
|
||||
status = getattr(resp, "status", None)
|
||||
if status is None:
|
||||
status = resp.getcode()
|
||||
if not (200 <= status < 300):
|
||||
raise RuntimeError(
|
||||
f"diagnostics upload-url request failed: HTTP {status}"
|
||||
)
|
||||
body = resp.read().decode("utf-8")
|
||||
|
||||
try:
|
||||
result = json.loads(body)
|
||||
except (ValueError, json.JSONDecodeError) as exc:
|
||||
raise RuntimeError(
|
||||
f"diagnostics upload-url returned non-JSON response: {body[:200]}"
|
||||
) from exc
|
||||
|
||||
if not isinstance(result, dict) or not result.get("uploadUrl"):
|
||||
raise RuntimeError(
|
||||
"diagnostics upload-url response missing 'uploadUrl': "
|
||||
f"{body[:200]}"
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def put_bundle(
|
||||
upload_url: str,
|
||||
data: bytes,
|
||||
content_type: str = "application/gzip",
|
||||
) -> None:
|
||||
"""PUT the gzipped *data* bundle to a presigned *upload_url*.
|
||||
|
||||
Sets the ``Content-Type`` header (must match what NAS pinned when signing
|
||||
the URL, otherwise S3 rejects the signature). Raises on non-2xx.
|
||||
"""
|
||||
req = urllib.request.Request(
|
||||
upload_url,
|
||||
data=data,
|
||||
method="PUT",
|
||||
headers={
|
||||
"Content-Type": content_type,
|
||||
"User-Agent": _USER_AGENT,
|
||||
},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=_UPLOAD_TIMEOUT) as resp:
|
||||
status = getattr(resp, "status", None)
|
||||
if status is None:
|
||||
status = resp.getcode()
|
||||
if not (200 <= status < 300):
|
||||
raise RuntimeError(f"diagnostics bundle PUT failed: HTTP {status}")
|
||||
|
||||
|
||||
def share_to_nous(report_bundle: bytes) -> dict:
|
||||
"""Orchestrate the full Nous-S3 upload of a gzipped *report_bundle*.
|
||||
|
||||
Two steps: mint a presigned PUT URL (sending the exact ``sizeBytes`` NAS
|
||||
signs into the URL's ``ContentLength``), then PUT the bundle. NAS is
|
||||
stateless — the object's existence in S3 is the only state, so there is no
|
||||
confirm/callback step. Returns the dict from :func:`request_upload_url`
|
||||
(which carries ``viewUrl`` / ``id`` / expiry metadata) so the caller can
|
||||
print the viewer link. Raises on any failure of either step.
|
||||
"""
|
||||
size_bytes = len(report_bundle)
|
||||
info = request_upload_url(
|
||||
content_type="application/gzip", size_bytes=size_bytes
|
||||
)
|
||||
put_bundle(info["uploadUrl"], report_bundle, content_type="application/gzip")
|
||||
|
||||
return info
|
||||
@@ -738,14 +738,11 @@ def run_doctor(args):
|
||||
issues,
|
||||
)
|
||||
|
||||
# Warn if model is set to a provider-prefixed name on a provider that doesn't use them.
|
||||
# Vendor/model slugs are valid on aggregator-style providers and on any custom
|
||||
# provider — bare "custom" or a named "custom:<name>" that fronts an OpenAI-compatible
|
||||
# aggregator (e.g. custom:hpc-ai serving deepseek/deepseek-v4-flash) requires the prefix.
|
||||
# Warn if model is set to a provider-prefixed name on a provider that doesn't use them
|
||||
provider_for_policy = runtime_provider or catalog_provider
|
||||
provider_policy_id = str(provider_for_policy or "").strip().lower()
|
||||
providers_accepting_vendor_slugs = {
|
||||
"openrouter",
|
||||
"custom",
|
||||
"auto",
|
||||
"kilocode",
|
||||
"opencode-zen",
|
||||
@@ -753,16 +750,11 @@ def run_doctor(args):
|
||||
"lmstudio",
|
||||
"nous",
|
||||
}
|
||||
provider_accepts_vendor_slug = (
|
||||
provider_policy_id in providers_accepting_vendor_slugs
|
||||
or provider_policy_id == "custom"
|
||||
or provider_policy_id.startswith("custom:")
|
||||
)
|
||||
if (
|
||||
default_model
|
||||
and "/" in default_model
|
||||
and provider_policy_id
|
||||
and not provider_accepts_vendor_slug
|
||||
and provider_for_policy
|
||||
and provider_for_policy not in providers_accepting_vendor_slugs
|
||||
):
|
||||
check_warn(
|
||||
f"model.default '{default_model}' uses a vendor/model slug but provider is '{provider_raw}'",
|
||||
|
||||
@@ -135,34 +135,89 @@ def _sanitize_plugin_name(
|
||||
return target
|
||||
|
||||
|
||||
def _resolve_git_url(identifier: str) -> str:
|
||||
"""Turn an identifier into a cloneable Git URL.
|
||||
def _resolve_git_url(identifier: str) -> tuple[str, Optional[str]]:
|
||||
"""Turn an identifier into a cloneable Git URL and optional subdirectory.
|
||||
|
||||
Returns ``(git_url, subdir)`` where ``subdir`` is the path within the
|
||||
cloned repository that contains the plugin (``None`` when the plugin lives
|
||||
at the repo root).
|
||||
|
||||
Accepted formats:
|
||||
- Full URL: https://github.com/owner/repo.git
|
||||
- Full URL: git@github.com:owner/repo.git
|
||||
- Full URL: ssh://git@github.com/owner/repo.git
|
||||
- Shorthand: owner/repo → https://github.com/owner/repo.git
|
||||
- Shorthand w/ subdir: owner/repo/path/to/plugin
|
||||
→ (https://github.com/owner/repo.git, "path/to/plugin")
|
||||
- Full URL w/ subdir (``.git`` boundary):
|
||||
https://github.com/owner/repo.git/path/to/plugin
|
||||
→ (https://github.com/owner/repo.git, "path/to/plugin")
|
||||
- Any URL w/ explicit subdir fragment (works for every scheme, incl.
|
||||
``file://`` and ssh): <url>#path/to/plugin
|
||||
→ (<url>, "path/to/plugin")
|
||||
|
||||
NOTE: ``http://`` and ``file://`` schemes are accepted but will trigger a
|
||||
security warning at install time.
|
||||
"""
|
||||
# Already a URL
|
||||
# Already a URL.
|
||||
if identifier.startswith(("https://", "http://", "git@", "ssh://", "file://")):
|
||||
return identifier
|
||||
# Explicit ``#subdir`` fragment — unambiguous for any scheme.
|
||||
if "#" in identifier:
|
||||
git_url, _, frag = identifier.partition("#")
|
||||
return git_url, (frag.strip("/") or None)
|
||||
# Natural ``.git/`` boundary (GitHub-style URLs).
|
||||
marker = ".git/"
|
||||
idx = identifier.find(marker)
|
||||
if idx != -1:
|
||||
git_url = identifier[: idx + len(".git")]
|
||||
subdir = identifier[idx + len(marker) :].strip("/")
|
||||
return git_url, (subdir or None)
|
||||
return identifier, None
|
||||
|
||||
# owner/repo shorthand
|
||||
parts = identifier.strip("/").split("/")
|
||||
if len(parts) == 2:
|
||||
owner, repo = parts
|
||||
return f"https://github.com/{owner}/{repo}.git"
|
||||
# owner/repo[/subdir...] shorthand
|
||||
parts = [p for p in identifier.strip("/").split("/") if p]
|
||||
if len(parts) >= 2:
|
||||
owner, repo = parts[0], parts[1]
|
||||
subdir = "/".join(parts[2:]).strip("/")
|
||||
git_url = f"https://github.com/{owner}/{repo}.git"
|
||||
return git_url, (subdir or None)
|
||||
|
||||
raise ValueError(
|
||||
f"Invalid plugin identifier: '{identifier}'. "
|
||||
"Use a Git URL or owner/repo shorthand."
|
||||
"Use a Git URL or 'owner/repo' shorthand (optionally with a subdirectory: "
|
||||
"'owner/repo/path/to/plugin')."
|
||||
)
|
||||
|
||||
|
||||
def _resolve_subdir_within(clone_root: Path, subdir: str) -> Path:
|
||||
"""Resolve ``subdir`` inside ``clone_root``, rejecting path traversal.
|
||||
|
||||
Guards against ``..`` segments, absolute paths, and symlinks that would
|
||||
escape the cloned repository. Returns the resolved directory path.
|
||||
Raises ``PluginOperationError`` if the path escapes the clone, doesn't
|
||||
exist, or is not a directory.
|
||||
"""
|
||||
clone_root = clone_root.resolve()
|
||||
candidate = (clone_root / subdir).resolve()
|
||||
|
||||
# The resolved candidate must stay within the clone root.
|
||||
if candidate != clone_root and clone_root not in candidate.parents:
|
||||
raise PluginOperationError(
|
||||
f"Plugin subdirectory '{subdir}' escapes the repository.",
|
||||
)
|
||||
|
||||
if not candidate.exists():
|
||||
raise PluginOperationError(
|
||||
f"Plugin subdirectory '{subdir}' does not exist in the repository.",
|
||||
)
|
||||
if not candidate.is_dir():
|
||||
raise PluginOperationError(
|
||||
f"Plugin subdirectory '{subdir}' is not a directory.",
|
||||
)
|
||||
|
||||
return candidate
|
||||
|
||||
|
||||
def _repo_name_from_url(url: str) -> str:
|
||||
"""Extract the repo name from a Git URL for the plugin directory name."""
|
||||
# Strip trailing .git and slashes
|
||||
@@ -372,14 +427,14 @@ def _install_plugin_core(identifier: str, *, force: bool) -> tuple[Path, dict, s
|
||||
import tempfile
|
||||
|
||||
try:
|
||||
git_url = _resolve_git_url(identifier)
|
||||
git_url, subdir = _resolve_git_url(identifier)
|
||||
except ValueError as e:
|
||||
raise PluginOperationError(str(e)) from e
|
||||
|
||||
plugins_dir = _plugins_dir()
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
tmp_target = Path(tmp) / "plugin"
|
||||
tmp_clone = Path(tmp) / "plugin"
|
||||
|
||||
git_exe = _resolve_git_executable()
|
||||
if not git_exe:
|
||||
@@ -387,7 +442,7 @@ def _install_plugin_core(identifier: str, *, force: bool) -> tuple[Path, dict, s
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[git_exe, "clone", "--depth", "1", git_url, str(tmp_target)],
|
||||
[git_exe, "clone", "--depth", "1", git_url, str(tmp_clone)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
@@ -405,8 +460,16 @@ def _install_plugin_core(identifier: str, *, force: bool) -> tuple[Path, dict, s
|
||||
err = (result.stderr or result.stdout or "").strip()
|
||||
raise PluginOperationError(f"Git clone failed:\n{err}")
|
||||
|
||||
# Resolve the directory within the clone that holds the plugin.
|
||||
if subdir:
|
||||
tmp_target = _resolve_subdir_within(tmp_clone, subdir)
|
||||
else:
|
||||
tmp_target = tmp_clone
|
||||
|
||||
manifest = _read_manifest(tmp_target)
|
||||
plugin_name = manifest.get("name") or _repo_name_from_url(git_url)
|
||||
plugin_name = manifest.get("name") or (
|
||||
subdir.rstrip("/").rsplit("/", 1)[-1] if subdir else _repo_name_from_url(git_url)
|
||||
)
|
||||
|
||||
try:
|
||||
target = _sanitize_plugin_name(plugin_name, plugins_dir)
|
||||
@@ -471,7 +534,7 @@ def cmd_install(
|
||||
console = Console()
|
||||
|
||||
try:
|
||||
git_url = _resolve_git_url(identifier)
|
||||
git_url, _subdir = _resolve_git_url(identifier)
|
||||
except ValueError as e:
|
||||
console.print(f"[red]Error:[/red] {e}")
|
||||
sys.exit(1)
|
||||
@@ -482,7 +545,10 @@ def cmd_install(
|
||||
"Consider using https:// or git@ for production installs.",
|
||||
)
|
||||
|
||||
console.print(f"[dim]Cloning {git_url}...[/dim]")
|
||||
if _subdir:
|
||||
console.print(f"[dim]Cloning {git_url} (subdir: {_subdir})...[/dim]")
|
||||
else:
|
||||
console.print(f"[dim]Cloning {git_url}...[/dim]")
|
||||
|
||||
try:
|
||||
target, installed_manifest, installed_name = _install_plugin_core(
|
||||
@@ -1473,7 +1539,7 @@ def dashboard_install_plugin(
|
||||
"""Non-interactive install for the web dashboard. Returns a JSON-serializable dict."""
|
||||
warnings: list[str] = []
|
||||
try:
|
||||
git_url = _resolve_git_url(identifier)
|
||||
git_url, _subdir = _resolve_git_url(identifier)
|
||||
if git_url.startswith(("http://", "file://")):
|
||||
warnings.append(
|
||||
"Insecure URL scheme; prefer https:// or git@ for production installs.",
|
||||
|
||||
@@ -492,10 +492,7 @@ def get_label(provider_id: str) -> str:
|
||||
|
||||
def is_aggregator(provider: str) -> bool:
|
||||
"""Return True when the provider is a multi-model aggregator."""
|
||||
provider_norm = normalize_provider(provider or "")
|
||||
if provider_norm.startswith("custom:"):
|
||||
return True
|
||||
pdef = get_provider(provider_norm)
|
||||
pdef = get_provider(provider)
|
||||
return pdef.is_aggregator if pdef else False
|
||||
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@ Examples:
|
||||
hermes debug share --expire 30 Keep paste for 30 days
|
||||
hermes debug share --local Print report locally (no upload)
|
||||
hermes debug share --no-redact Disable upload-time secret redaction
|
||||
hermes debug share --nous Upload to Nous-internal storage (private)
|
||||
hermes debug delete <url> Delete a previously uploaded paste
|
||||
""",
|
||||
)
|
||||
@@ -65,17 +64,6 @@ Examples:
|
||||
"into the public paste service."
|
||||
),
|
||||
)
|
||||
share_parser.add_argument(
|
||||
"--nous",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Upload the debug bundle to Nous-internal storage (AWS S3) instead "
|
||||
"of a public paste service. The bundle is private — viewable only "
|
||||
"by Nous staff (and allowlisted Discord mods) via a Google-login-"
|
||||
"gated viewer — and auto-deletes after 14 days. Still force-redacts "
|
||||
"secrets unless --no-redact is also passed."
|
||||
),
|
||||
)
|
||||
delete_parser = debug_sub.add_parser(
|
||||
"delete",
|
||||
help="Delete a paste uploaded by 'hermes debug share'",
|
||||
|
||||
@@ -41,19 +41,15 @@ except ImportError: # pragma: no cover - httpx is a hermes dependency
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PhotonDashboardAuthError(RuntimeError):
|
||||
"""Raised when Photon rejects a device-flow token for the dashboard API."""
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
|
||||
# Hosted Photon allowlists registered device clients on the device-code
|
||||
# endpoint — an unregistered client_id is rejected with
|
||||
# `400 {"error":"invalid_client"}`. Use Photon's published CLI device
|
||||
# client until the dashboard API registers Hermes as its own client_id.
|
||||
DEFAULT_CLIENT_ID = "photon-cli"
|
||||
DEFAULT_SCOPE = "openid profile email"
|
||||
# Photon's published OAuth device-client identifier for first-party CLIs.
|
||||
# We use a fixed "hermes-agent" client_id string — Photon's device endpoint
|
||||
# accepts any opaque client_id and ties the bearer token to the approving
|
||||
# user, not to the client. If Photon later requires registered clients,
|
||||
# this is the one knob to update.
|
||||
DEFAULT_CLIENT_ID = "hermes-agent"
|
||||
|
||||
DEFAULT_DASHBOARD_HOST = "https://app.photon.codes"
|
||||
DEFAULT_SPECTRUM_HOST = "https://spectrum.photon.codes"
|
||||
@@ -170,13 +166,6 @@ class DeviceCode:
|
||||
interval: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _DeviceTokenCandidate:
|
||||
"""A token-like value extracted from the device-token response."""
|
||||
source: str
|
||||
token: str
|
||||
|
||||
|
||||
def _dashboard_host() -> str:
|
||||
return (os.getenv("PHOTON_DASHBOARD_HOST") or DEFAULT_DASHBOARD_HOST).rstrip("/")
|
||||
|
||||
@@ -186,7 +175,7 @@ def _spectrum_host() -> str:
|
||||
|
||||
|
||||
def request_device_code(
|
||||
*, client_id: str = DEFAULT_CLIENT_ID, scope: Optional[str] = DEFAULT_SCOPE,
|
||||
*, client_id: str = DEFAULT_CLIENT_ID, scope: Optional[str] = None,
|
||||
) -> DeviceCode:
|
||||
"""POST ``/api/auth/device/code`` and return the device + user codes."""
|
||||
if httpx is None:
|
||||
@@ -243,22 +232,16 @@ def poll_for_token(
|
||||
time.sleep(sleep)
|
||||
continue
|
||||
if resp.status_code == 200:
|
||||
body: Dict[str, Any] = {}
|
||||
try:
|
||||
decoded = resp.json() or {}
|
||||
body = decoded if isinstance(decoded, dict) else {}
|
||||
except (TypeError, ValueError, json.JSONDecodeError):
|
||||
body = {}
|
||||
candidates = _device_response_token_candidates(
|
||||
body, headers=getattr(resp, "headers", {}),
|
||||
)
|
||||
if not candidates:
|
||||
token = resp.headers.get("set-auth-token")
|
||||
if not token:
|
||||
body = resp.json() or {}
|
||||
session = body.get("session") or {}
|
||||
token = session.get("access_token") or body.get("access_token")
|
||||
if not token:
|
||||
raise RuntimeError(
|
||||
"Photon returned 200 but no token candidate in the "
|
||||
"device-token response (expected access_token, "
|
||||
"data.access_token, accessToken, or set-auth-token)."
|
||||
"Photon returned 200 but no token in headers or body"
|
||||
)
|
||||
return candidates[0].token
|
||||
return token
|
||||
if resp.status_code == 400:
|
||||
# RFC 8628 §3.5 — error codes are returned with 400.
|
||||
body: Dict[str, Any] = {}
|
||||
@@ -290,142 +273,6 @@ def poll_for_token(
|
||||
raise TimeoutError("Photon device login timed out")
|
||||
|
||||
|
||||
def _device_response_token_candidates(
|
||||
body: Dict[str, Any],
|
||||
*,
|
||||
headers: Optional[Any] = None,
|
||||
) -> list:
|
||||
"""Extract de-duplicated token candidates from a device-token response.
|
||||
|
||||
Photon's device-token endpoint has returned tokens under several keys
|
||||
across versions (``access_token``, ``accessToken``, ``data.*``) and the
|
||||
documented ``set-auth-token`` response header. We collect every shape so
|
||||
the caller can validate each against the dashboard API before trusting it.
|
||||
"""
|
||||
candidates: list = []
|
||||
seen: set = set()
|
||||
|
||||
def add(source: str, value: Any) -> None:
|
||||
token = _clean_bearer_token(value)
|
||||
if not token or token in seen:
|
||||
return
|
||||
seen.add(token)
|
||||
candidates.append(_DeviceTokenCandidate(source=source, token=token))
|
||||
|
||||
add("access_token", body.get("access_token"))
|
||||
add("accessToken", body.get("accessToken"))
|
||||
session = body.get("session")
|
||||
if isinstance(session, dict):
|
||||
add("session.access_token", session.get("access_token"))
|
||||
data = body.get("data")
|
||||
if isinstance(data, dict):
|
||||
add("data.access_token", data.get("access_token"))
|
||||
add("data.accessToken", data.get("accessToken"))
|
||||
add("set-auth-token", _header_value(headers, "set-auth-token"))
|
||||
return candidates
|
||||
|
||||
|
||||
def _clean_bearer_token(value: Any) -> Optional[str]:
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
token = value.strip()
|
||||
if token.lower().startswith("bearer "):
|
||||
token = token[7:].strip()
|
||||
return token or None
|
||||
|
||||
|
||||
def _header_value(headers: Optional[Any], name: str) -> Optional[str]:
|
||||
if not headers:
|
||||
return None
|
||||
try:
|
||||
value = headers.get(name)
|
||||
if value:
|
||||
return str(value)
|
||||
except AttributeError:
|
||||
pass
|
||||
try:
|
||||
for key, value in dict(headers).items():
|
||||
if str(key).lower() == name.lower() and value:
|
||||
return str(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _dashboard_get(path: str, token: str) -> Any:
|
||||
if httpx is None:
|
||||
raise RuntimeError("httpx is required for Photon device login")
|
||||
url = f"{_dashboard_host()}{path}"
|
||||
return httpx.get(
|
||||
url,
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
timeout=30.0,
|
||||
)
|
||||
|
||||
|
||||
def validate_photon_token(token: str) -> Dict[str, Any]:
|
||||
"""Verify a device-flow token is usable for dashboard project APIs.
|
||||
|
||||
The device flow can return a token that authenticates the Better Auth
|
||||
session lookup but is rejected by the project APIs. Validate against
|
||||
``/api/auth/get-session`` and ``/api/projects/`` so we fail loudly at
|
||||
login instead of saving a token that 404s/401s downstream.
|
||||
"""
|
||||
resp = _dashboard_get("/api/auth/get-session", token)
|
||||
if resp.status_code in (401, 403):
|
||||
raise PhotonDashboardAuthError(
|
||||
"Photon issued a device token, but the dashboard session lookup "
|
||||
"rejected it."
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
user = data.get("user") if isinstance(data, dict) else None
|
||||
if not isinstance(user, dict) or not user:
|
||||
raise PhotonDashboardAuthError(
|
||||
"Photon issued a device token, but the dashboard session lookup "
|
||||
"did not recognize it."
|
||||
)
|
||||
projects_resp = _dashboard_get("/api/projects/", token)
|
||||
if projects_resp.status_code in (401, 403):
|
||||
raise PhotonDashboardAuthError(
|
||||
"Photon device token was accepted for the session lookup but "
|
||||
"rejected by the project API."
|
||||
)
|
||||
projects_resp.raise_for_status()
|
||||
return user
|
||||
|
||||
|
||||
def _validated_dashboard_token(candidates: list) -> str:
|
||||
"""Return the first candidate token that passes dashboard validation."""
|
||||
if not candidates:
|
||||
raise RuntimeError(
|
||||
"Photon returned 200 but no token candidate in the device-token "
|
||||
"response."
|
||||
)
|
||||
dashboard_error: Optional[PhotonDashboardAuthError] = None
|
||||
last_error: Optional[BaseException] = None
|
||||
for candidate in candidates:
|
||||
try:
|
||||
validate_photon_token(candidate.token)
|
||||
return candidate.token
|
||||
except PhotonDashboardAuthError as exc:
|
||||
dashboard_error = exc
|
||||
last_error = exc
|
||||
continue
|
||||
except Exception as exc:
|
||||
last_error = exc
|
||||
continue
|
||||
if dashboard_error is not None:
|
||||
sources = ", ".join(c.source for c in candidates) or "none"
|
||||
raise PhotonDashboardAuthError(
|
||||
f"{dashboard_error} Device login returned no project-valid "
|
||||
f"dashboard token (tried: {sources})."
|
||||
) from dashboard_error
|
||||
if last_error is not None:
|
||||
raise last_error
|
||||
raise RuntimeError("Photon did not return a usable dashboard token")
|
||||
|
||||
|
||||
def login_device_flow(
|
||||
*,
|
||||
client_id: str = DEFAULT_CLIENT_ID,
|
||||
@@ -450,13 +297,7 @@ def login_device_flow(
|
||||
webbrowser.open(target, new=2)
|
||||
except Exception:
|
||||
pass
|
||||
# Poll once for the approved token, then collect every candidate shape so
|
||||
# we can validate against the dashboard API before persisting (avoids
|
||||
# saving a token that authenticates the session lookup but 404s on the
|
||||
# project APIs).
|
||||
first_token = poll_for_token(code, client_id=client_id)
|
||||
candidates = [_DeviceTokenCandidate(source="poll", token=first_token)]
|
||||
token = _validated_dashboard_token(candidates)
|
||||
token = poll_for_token(code, client_id=client_id)
|
||||
store_photon_token(token)
|
||||
return token
|
||||
|
||||
|
||||
@@ -45,7 +45,6 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json"
|
||||
|
||||
# Auto-extracted from noreply emails + manual overrides
|
||||
AUTHOR_MAP = {
|
||||
"raysun12142006@gmail.com": "yanxue06",
|
||||
"alberto.regalado@ymail.com": "ARegalado1",
|
||||
"alchemistchaos@protonmail.com": "AlchemistChaos", # co-author only
|
||||
"gilad@smiti.ai": "giladbau",
|
||||
@@ -1242,7 +1241,6 @@ AUTHOR_MAP = {
|
||||
"charliekerfoot@gmail.com": "CharlieKerfoot", # PR #18951
|
||||
# Debug share upload-time redaction (May 2026)
|
||||
"dhuysamen@gmail.com": "GodsBoy", # PR #19318
|
||||
"github@nadyahermes.anonaddy.com": "ruangraung", # PR #42308
|
||||
"mrcoferland@gmail.com": "mrcoferland", # PR #19023
|
||||
"chenlinfeng@ruije.com.cn": "noOne-list", # PR #19050
|
||||
"briansu@Mac-mini.attlocal.net": "likejudy", # PR #19052
|
||||
|
||||
@@ -1,31 +1,29 @@
|
||||
"""Regression tests for #30768, #32383, and #33961.
|
||||
"""Regression tests for issue #30768 and #32383.
|
||||
|
||||
``_prompt_text_input_modal`` answers destructive-slash confirmations through a
|
||||
queue-based modal driven by prompt_toolkit key bindings. When invoked from the
|
||||
``process_loop`` daemon thread it sets the modal up on the app's event loop via
|
||||
``call_soon_threadsafe``, so it is safe on every platform — including native
|
||||
Windows (#33961), where the earlier ``sys.platform == "win32"`` → raw ``input()``
|
||||
fallback deadlocked the daemon thread against prompt_toolkit's stdin ownership.
|
||||
``_prompt_text_input_modal`` uses a queue-based modal that relies on
|
||||
prompt_toolkit key bindings receiving keyboard events. On Windows the
|
||||
prompt_toolkit input channel can deadlock when the modal is entered from
|
||||
the ``process_loop`` daemon thread. The fix falls back to the simpler
|
||||
``_prompt_text_input`` (stdin-based) prompt on Windows.
|
||||
|
||||
These tests verify:
|
||||
1. Daemon-thread confirm uses the modal via the app loop on Linux AND native
|
||||
Windows (#33961) — never the raw stdin fallback, never a hang.
|
||||
2. Main-thread confirm with a running app uses the modal.
|
||||
3. The raw stdin fallback is kept ONLY for the safe cases: no running app, and
|
||||
(on win32, off-thread) a scheduling failure degrades to a clean cancel.
|
||||
4. Empty choices returns None.
|
||||
1. Windows detection triggers the stdin fallback
|
||||
2. Non-Windows daemon threads still use the modal via the app loop
|
||||
3. macOS/Linux main-thread path still uses the modal (no regression)
|
||||
4. No-app path still uses the stdin fallback (existing behavior)
|
||||
5. Empty choices returns None (existing behavior)
|
||||
"""
|
||||
|
||||
import queue
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _make_cli():
|
||||
"""Minimal HermesCLI shell exposing the prompt/modal helpers."""
|
||||
"""Minimal HermesCLI shell exposing prompt/modal helpers."""
|
||||
import cli as cli_mod
|
||||
|
||||
obj = object.__new__(cli_mod.HermesCLI)
|
||||
@@ -39,6 +37,9 @@ def _make_cli():
|
||||
return obj
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sample choices used across tests
|
||||
# ---------------------------------------------------------------------------
|
||||
_SAMPLE_CHOICES = [
|
||||
("once", "Approve Once", "proceed this time only"),
|
||||
("always", "Always Approve", "proceed and silence this prompt permanently"),
|
||||
@@ -46,106 +47,119 @@ _SAMPLE_CHOICES = [
|
||||
]
|
||||
|
||||
|
||||
def _answer_modal_when_open(cli, response, stop=None):
|
||||
"""Push ``response`` onto the modal's response_queue once it opens.
|
||||
class TestModalWindowsFallback:
|
||||
"""Windows dead-lock regression tests for _prompt_text_input_modal."""
|
||||
|
||||
Gives up after ~2s, or early when ``stop`` is set (the modal will never open,
|
||||
e.g. a scheduling failure) so degraded-path tests don't wait the full budget.
|
||||
"""
|
||||
for _ in range(100):
|
||||
if stop is not None and stop.is_set():
|
||||
return
|
||||
state = cli._slash_confirm_state
|
||||
if state and "response_queue" in state:
|
||||
state["response_queue"].put(response)
|
||||
return
|
||||
time.sleep(0.02)
|
||||
|
||||
|
||||
def _run_on_daemon(call, cli, *, platform, response, schedule=None):
|
||||
"""Invoke ``call`` on a daemon thread — as the process_loop does — answering
|
||||
the modal with ``response`` once it opens.
|
||||
|
||||
Returns ``{result, stdin_called, capture, restore}``. ``schedule`` overrides
|
||||
the ``call_soon_threadsafe`` side effect (default: run the callback inline);
|
||||
pass a raiser to simulate a scheduling failure. Fails if the worker hangs,
|
||||
which is the deadlock canary for #33961.
|
||||
"""
|
||||
outcome = {"capture": [], "restore": [], "result": None, "stdin_called": False}
|
||||
done = threading.Event()
|
||||
|
||||
def _worker():
|
||||
try:
|
||||
with patch.object(sys, "platform", platform), \
|
||||
patch.object(cli._app.loop, "call_soon_threadsafe", side_effect=schedule or (lambda cb: cb())), \
|
||||
patch.object(cli, "_prompt_text_input") as mock_stdin, \
|
||||
patch.object(cli, "_invalidate"), \
|
||||
patch.object(cli, "_capture_modal_input_snapshot", side_effect=lambda: outcome["capture"].append(1)), \
|
||||
patch.object(cli, "_restore_modal_input_snapshot", side_effect=lambda: outcome["restore"].append(1)):
|
||||
outcome["result"] = call()
|
||||
outcome["stdin_called"] = mock_stdin.called
|
||||
finally:
|
||||
done.set()
|
||||
|
||||
worker = threading.Thread(target=_worker, daemon=True)
|
||||
answerer = threading.Thread(target=_answer_modal_when_open, args=(cli, response, done), daemon=True)
|
||||
answerer.start()
|
||||
worker.start()
|
||||
worker.join(timeout=2.0)
|
||||
answerer.join(timeout=2.0)
|
||||
assert not worker.is_alive(), "daemon thread hung — modal deadlocked"
|
||||
return outcome
|
||||
|
||||
|
||||
class TestModal:
|
||||
"""Behaviour of _prompt_text_input_modal across platforms and threads."""
|
||||
|
||||
@pytest.mark.parametrize("platform", ["linux", "win32"])
|
||||
def test_daemon_thread_uses_modal_via_app_loop(self, platform):
|
||||
"""Off the process_loop daemon thread, the confirm uses the modal via
|
||||
call_soon_threadsafe on every platform — including native Windows, where
|
||||
the old win32 early-return deadlocked on raw input() (#33961)."""
|
||||
def test_windows_falls_back_to_stdin(self):
|
||||
"""On Windows, _prompt_text_input_modal should use _prompt_text_input."""
|
||||
cli = _make_cli()
|
||||
outcome = _run_on_daemon(
|
||||
lambda: cli._prompt_text_input_modal(
|
||||
title="⚠️ /reset",
|
||||
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch.object(cli, "_prompt_text_input", return_value="1") as mock_stdin:
|
||||
result = cli._prompt_text_input_modal(
|
||||
title="⚠️ /new — destroys conversation state",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
),
|
||||
cli,
|
||||
platform=platform,
|
||||
response="once",
|
||||
)
|
||||
assert outcome["stdin_called"] is False, "must use the modal, not raw input()"
|
||||
assert outcome["result"] == "once"
|
||||
assert outcome["capture"] == [1]
|
||||
assert outcome["restore"] == [1]
|
||||
assert cli._slash_confirm_state is None
|
||||
)
|
||||
|
||||
def test_main_thread_with_app_uses_modal(self):
|
||||
"""On the main thread with a running app, the queue-based modal is used."""
|
||||
# The stdin-based fallback was used, not the modal queue path.
|
||||
mock_stdin.assert_called_once_with("Choice [1/2/3]: ")
|
||||
assert result == "1"
|
||||
|
||||
def test_non_main_thread_uses_modal_via_app_loop(self):
|
||||
"""Off the main thread on Linux, keep the modal path via app-loop setup."""
|
||||
cli = _make_cli()
|
||||
result_holder = {}
|
||||
setup_calls = []
|
||||
teardown_calls = []
|
||||
|
||||
def _call_soon_threadsafe(callback):
|
||||
callback()
|
||||
|
||||
def run_on_daemon():
|
||||
with patch.object(sys, "platform", "linux"), \
|
||||
patch.object(cli._app.loop, "call_soon_threadsafe", side_effect=_call_soon_threadsafe), \
|
||||
patch.object(cli, "_prompt_text_input") as mock_stdin, \
|
||||
patch.object(cli, "_capture_modal_input_snapshot", side_effect=lambda: setup_calls.append("capture")), \
|
||||
patch.object(cli, "_restore_modal_input_snapshot", side_effect=lambda: teardown_calls.append("restore")):
|
||||
result_holder["result"] = cli._prompt_text_input_modal(
|
||||
title="⚠️ /reset",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
)
|
||||
result_holder["stdin_called"] = mock_stdin.called
|
||||
|
||||
def _submit_after_delay():
|
||||
time.sleep(0.2)
|
||||
state = cli._slash_confirm_state
|
||||
if state and "response_queue" in state:
|
||||
state["response_queue"].put("once")
|
||||
|
||||
submitter = threading.Thread(target=_submit_after_delay, daemon=True)
|
||||
t = threading.Thread(target=run_on_daemon, daemon=True)
|
||||
submitter.start()
|
||||
t.start()
|
||||
t.join(timeout=2.0)
|
||||
submitter.join(timeout=2.0)
|
||||
assert not t.is_alive(), "daemon thread hung — modal deadlocked"
|
||||
assert result_holder["stdin_called"] is False
|
||||
assert result_holder["result"] == "once"
|
||||
assert setup_calls == ["capture"]
|
||||
assert teardown_calls == ["restore"]
|
||||
|
||||
def test_main_thread_non_windows_uses_modal(self):
|
||||
"""On macOS/Linux main thread, the queue-based modal is still used."""
|
||||
cli = _make_cli()
|
||||
|
||||
# We need to simulate the modal receiving a response. We'll patch
|
||||
# the response_queue to immediately return a value.
|
||||
with patch.object(sys, "platform", "darwin"), \
|
||||
patch.object(cli, "_capture_modal_input_snapshot"), \
|
||||
patch.object(cli, "_restore_modal_input_snapshot"), \
|
||||
patch.object(cli, "_invalidate"), \
|
||||
patch.object(cli, "_prompt_text_input") as mock_stdin:
|
||||
answerer = threading.Thread(target=_answer_modal_when_open, args=(cli, "once"), daemon=True)
|
||||
answerer.start()
|
||||
result = cli._prompt_text_input_modal(
|
||||
title="⚠️ /new",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
)
|
||||
answerer.join(timeout=2.0)
|
||||
patch.object(cli, "_invalidate"):
|
||||
# Start the modal in a way that it will receive a response
|
||||
# immediately via the queue.
|
||||
original_queue = queue.Queue
|
||||
original_time = time.monotonic
|
||||
|
||||
mock_stdin.assert_not_called()
|
||||
assert result == "once"
|
||||
def _fake_modal_flow(*args, **kwargs):
|
||||
"""Simulate the modal flow: set state, put response, return."""
|
||||
# We'll directly test that the modal path is entered by
|
||||
# checking that _slash_confirm_state was set.
|
||||
pass
|
||||
|
||||
# Since we can't easily mock the internal queue, let's test
|
||||
# that the modal path is entered by checking that
|
||||
# _prompt_text_input was NOT called.
|
||||
with patch.object(cli, "_prompt_text_input") as mock_stdin:
|
||||
# Set up a response that will be put into the queue
|
||||
# after the modal starts waiting.
|
||||
def _submit_after_delay():
|
||||
time.sleep(0.2)
|
||||
state = cli._slash_confirm_state
|
||||
if state and "response_queue" in state:
|
||||
state["response_queue"].put("once")
|
||||
|
||||
submitter = threading.Thread(target=_submit_after_delay, daemon=True)
|
||||
submitter.start()
|
||||
|
||||
result = cli._prompt_text_input_modal(
|
||||
title="⚠️ /new",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
submitter.join(timeout=2.0)
|
||||
|
||||
# The stdin fallback should NOT have been called.
|
||||
mock_stdin.assert_not_called()
|
||||
# The result should be "once" from the simulated modal response.
|
||||
assert result == "once"
|
||||
|
||||
def test_no_app_falls_back_to_stdin(self):
|
||||
"""Without a running app (oneshot / non-interactive), use the stdin prompt."""
|
||||
"""Without a prompt_toolkit app, always use stdin fallback."""
|
||||
cli = _make_cli()
|
||||
cli._app = None
|
||||
|
||||
@@ -159,102 +173,78 @@ class TestModal:
|
||||
mock_stdin.assert_called_once_with("Choice [1/2/3]: ")
|
||||
assert result == "3"
|
||||
|
||||
def test_windows_no_app_falls_back_to_stdin(self):
|
||||
"""win32 without a running app keeps stdin — the only case where the raw
|
||||
prompt is safe on Windows, since no app owns the console to deadlock."""
|
||||
cli = _make_cli()
|
||||
cli._app = None
|
||||
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch.object(cli, "_prompt_text_input", return_value="1") as mock_stdin:
|
||||
result = cli._prompt_text_input_modal(
|
||||
title="⚠️ /new — destroys conversation state",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
)
|
||||
|
||||
mock_stdin.assert_called_once_with("Choice [1/2/3]: ")
|
||||
assert result == "1"
|
||||
|
||||
def test_windows_scheduling_failure_clean_cancels(self):
|
||||
"""win32 off the main thread: if marshaling onto the app loop fails, cancel
|
||||
cleanly (None) rather than fall to raw input() (which deadlocks on native
|
||||
Windows) or hang. Asserts the _stdin_fallback guard (#33961)."""
|
||||
cli = _make_cli()
|
||||
|
||||
def _raise(_cb):
|
||||
raise RuntimeError("loop closed")
|
||||
|
||||
outcome = _run_on_daemon(
|
||||
lambda: cli._prompt_text_input_modal(
|
||||
title="⚠️ /reset",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
),
|
||||
cli,
|
||||
platform="win32",
|
||||
response="once",
|
||||
schedule=_raise,
|
||||
)
|
||||
assert outcome["stdin_called"] is False, "win32 off-thread must NOT call raw input()"
|
||||
assert outcome["result"] is None
|
||||
assert cli._slash_confirm_state is None
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"platform, expect_stdin, expect_result",
|
||||
[("win32", False, None), ("linux", True, "1")],
|
||||
)
|
||||
def test_daemon_thread_no_app_loop_uses_fallback(self, platform, expect_stdin, expect_result):
|
||||
"""Off the daemon thread with no resolvable app loop (``self._app.loop``
|
||||
is None / raises), the modal can never be scheduled, so the method short-
|
||||
circuits at the app_loop-is-None site (cli.py ~7260) — a distinct path
|
||||
from a call_soon_threadsafe failure. win32 clean-cancels (None) instead of
|
||||
deadlocking on raw input(); other platforms keep the stdin prompt."""
|
||||
cli = _make_cli()
|
||||
cli._app.loop = None # forces app_loop is None, off the main thread
|
||||
|
||||
outcome = {"result": None, "stdin_called": False}
|
||||
done = threading.Event()
|
||||
|
||||
def _worker():
|
||||
try:
|
||||
with patch.object(sys, "platform", platform), \
|
||||
patch.object(cli, "_prompt_text_input", return_value="1") as mock_stdin, \
|
||||
patch.object(cli, "_invalidate"):
|
||||
outcome["result"] = cli._prompt_text_input_modal(
|
||||
title="⚠️ /reset",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
)
|
||||
outcome["stdin_called"] = mock_stdin.called
|
||||
finally:
|
||||
done.set()
|
||||
|
||||
worker = threading.Thread(target=_worker, daemon=True)
|
||||
worker.start()
|
||||
worker.join(timeout=2.0)
|
||||
assert not worker.is_alive(), "daemon thread hung — modal deadlocked"
|
||||
assert outcome["stdin_called"] is expect_stdin
|
||||
assert outcome["result"] == expect_result
|
||||
assert cli._slash_confirm_state is None
|
||||
|
||||
def test_empty_choices_returns_none(self):
|
||||
"""Empty choices returns None without prompting."""
|
||||
"""Empty choices list should return None without prompting."""
|
||||
cli = _make_cli()
|
||||
|
||||
with patch.object(cli, "_prompt_text_input") as mock_stdin:
|
||||
result = cli._prompt_text_input_modal(title="Test", detail="Test", choices=[])
|
||||
result = cli._prompt_text_input_modal(
|
||||
title="Test",
|
||||
detail="Test",
|
||||
choices=[],
|
||||
)
|
||||
|
||||
mock_stdin.assert_not_called()
|
||||
assert result is None
|
||||
|
||||
def test_windows_fallback_does_not_set_modal_state(self):
|
||||
"""Verify Windows fallback doesn't leave _slash_confirm_state set."""
|
||||
cli = _make_cli()
|
||||
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch.object(cli, "_prompt_text_input", return_value="1"):
|
||||
cli._prompt_text_input_modal(
|
||||
title="⚠️ /reset",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
)
|
||||
|
||||
assert cli._slash_confirm_state is None
|
||||
|
||||
def test_non_main_thread_modal_clears_state(self):
|
||||
"""Verify daemon-thread modal teardown does not leave state behind."""
|
||||
cli = _make_cli()
|
||||
errors = []
|
||||
|
||||
def _call_soon_threadsafe(callback):
|
||||
callback()
|
||||
|
||||
def run_on_daemon():
|
||||
try:
|
||||
with patch.object(sys, "platform", "linux"), \
|
||||
patch.object(cli._app.loop, "call_soon_threadsafe", side_effect=_call_soon_threadsafe):
|
||||
def _submit_after_delay():
|
||||
time.sleep(0.2)
|
||||
state = cli._slash_confirm_state
|
||||
if state and "response_queue" in state:
|
||||
state["response_queue"].put("cancel")
|
||||
|
||||
submitter = threading.Thread(target=_submit_after_delay, daemon=True)
|
||||
submitter.start()
|
||||
cli._prompt_text_input_modal(
|
||||
title="⚠️ /new",
|
||||
detail="This starts a fresh session.",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=5,
|
||||
)
|
||||
submitter.join(timeout=2.0)
|
||||
if cli._slash_confirm_state is not None:
|
||||
errors.append("_slash_confirm_state should be None")
|
||||
except Exception as exc:
|
||||
errors.append(str(exc))
|
||||
|
||||
t = threading.Thread(target=run_on_daemon, daemon=True)
|
||||
t.start()
|
||||
t.join(timeout=2.0)
|
||||
assert not errors, f"unexpected errors: {errors}"
|
||||
assert cli._slash_confirm_state is None
|
||||
|
||||
|
||||
class TestConfirmDestructiveSlashWindows:
|
||||
"""End-to-end _confirm_destructive_slash on the native-Windows daemon thread."""
|
||||
"""Integration-level tests for _confirm_destructive_slash on Windows."""
|
||||
|
||||
def _make_interactive_cli(self):
|
||||
def test_confirm_destructive_slash_bypasses_modal_on_windows(self):
|
||||
"""_confirm_destructive_slash should work on Windows via stdin fallback."""
|
||||
cli = _make_cli()
|
||||
cli.model = "test-model"
|
||||
cli._agent_running = False
|
||||
@@ -265,140 +255,37 @@ class TestConfirmDestructiveSlashWindows:
|
||||
cli._pending_tool_info = {}
|
||||
cli._tool_start_time = 0.0
|
||||
cli._last_scrollback_tool = ""
|
||||
return cli
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"response, expected",
|
||||
[("once", "once"), ("cancel", None)],
|
||||
)
|
||||
def test_confirm_destructive_slash_uses_modal_on_windows(self, response, expected):
|
||||
"""On native Windows, the bare /new confirm drives the modal (not stdin)
|
||||
and returns the chosen outcome — the bug #33961 froze this path."""
|
||||
cli = self._make_interactive_cli()
|
||||
with patch("cli.load_cli_config", return_value={"approvals": {"destructive_slash_confirm": True}}):
|
||||
outcome = _run_on_daemon(
|
||||
lambda: cli._confirm_destructive_slash(
|
||||
"new",
|
||||
"This starts a fresh session.\nThe current conversation history will be discarded.",
|
||||
),
|
||||
cli,
|
||||
platform="win32",
|
||||
response=response,
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch.object(cli, "_prompt_text_input", return_value="1"), \
|
||||
patch("cli.load_cli_config", return_value={"approvals": {"destructive_slash_confirm": True}}):
|
||||
result = cli._confirm_destructive_slash(
|
||||
"new",
|
||||
"This starts a fresh session.\nThe current conversation history will be discarded.",
|
||||
)
|
||||
|
||||
assert outcome["stdin_called"] is False
|
||||
assert outcome["result"] == expected
|
||||
assert result == "once"
|
||||
|
||||
|
||||
class TestNativeWindowsNoRawInputDeadlock:
|
||||
"""Anti-regression guard exercising the REAL ``_prompt_text_input``.
|
||||
|
||||
Every other test here mocks ``_prompt_text_input`` away, so they only
|
||||
assert *routing* (modal vs. stdin) — they cannot observe the actual hang
|
||||
that #33961 was. The historical regression was precisely that
|
||||
``_prompt_text_input_modal`` delegated to the *real* ``_prompt_text_input``
|
||||
on native Windows, which on a non-main thread runs a bare ``input()`` that
|
||||
blocks forever against prompt_toolkit's stdin ownership.
|
||||
|
||||
These tests let the real ``_prompt_text_input`` run with a blocking
|
||||
``input()`` and assert the worker thread never hangs. They fail on the
|
||||
pre-#33961 code (win32 → ``_prompt_text_input`` → off-main ``input()``)
|
||||
and pass once the modal path / clean-cancel fallback is in place.
|
||||
"""
|
||||
|
||||
def test_win32_daemon_thread_never_blocks_on_real_input(self):
|
||||
"""A blocking input() must NOT hang the daemon thread on win32.
|
||||
|
||||
Drives the genuine helper chain (no mock of ``_prompt_text_input``)
|
||||
with ``builtins.input`` patched to block forever. The confirm must
|
||||
resolve via the app-loop modal (answered on a background thread, as
|
||||
the real key bindings would) and never sit in ``input()``. On the
|
||||
pre-#33961 code the win32 early-return routed to the real
|
||||
``_prompt_text_input`` → off-main ``input()`` → permanent hang.
|
||||
"""
|
||||
def test_confirm_destructive_slash_cancelled_on_windows(self):
|
||||
"""Cancellation via stdin fallback works on Windows."""
|
||||
cli = _make_cli()
|
||||
cli._app.loop.call_soon_threadsafe = lambda cb: cb()
|
||||
cli.model = "test-model"
|
||||
cli._agent_running = False
|
||||
cli._spinner_text = ""
|
||||
cli._should_exit = False
|
||||
cli._command_running = False
|
||||
cli.session_id = "test-session"
|
||||
cli._pending_tool_info = {}
|
||||
cli._tool_start_time = 0.0
|
||||
cli._last_scrollback_tool = ""
|
||||
|
||||
def _blocking_input(prompt=""): # stands in for "no line ever arrives"
|
||||
time.sleep(30)
|
||||
return "1"
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch.object(cli, "_prompt_text_input", return_value="3"), \
|
||||
patch("cli.load_cli_config", return_value={"approvals": {"destructive_slash_confirm": True}}):
|
||||
result = cli._confirm_destructive_slash(
|
||||
"reset",
|
||||
"This starts a fresh session.\nThe current conversation history will be discarded.",
|
||||
)
|
||||
|
||||
outcome = {}
|
||||
done = threading.Event()
|
||||
|
||||
def _worker():
|
||||
try:
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch("builtins.input", side_effect=_blocking_input), \
|
||||
patch.object(cli, "_capture_modal_input_snapshot"), \
|
||||
patch.object(cli, "_restore_modal_input_snapshot"), \
|
||||
patch.object(cli, "_invalidate"):
|
||||
outcome["result"] = cli._prompt_text_input_modal(
|
||||
title="/new",
|
||||
detail="destroys conversation state",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=3,
|
||||
)
|
||||
finally:
|
||||
done.set()
|
||||
|
||||
worker = threading.Thread(target=_worker, daemon=True)
|
||||
answerer = threading.Thread(
|
||||
target=_answer_modal_when_open, args=(cli, "cancel", done), daemon=True
|
||||
)
|
||||
answerer.start()
|
||||
worker.start()
|
||||
worker.join(timeout=5.0)
|
||||
answerer.join(timeout=5.0)
|
||||
assert not worker.is_alive(), (
|
||||
"daemon thread hung in real input() — native-Windows confirm "
|
||||
"deadlock regressed (#33961)"
|
||||
)
|
||||
# cancel → None; the point is it RETURNED rather than blocking forever.
|
||||
assert outcome.get("result") in (None, "cancel")
|
||||
|
||||
def test_win32_scheduling_failure_cleanly_cancels_no_input(self):
|
||||
"""If the modal can't be marshaled onto the app loop on native Windows
|
||||
(scheduling failure) the off-main-thread path must cancel cleanly —
|
||||
NOT fall through to a blocking raw ``input()``.
|
||||
|
||||
This is the degraded branch the pre-#33961 code handled with
|
||||
``return self._prompt_text_input(...)`` (which deadlocks); the fix
|
||||
returns ``None`` instead.
|
||||
"""
|
||||
cli = _make_cli()
|
||||
|
||||
def _raise(cb): # call_soon_threadsafe scheduling failure
|
||||
raise RuntimeError("event loop closed")
|
||||
|
||||
cli._app.loop.call_soon_threadsafe = _raise
|
||||
|
||||
input_called = {"n": 0}
|
||||
|
||||
def _tracking_input(prompt=""):
|
||||
input_called["n"] += 1
|
||||
time.sleep(30)
|
||||
return "1"
|
||||
|
||||
outcome = {}
|
||||
|
||||
def _worker():
|
||||
with patch.object(sys, "platform", "win32"), \
|
||||
patch("builtins.input", side_effect=_tracking_input), \
|
||||
patch.object(cli, "_invalidate"):
|
||||
outcome["result"] = cli._prompt_text_input_modal(
|
||||
title="/new",
|
||||
detail="destroys conversation state",
|
||||
choices=_SAMPLE_CHOICES,
|
||||
timeout=3,
|
||||
)
|
||||
|
||||
worker = threading.Thread(target=_worker, daemon=True)
|
||||
worker.start()
|
||||
worker.join(timeout=5.0)
|
||||
assert not worker.is_alive(), (
|
||||
"daemon thread hung — win32 scheduling-failure fallback used raw "
|
||||
"input() instead of cleanly cancelling (#33961)"
|
||||
)
|
||||
assert input_called["n"] == 0, "win32 off-thread fallback must not call input()"
|
||||
assert outcome.get("result") is None
|
||||
# Choice "3" normalizes to "cancel", which returns None.
|
||||
assert result is None
|
||||
|
||||
@@ -1264,123 +1264,3 @@ async def test_verbose_mode_respects_explicit_tool_preview_length(monkeypatch, t
|
||||
assert VerboseAgent.LONG_CODE not in all_content
|
||||
# But should still contain the truncated portion with "..."
|
||||
assert "..." in all_content
|
||||
|
||||
|
||||
class CodeBlockProgressAdapter(ProgressCaptureAdapter):
|
||||
"""A markdown-capable progress adapter (declares supports_code_blocks)."""
|
||||
|
||||
supports_code_blocks = True
|
||||
|
||||
|
||||
class TerminalCommandAgent:
|
||||
"""Emits a terminal tool.started with a real, multi-line command arg."""
|
||||
|
||||
CMD = (
|
||||
"set -euo pipefail\n"
|
||||
"printf 'node: '; node --version\n"
|
||||
"npm install -g hyperframes@latest"
|
||||
)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.tool_progress_callback = kwargs.get("tool_progress_callback")
|
||||
self.tools = []
|
||||
|
||||
def run_conversation(self, message, conversation_history=None, task_id=None):
|
||||
self.tool_progress_callback(
|
||||
"tool.started", "terminal", self.CMD, {"command": self.CMD}
|
||||
)
|
||||
# Let the async progress task drain the queue and send before returning.
|
||||
time.sleep(0.35)
|
||||
return {"final_response": "done", "messages": [], "api_calls": 1}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_terminal_progress_is_truncated_preview_not_bash_block(monkeypatch, tmp_path):
|
||||
"""Regression for #41215: terminal progress must render as a short truncated
|
||||
preview, never the full command in a fenced ```bash block, even on a
|
||||
markdown-capable (supports_code_blocks) gateway."""
|
||||
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")
|
||||
|
||||
fake_dotenv = types.ModuleType("dotenv")
|
||||
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
|
||||
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
|
||||
|
||||
fake_run_agent = types.ModuleType("run_agent")
|
||||
fake_run_agent.AIAgent = TerminalCommandAgent
|
||||
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||
import tools.terminal_tool # noqa: F401 - register terminal emoji
|
||||
|
||||
adapter = CodeBlockProgressAdapter(platform=Platform.TELEGRAM)
|
||||
runner = _make_runner(adapter)
|
||||
gateway_run = importlib.import_module("gateway.run")
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
|
||||
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="12345",
|
||||
chat_type="dm",
|
||||
thread_id=None,
|
||||
)
|
||||
|
||||
result = await runner._run_agent(
|
||||
message="hello",
|
||||
context_prompt="",
|
||||
history=[],
|
||||
source=source,
|
||||
session_id="sess-terminal-no-bash-block",
|
||||
session_key="agent:main:telegram:dm:12345",
|
||||
)
|
||||
|
||||
assert result["final_response"] == "done"
|
||||
all_content = " ".join(call["content"] for call in adapter.sent)
|
||||
all_content += " ".join(call["content"] for call in adapter.edits)
|
||||
# Compact truncated preview, not a fenced bash block.
|
||||
assert "```bash" not in all_content
|
||||
assert 'terminal: "' in all_content
|
||||
# The full multi-line command body must not reach the chat.
|
||||
assert "npm install -g hyperframes@latest" not in all_content
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_terminal_progress_no_bash_block_in_verbose_mode(monkeypatch, tmp_path):
|
||||
"""#41215 also rendered the bash block in verbose mode. The revert removed it
|
||||
from both branches, so verbose progress must not emit a fenced ```bash block
|
||||
either (verbose still shows args by opt-in, just not as a code block)."""
|
||||
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "verbose")
|
||||
|
||||
fake_dotenv = types.ModuleType("dotenv")
|
||||
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
|
||||
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
|
||||
|
||||
fake_run_agent = types.ModuleType("run_agent")
|
||||
fake_run_agent.AIAgent = TerminalCommandAgent
|
||||
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||
import tools.terminal_tool # noqa: F401 - register terminal emoji
|
||||
|
||||
adapter = CodeBlockProgressAdapter(platform=Platform.TELEGRAM)
|
||||
runner = _make_runner(adapter)
|
||||
gateway_run = importlib.import_module("gateway.run")
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
|
||||
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="12345",
|
||||
chat_type="dm",
|
||||
thread_id=None,
|
||||
)
|
||||
|
||||
result = await runner._run_agent(
|
||||
message="hello",
|
||||
context_prompt="",
|
||||
history=[],
|
||||
source=source,
|
||||
session_id="sess-terminal-verbose-no-bash",
|
||||
session_key="agent:main:telegram:dm:12345",
|
||||
)
|
||||
|
||||
assert result["final_response"] == "done"
|
||||
all_content = " ".join(call["content"] for call in adapter.sent)
|
||||
all_content += " ".join(call["content"] for call in adapter.edits)
|
||||
assert "```bash" not in all_content
|
||||
|
||||
@@ -835,7 +835,7 @@ class TestEditMessageStreamingSafety:
|
||||
assert second_call == {
|
||||
"chat_id": 123,
|
||||
"message_id": 456,
|
||||
"text": "final bold",
|
||||
"text": "final **bold**",
|
||||
}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -491,7 +491,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), \
|
||||
patch("hermes_cli.debug._sweep_expired_pastes", return_value=(0, 0)) as mock_sweep, \
|
||||
@@ -510,7 +509,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), \
|
||||
patch(
|
||||
@@ -531,7 +529,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = True
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
run_debug_share(args)
|
||||
@@ -549,7 +546,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
call_count = [0]
|
||||
uploaded_content = []
|
||||
@@ -603,7 +599,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
uploaded_content = []
|
||||
|
||||
@@ -649,7 +644,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
call_count = [0]
|
||||
def _mock_upload(content, expiry_days=7):
|
||||
@@ -674,7 +668,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
call_count = [0]
|
||||
def _mock_upload(content, expiry_days=7):
|
||||
@@ -701,7 +694,6 @@ class TestRunDebugShare:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), \
|
||||
patch("hermes_cli.debug.upload_to_pastebin",
|
||||
@@ -750,7 +742,6 @@ class TestRunDebugShareRedaction:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
args.no_redact = False
|
||||
|
||||
captured: list[str] = []
|
||||
@@ -781,7 +772,6 @@ class TestRunDebugShareRedaction:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
args.no_redact = False
|
||||
|
||||
captured: list[str] = []
|
||||
@@ -810,7 +800,6 @@ class TestRunDebugShareRedaction:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
args.no_redact = True
|
||||
|
||||
captured: list[str] = []
|
||||
@@ -861,7 +850,6 @@ class TestRunDebug:
|
||||
args.lines = 200
|
||||
args.expire = 7
|
||||
args.local = True
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
run_debug(args)
|
||||
@@ -1240,7 +1228,6 @@ class TestShareIncludesAutoDelete:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), \
|
||||
patch("hermes_cli.debug.upload_to_pastebin",
|
||||
@@ -1263,7 +1250,6 @@ class TestShareIncludesAutoDelete:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = False
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), \
|
||||
patch("hermes_cli.debug.upload_to_pastebin",
|
||||
@@ -1281,7 +1267,6 @@ class TestShareIncludesAutoDelete:
|
||||
args.lines = 50
|
||||
args.expire = 7
|
||||
args.local = True
|
||||
args.nous = False
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
run_debug_share(args)
|
||||
@@ -1395,220 +1380,3 @@ class TestBuildDebugShare:
|
||||
), patch("hermes_cli.debug._schedule_auto_delete"):
|
||||
with pytest.raises(RuntimeError, match="all paste services down"):
|
||||
build_debug_share(log_lines=50, redact=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared bundle collection + Nous-S3 path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCollectShareBundle:
|
||||
def test_returns_report_and_logs(self, hermes_home):
|
||||
from hermes_cli.debug import collect_share_bundle
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
bundle = collect_share_bundle(log_lines=50, redact=True)
|
||||
|
||||
assert "report" in bundle
|
||||
assert "agent.log" in bundle
|
||||
assert "gateway.log" in bundle
|
||||
assert "desktop.log" in bundle
|
||||
# Banner is prepended under redact=True.
|
||||
assert "redacted at upload time" in bundle["report"]
|
||||
assert "session started" in bundle["agent.log"]
|
||||
|
||||
def test_no_redact_omits_banner(self, hermes_home):
|
||||
from hermes_cli.debug import collect_share_bundle
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
bundle = collect_share_bundle(log_lines=50, redact=False)
|
||||
|
||||
assert "redacted at upload time" not in bundle["report"]
|
||||
|
||||
def test_redaction_keeps_secrets_out(self, hermes_home):
|
||||
from hermes_cli.debug import collect_share_bundle
|
||||
|
||||
secret = "sk-proj-abcdefghijklmnopqrstuvwxyz1234567890"
|
||||
(hermes_home / "logs" / "agent.log").write_text(
|
||||
f"line one\nOPENAI_API_KEY={secret}\nline three\n"
|
||||
)
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
redacted = collect_share_bundle(log_lines=50, redact=True)
|
||||
unredacted = collect_share_bundle(log_lines=50, redact=False)
|
||||
|
||||
# Sanity: without redaction the secret is present in the bundle.
|
||||
assert secret in "\n".join(unredacted.values())
|
||||
# With redaction it must be scrubbed everywhere.
|
||||
assert secret not in "\n".join(redacted.values())
|
||||
|
||||
|
||||
def test_build_debug_share_uses_collector(self, hermes_home):
|
||||
# build_debug_share must produce the same report text the collector does
|
||||
# (i.e. the refactor preserved paste.rs behaviour).
|
||||
from hermes_cli.debug import build_debug_share, collect_share_bundle
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"):
|
||||
expected = collect_share_bundle(log_lines=50, redact=True)["report"]
|
||||
|
||||
uploaded = []
|
||||
|
||||
def _upload(content, expiry_days=7):
|
||||
uploaded.append(content)
|
||||
return "https://paste.rs/x"
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), patch(
|
||||
"hermes_cli.debug.upload_to_pastebin", side_effect=_upload
|
||||
), patch("hermes_cli.debug._schedule_auto_delete"):
|
||||
result = build_debug_share(log_lines=50, redact=True)
|
||||
|
||||
assert result.urls["Report"] == "https://paste.rs/x"
|
||||
# The report uploaded should match the collector's report.
|
||||
assert uploaded[0] == expected
|
||||
|
||||
|
||||
class TestBuildNousBundle:
|
||||
def test_envelope_shape_and_gzip(self, hermes_home):
|
||||
import gzip
|
||||
import json as _json
|
||||
|
||||
from hermes_cli.debug import build_nous_bundle
|
||||
|
||||
files = {"report": "hello", "agent.log": "log line"}
|
||||
blob = build_nous_bundle(files, redact=True)
|
||||
|
||||
# It's gzip — magic bytes.
|
||||
assert blob[:2] == b"\x1f\x8b"
|
||||
envelope = _json.loads(gzip.decompress(blob).decode())
|
||||
assert envelope["format"] == "hermes-debug-share/1"
|
||||
assert envelope["redacted"] is True
|
||||
assert envelope["files"] == files
|
||||
assert "created" in envelope
|
||||
|
||||
def test_redacted_false_recorded(self):
|
||||
import gzip
|
||||
import json as _json
|
||||
|
||||
from hermes_cli.debug import build_nous_bundle
|
||||
|
||||
blob = build_nous_bundle({"report": "x"}, redact=False)
|
||||
envelope = _json.loads(gzip.decompress(blob).decode())
|
||||
assert envelope["redacted"] is False
|
||||
|
||||
|
||||
class TestRunDebugShareNous:
|
||||
def _args(self, **over):
|
||||
class _A:
|
||||
lines = 50
|
||||
expire = 7
|
||||
local = False
|
||||
nous = True
|
||||
no_redact = False
|
||||
|
||||
a = _A()
|
||||
for k, v in over.items():
|
||||
setattr(a, k, v)
|
||||
return a
|
||||
|
||||
def test_nous_success_prints_view_url(self, hermes_home, capsys):
|
||||
from hermes_cli.debug import run_debug_share
|
||||
|
||||
res = {
|
||||
"id": "id-1",
|
||||
"viewUrl": "https://support.example.com/diagnostics/id-1",
|
||||
"expiresAt": "2026-06-20T00:00:00Z",
|
||||
}
|
||||
with patch("hermes_cli.dump.run_dump"), patch(
|
||||
"hermes_cli.diagnostics_upload.share_to_nous", return_value=res
|
||||
) as share:
|
||||
run_debug_share(self._args())
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "Nous-INTERNAL" in out
|
||||
assert "https://support.example.com/diagnostics/id-1" in out
|
||||
assert "2026-06-20T00:00:00Z" in out
|
||||
# The blob passed to share_to_nous must be gzip bytes.
|
||||
blob = share.call_args[0][0]
|
||||
assert isinstance(blob, (bytes, bytearray)) and blob[:2] == b"\x1f\x8b"
|
||||
|
||||
def test_nous_failure_suggests_local(self, hermes_home, capsys):
|
||||
from hermes_cli.debug import run_debug_share
|
||||
|
||||
with patch("hermes_cli.dump.run_dump"), patch(
|
||||
"hermes_cli.diagnostics_upload.share_to_nous",
|
||||
side_effect=RuntimeError("service down"),
|
||||
):
|
||||
with pytest.raises(SystemExit) as exc:
|
||||
run_debug_share(self._args())
|
||||
assert exc.value.code == 1
|
||||
err = capsys.readouterr().err
|
||||
assert "Nous upload failed" in err
|
||||
assert "--local" in err
|
||||
|
||||
def test_nous_does_not_touch_pastebin(self, hermes_home):
|
||||
from hermes_cli.debug import run_debug_share
|
||||
|
||||
res = {"id": "id-1", "viewUrl": "https://v"}
|
||||
with patch("hermes_cli.dump.run_dump"), patch(
|
||||
"hermes_cli.diagnostics_upload.share_to_nous", return_value=res
|
||||
), patch("hermes_cli.debug.upload_to_pastebin") as paste:
|
||||
run_debug_share(self._args())
|
||||
paste.assert_not_called()
|
||||
|
||||
|
||||
class TestDebugSlashCommand:
|
||||
"""`/debug [nous|local]` parsing in the CLI/TUI handler.
|
||||
|
||||
The classic CLI and the TUI slash worker both dispatch through
|
||||
``HermesCLI.process_command`` → ``_handle_debug_command(cmd_original)``,
|
||||
which parses an optional destination word and builds the args namespace
|
||||
handed to ``run_debug_share``.
|
||||
"""
|
||||
|
||||
def _handler(self):
|
||||
from hermes_cli.cli_commands_mixin import CLICommandsMixin
|
||||
|
||||
class _Stub(CLICommandsMixin):
|
||||
pass
|
||||
|
||||
return _Stub()._handle_debug_command
|
||||
|
||||
def _captured(self, cmd_original):
|
||||
captured = {}
|
||||
|
||||
def _fake_run(args):
|
||||
captured.update(vars(args))
|
||||
|
||||
with patch("hermes_cli.debug.run_debug_share", _fake_run):
|
||||
self._handler()(cmd_original)
|
||||
return captured
|
||||
|
||||
def test_bare_debug_defaults_to_paste(self):
|
||||
c = self._captured("/debug")
|
||||
assert c["nous"] is False and c["local"] is False
|
||||
assert c["lines"] == 200 and c["expire"] == 7
|
||||
|
||||
def test_nous_word_sets_nous(self):
|
||||
c = self._captured("/debug nous")
|
||||
assert c["nous"] is True and c["local"] is False
|
||||
|
||||
def test_local_word_sets_local(self):
|
||||
c = self._captured("/debug local")
|
||||
assert c["local"] is True and c["nous"] is False
|
||||
|
||||
def test_word_parsing_is_case_insensitive(self):
|
||||
c = self._captured("/debug NOUS")
|
||||
assert c["nous"] is True
|
||||
|
||||
def test_local_wins_over_nous(self):
|
||||
# local never touches the network, so it takes precedence.
|
||||
c = self._captured("/debug nous local")
|
||||
assert c["local"] is True and c["nous"] is False
|
||||
|
||||
def test_unknown_word_falls_back_to_default(self):
|
||||
c = self._captured("/debug paste")
|
||||
assert c["nous"] is False and c["local"] is False
|
||||
|
||||
def test_no_arg_default_keyword(self):
|
||||
# Calling with no cmd_original (legacy callers) must still work.
|
||||
c = self._captured("")
|
||||
assert c["nous"] is False and c["local"] is False
|
||||
|
||||
|
||||
@@ -1,227 +0,0 @@
|
||||
"""Tests for ``hermes_cli.diagnostics_upload`` — the Nous-S3 upload client.
|
||||
|
||||
All network I/O is mocked at ``urllib.request.urlopen``; no real requests
|
||||
are made.
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import urllib.error
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _resp(*, status=200, body=b""):
|
||||
"""Build a context-manager mock mimicking ``urllib.request.urlopen``."""
|
||||
m = MagicMock()
|
||||
m.status = status
|
||||
m.getcode.return_value = status
|
||||
m.read.return_value = body
|
||||
m.__enter__ = lambda s: s
|
||||
m.__exit__ = MagicMock(return_value=False)
|
||||
return m
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# request_upload_url
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRequestUploadUrl:
|
||||
def test_happy_path_posts_json_and_returns_dict(self):
|
||||
from hermes_cli.diagnostics_upload import request_upload_url
|
||||
|
||||
payload = {
|
||||
"success": True,
|
||||
"id": "abc-123",
|
||||
"uploadUrl": "https://bucket.s3.amazonaws.com/uploads/abc-123.json.gz?sig",
|
||||
"viewUrl": "https://support.example.com/diagnostics/abc-123",
|
||||
"uploadExpiresInSeconds": 900,
|
||||
}
|
||||
resp = _resp(status=200, body=json.dumps(payload).encode())
|
||||
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
) as urlopen:
|
||||
result = request_upload_url(content_type="application/gzip", size_bytes=512)
|
||||
|
||||
assert result == payload
|
||||
|
||||
# The request object passed to urlopen carries our JSON body + headers.
|
||||
req = urlopen.call_args[0][0]
|
||||
assert req.method == "POST"
|
||||
assert req.full_url.endswith("/api/diagnostics/upload-url")
|
||||
sent = json.loads(req.data.decode())
|
||||
assert sent["contentType"] == "application/gzip"
|
||||
assert sent["sizeBytes"] == 512
|
||||
# urllib lower-cases header keys.
|
||||
assert req.headers["Content-type"] == "application/json"
|
||||
|
||||
def test_non_2xx_raises(self):
|
||||
from hermes_cli.diagnostics_upload import request_upload_url
|
||||
|
||||
resp = _resp(status=500, body=b"boom")
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
):
|
||||
with pytest.raises(RuntimeError):
|
||||
request_upload_url()
|
||||
|
||||
def test_missing_upload_url_raises(self):
|
||||
from hermes_cli.diagnostics_upload import request_upload_url
|
||||
|
||||
resp = _resp(status=200, body=json.dumps({"id": "x"}).encode())
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
):
|
||||
with pytest.raises(RuntimeError):
|
||||
request_upload_url()
|
||||
|
||||
def test_non_json_raises(self):
|
||||
from hermes_cli.diagnostics_upload import request_upload_url
|
||||
|
||||
resp = _resp(status=200, body=b"<html>not json</html>")
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
):
|
||||
with pytest.raises(RuntimeError):
|
||||
request_upload_url()
|
||||
|
||||
def test_base_url_env_override(self, monkeypatch):
|
||||
# NAS_BASE is read at import time; re-import the module under the
|
||||
# patched env to confirm the override is honoured.
|
||||
import importlib
|
||||
|
||||
monkeypatch.setenv("HERMES_DIAGNOSTICS_BASE_URL", "https://staging.example.com")
|
||||
import hermes_cli.diagnostics_upload as mod
|
||||
|
||||
mod = importlib.reload(mod)
|
||||
try:
|
||||
assert mod.NAS_BASE == "https://staging.example.com"
|
||||
resp = _resp(
|
||||
status=200,
|
||||
body=json.dumps({"uploadUrl": "u", "id": "i", "viewUrl": "v"}).encode(),
|
||||
)
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
) as urlopen:
|
||||
mod.request_upload_url()
|
||||
req = urlopen.call_args[0][0]
|
||||
assert req.full_url == "https://staging.example.com/api/diagnostics/upload-url"
|
||||
finally:
|
||||
monkeypatch.delenv("HERMES_DIAGNOSTICS_BASE_URL", raising=False)
|
||||
importlib.reload(mod)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# put_bundle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPutBundle:
|
||||
def test_put_sends_exact_body_and_content_type(self):
|
||||
from hermes_cli.diagnostics_upload import put_bundle
|
||||
|
||||
data = b"\x1f\x8b\x08gzipped-bytes"
|
||||
resp = _resp(status=200, body=b"")
|
||||
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
) as urlopen:
|
||||
put_bundle("https://bucket.s3.amazonaws.com/uploads/x.json.gz?sig", data)
|
||||
|
||||
req = urlopen.call_args[0][0]
|
||||
assert req.method == "PUT"
|
||||
# PUT body must be the bundle bytes, unchanged.
|
||||
assert req.data == data
|
||||
assert req.headers["Content-type"] == "application/gzip"
|
||||
|
||||
def test_custom_content_type(self):
|
||||
from hermes_cli.diagnostics_upload import put_bundle
|
||||
|
||||
resp = _resp(status=204, body=b"")
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
) as urlopen:
|
||||
put_bundle("https://u", b"data", content_type="application/json")
|
||||
req = urlopen.call_args[0][0]
|
||||
assert req.headers["Content-type"] == "application/json"
|
||||
|
||||
def test_non_2xx_raises(self):
|
||||
from hermes_cli.diagnostics_upload import put_bundle
|
||||
|
||||
resp = _resp(status=403, body=b"AccessDenied")
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
return_value=resp,
|
||||
):
|
||||
with pytest.raises(RuntimeError):
|
||||
put_bundle("https://u", b"data")
|
||||
|
||||
def test_http_error_propagates(self):
|
||||
from hermes_cli.diagnostics_upload import put_bundle
|
||||
|
||||
err = urllib.error.HTTPError("https://u", 500, "err", {}, io.BytesIO(b""))
|
||||
with patch(
|
||||
"hermes_cli.diagnostics_upload.urllib.request.urlopen",
|
||||
side_effect=err,
|
||||
):
|
||||
with pytest.raises(urllib.error.HTTPError):
|
||||
put_bundle("https://u", b"data")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# share_to_nous (orchestration)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestShareToNous:
|
||||
def test_orchestrates_request_then_put(self):
|
||||
from hermes_cli import diagnostics_upload as mod
|
||||
|
||||
info = {
|
||||
"id": "id-9",
|
||||
"uploadUrl": "https://bucket/uploads/id-9.json.gz?sig",
|
||||
"viewUrl": "https://support/diagnostics/id-9",
|
||||
"expiresAt": "2026-06-20T00:00:00Z",
|
||||
}
|
||||
blob = b"gzipped-bundle"
|
||||
|
||||
with patch.object(mod, "request_upload_url", return_value=info) as req, \
|
||||
patch.object(mod, "put_bundle") as put:
|
||||
result = mod.share_to_nous(blob)
|
||||
|
||||
assert result == info
|
||||
req.assert_called_once()
|
||||
# request was told the real byte size (NAS signs it into ContentLength)
|
||||
assert req.call_args.kwargs["size_bytes"] == len(blob)
|
||||
# PUT got the signed URL + the exact blob
|
||||
put.assert_called_once_with(
|
||||
info["uploadUrl"], blob, content_type="application/gzip"
|
||||
)
|
||||
|
||||
def test_put_failure_propagates(self):
|
||||
from hermes_cli import diagnostics_upload as mod
|
||||
|
||||
info = {"id": "id-9", "uploadUrl": "https://u", "viewUrl": "v"}
|
||||
with patch.object(mod, "request_upload_url", return_value=info), \
|
||||
patch.object(mod, "put_bundle", side_effect=RuntimeError("PUT failed")):
|
||||
with pytest.raises(RuntimeError):
|
||||
mod.share_to_nous(b"data")
|
||||
|
||||
def test_share_succeeds_without_id_in_response(self):
|
||||
from hermes_cli import diagnostics_upload as mod
|
||||
|
||||
# NAS is stateless and there is no confirm step, so the share must
|
||||
# succeed regardless of whether the response carries an ``id``.
|
||||
info = {"uploadUrl": "https://u", "viewUrl": "v"} # no id
|
||||
with patch.object(mod, "request_upload_url", return_value=info), \
|
||||
patch.object(mod, "put_bundle") as put:
|
||||
result = mod.share_to_nous(b"data")
|
||||
assert result == info
|
||||
put.assert_called_once()
|
||||
@@ -540,54 +540,6 @@ def test_run_doctor_accepts_hermes_provider_ids_that_catalog_aliases(
|
||||
)
|
||||
|
||||
|
||||
def test_run_doctor_accepts_vendor_slugs_for_named_custom_provider(monkeypatch, tmp_path):
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir(parents=True, exist_ok=True)
|
||||
(home / "config.yaml").write_text(
|
||||
"model:\n"
|
||||
" provider: custom:hpc-ai\n"
|
||||
" default: deepseek/deepseek-v4-flash\n"
|
||||
"custom_providers:\n"
|
||||
" - name: hpc-ai\n"
|
||||
" base_url: https://hpc-ai.example/v1\n"
|
||||
" api_key: test-key\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
monkeypatch.setattr(doctor_mod, "HERMES_HOME", home)
|
||||
monkeypatch.setattr(doctor_mod, "PROJECT_ROOT", tmp_path / "project")
|
||||
monkeypatch.setattr(doctor_mod, "_DHH", str(home))
|
||||
(tmp_path / "project").mkdir(exist_ok=True)
|
||||
|
||||
fake_model_tools = types.SimpleNamespace(
|
||||
check_tool_availability=lambda *a, **kw: ([], []),
|
||||
TOOLSET_REQUIREMENTS={},
|
||||
)
|
||||
monkeypatch.setitem(sys.modules, "model_tools", fake_model_tools)
|
||||
|
||||
try:
|
||||
from hermes_cli import auth as _auth_mod
|
||||
monkeypatch.setattr(_auth_mod, "get_nous_auth_status", lambda: {})
|
||||
monkeypatch.setattr(_auth_mod, "get_codex_auth_status", lambda: {})
|
||||
monkeypatch.setattr(_auth_mod, "get_xai_oauth_auth_status", lambda: {})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
buf = io.StringIO()
|
||||
with contextlib.redirect_stdout(buf):
|
||||
doctor_mod.run_doctor(Namespace(fix=False))
|
||||
|
||||
out = buf.getvalue()
|
||||
assert "model.provider 'custom:hpc-ai' is not a recognised provider" not in out
|
||||
assert "model.provider 'custom:hpc-ai' is unknown" not in out
|
||||
assert (
|
||||
"model.default 'deepseek/deepseek-v4-flash' uses a vendor/model slug but provider is "
|
||||
"'custom:hpc-ai'"
|
||||
not in out
|
||||
)
|
||||
assert "Either set model.provider to 'openrouter', or drop the vendor prefix." not in out
|
||||
|
||||
|
||||
|
||||
|
||||
def test_run_doctor_accepts_kimi_coding_cn_provider(monkeypatch, tmp_path):
|
||||
|
||||
@@ -65,15 +65,6 @@ def test_resolve_provider_full_finds_named_custom_provider():
|
||||
assert resolved.source == "user-config"
|
||||
|
||||
|
||||
def test_is_aggregator_recognizes_named_custom_provider():
|
||||
assert providers_mod.is_aggregator("custom:hpc-ai") is True
|
||||
assert providers_mod.is_aggregator("custom:litellm") is True
|
||||
|
||||
|
||||
def test_is_aggregator_leaves_unknown_provider_non_aggregator():
|
||||
assert providers_mod.is_aggregator("not-a-provider") is False
|
||||
|
||||
|
||||
def test_switch_model_accepts_explicit_named_custom_provider(monkeypatch):
|
||||
"""Shared /model switch pipeline should accept --provider for custom_providers."""
|
||||
monkeypatch.setattr(
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
@@ -16,6 +18,7 @@ from hermes_cli.plugins_cmd import (
|
||||
_repo_name_from_url,
|
||||
_resolve_git_executable,
|
||||
_resolve_git_url,
|
||||
_resolve_subdir_within,
|
||||
_sanitize_plugin_name,
|
||||
)
|
||||
|
||||
@@ -97,35 +100,127 @@ class TestSanitizePluginName:
|
||||
|
||||
|
||||
class TestResolveGitUrl:
|
||||
"""Shorthand and full-URL resolution."""
|
||||
"""Shorthand and full-URL resolution, with optional subdirectory."""
|
||||
|
||||
def test_owner_repo_shorthand(self):
|
||||
url = _resolve_git_url("owner/repo")
|
||||
url, subdir = _resolve_git_url("owner/repo")
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir is None
|
||||
|
||||
def test_https_url_passthrough(self):
|
||||
url = _resolve_git_url("https://github.com/x/y.git")
|
||||
url, subdir = _resolve_git_url("https://github.com/x/y.git")
|
||||
assert url == "https://github.com/x/y.git"
|
||||
assert subdir is None
|
||||
|
||||
def test_ssh_url_passthrough(self):
|
||||
url = _resolve_git_url("git@github.com:x/y.git")
|
||||
url, subdir = _resolve_git_url("git@github.com:x/y.git")
|
||||
assert url == "git@github.com:x/y.git"
|
||||
assert subdir is None
|
||||
|
||||
def test_http_url_passthrough(self):
|
||||
url = _resolve_git_url("http://example.com/repo.git")
|
||||
url, subdir = _resolve_git_url("http://example.com/repo.git")
|
||||
assert url == "http://example.com/repo.git"
|
||||
assert subdir is None
|
||||
|
||||
def test_file_url_passthrough(self):
|
||||
url = _resolve_git_url("file:///tmp/repo")
|
||||
url, subdir = _resolve_git_url("file:///tmp/repo")
|
||||
assert url == "file:///tmp/repo"
|
||||
assert subdir is None
|
||||
|
||||
def test_invalid_single_word_raises(self):
|
||||
with pytest.raises(ValueError, match="Invalid plugin identifier"):
|
||||
_resolve_git_url("justoneword")
|
||||
|
||||
def test_invalid_three_parts_raises(self):
|
||||
with pytest.raises(ValueError, match="Invalid plugin identifier"):
|
||||
_resolve_git_url("a/b/c")
|
||||
def test_shorthand_with_subdir(self):
|
||||
url, subdir = _resolve_git_url("owner/repo/my-plugin")
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir == "my-plugin"
|
||||
|
||||
def test_shorthand_with_nested_subdir(self):
|
||||
url, subdir = _resolve_git_url("owner/repo/path/to/plugin")
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir == "path/to/plugin"
|
||||
|
||||
def test_shorthand_with_subdir_trailing_slash(self):
|
||||
url, subdir = _resolve_git_url("owner/repo/my-plugin/")
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir == "my-plugin"
|
||||
|
||||
def test_https_url_with_subdir(self):
|
||||
url, subdir = _resolve_git_url("https://github.com/owner/repo.git/my-plugin")
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir == "my-plugin"
|
||||
|
||||
def test_https_url_with_nested_subdir(self):
|
||||
url, subdir = _resolve_git_url(
|
||||
"https://github.com/owner/repo.git/path/to/plugin"
|
||||
)
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir == "path/to/plugin"
|
||||
|
||||
def test_url_with_fragment_subdir(self):
|
||||
url, subdir = _resolve_git_url("https://github.com/owner/repo.git#my-plugin")
|
||||
assert url == "https://github.com/owner/repo.git"
|
||||
assert subdir == "my-plugin"
|
||||
|
||||
def test_file_url_with_fragment_subdir(self):
|
||||
url, subdir = _resolve_git_url("file:///tmp/repo#path/to/plugin")
|
||||
assert url == "file:///tmp/repo"
|
||||
assert subdir == "path/to/plugin"
|
||||
|
||||
def test_ssh_url_with_fragment_subdir(self):
|
||||
url, subdir = _resolve_git_url("git@github.com:owner/repo.git#sub")
|
||||
assert url == "git@github.com:owner/repo.git"
|
||||
assert subdir == "sub"
|
||||
|
||||
|
||||
# ── _resolve_subdir_within ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestResolveSubdirWithin:
|
||||
"""Subdirectory resolution stays within the clone and rejects traversal."""
|
||||
|
||||
def test_valid_subdir(self, tmp_path):
|
||||
(tmp_path / "my-plugin").mkdir()
|
||||
result = _resolve_subdir_within(tmp_path, "my-plugin")
|
||||
assert result == (tmp_path / "my-plugin").resolve()
|
||||
|
||||
def test_valid_nested_subdir(self, tmp_path):
|
||||
(tmp_path / "a" / "b" / "c").mkdir(parents=True)
|
||||
result = _resolve_subdir_within(tmp_path, "a/b/c")
|
||||
assert result == (tmp_path / "a" / "b" / "c").resolve()
|
||||
|
||||
def test_rejects_dot_dot_escape(self, tmp_path):
|
||||
clone = tmp_path / "clone"
|
||||
clone.mkdir()
|
||||
(tmp_path / "secret").mkdir()
|
||||
with pytest.raises(PluginOperationError, match="escapes the repository"):
|
||||
_resolve_subdir_within(clone, "../secret")
|
||||
|
||||
def test_rejects_absolute_path_escape(self, tmp_path):
|
||||
clone = tmp_path / "clone"
|
||||
clone.mkdir()
|
||||
# An absolute path resolves outside the clone root.
|
||||
with pytest.raises(PluginOperationError, match="escapes the repository"):
|
||||
_resolve_subdir_within(clone, "/etc")
|
||||
|
||||
def test_rejects_symlink_escape(self, tmp_path):
|
||||
clone = tmp_path / "clone"
|
||||
clone.mkdir()
|
||||
outside = tmp_path / "outside"
|
||||
outside.mkdir()
|
||||
(clone / "link").symlink_to(outside)
|
||||
with pytest.raises(PluginOperationError, match="escapes the repository"):
|
||||
_resolve_subdir_within(clone, "link")
|
||||
|
||||
def test_rejects_missing_subdir(self, tmp_path):
|
||||
with pytest.raises(PluginOperationError, match="does not exist"):
|
||||
_resolve_subdir_within(tmp_path, "nope")
|
||||
|
||||
def test_rejects_file_not_dir(self, tmp_path):
|
||||
(tmp_path / "afile").write_text("x")
|
||||
with pytest.raises(PluginOperationError, match="not a directory"):
|
||||
_resolve_subdir_within(tmp_path, "afile")
|
||||
|
||||
|
||||
# ── _resolve_git_executable ─────────────────────────────────────────────────
|
||||
@@ -698,3 +793,90 @@ class TestNoAutoActivation:
|
||||
# The old code had: "Even with default config, check if a plugin registered one"
|
||||
# The fix removes this. Verify it's gone.
|
||||
assert "Even with default config, check if a plugin registered one" not in source
|
||||
|
||||
|
||||
# ── End-to-end subdirectory install ──────────────────────────────────────────
|
||||
|
||||
|
||||
class TestSubdirInstallE2E:
|
||||
"""Install a plugin that lives in a subdirectory of a real local git repo."""
|
||||
|
||||
@staticmethod
|
||||
def _make_repo_with_subdir_plugin(repo_root: Path) -> None:
|
||||
"""Create a git repo where the plugin lives in ``./my-plugin/`` and the
|
||||
repo root holds unrelated docs/tests."""
|
||||
import subprocess as sp
|
||||
|
||||
repo_root.mkdir(parents=True, exist_ok=True)
|
||||
# Root-level noise: docs + tests that should NOT be installed.
|
||||
(repo_root / "README.md").write_text("# Monorepo docs\n")
|
||||
(repo_root / "tests").mkdir()
|
||||
(repo_root / "tests" / "test_x.py").write_text("def test_x():\n pass\n")
|
||||
# The actual plugin in a subdirectory.
|
||||
plugin_dir = repo_root / "my-plugin"
|
||||
plugin_dir.mkdir()
|
||||
(plugin_dir / "plugin.yaml").write_text(
|
||||
"name: my-plugin\nmanifest_version: 1\ndescription: A subdir plugin\n"
|
||||
)
|
||||
(plugin_dir / "__init__.py").write_text("# plugin entry\n")
|
||||
|
||||
env = {
|
||||
**os.environ,
|
||||
"GIT_AUTHOR_NAME": "t",
|
||||
"GIT_AUTHOR_EMAIL": "t@t",
|
||||
"GIT_COMMITTER_NAME": "t",
|
||||
"GIT_COMMITTER_EMAIL": "t@t",
|
||||
}
|
||||
sp.run(["git", "init", "-q"], cwd=repo_root, check=True, env=env)
|
||||
sp.run(["git", "add", "-A"], cwd=repo_root, check=True, env=env)
|
||||
sp.run(
|
||||
["git", "commit", "-q", "-m", "init"],
|
||||
cwd=repo_root,
|
||||
check=True,
|
||||
env=env,
|
||||
)
|
||||
|
||||
def test_installs_only_the_subdir_plugin(self, tmp_path, monkeypatch):
|
||||
if shutil.which("git") is None:
|
||||
pytest.skip("git not available")
|
||||
|
||||
from hermes_cli import plugins_cmd as pc
|
||||
|
||||
repo_root = tmp_path / "monorepo"
|
||||
self._make_repo_with_subdir_plugin(repo_root)
|
||||
|
||||
plugins_dir = tmp_path / "installed"
|
||||
plugins_dir.mkdir()
|
||||
monkeypatch.setattr(pc, "_plugins_dir", lambda: plugins_dir)
|
||||
|
||||
identifier = f"file://{repo_root}#my-plugin"
|
||||
target, manifest, name = pc._install_plugin_core(identifier, force=False)
|
||||
|
||||
# Installed under the plugin's own name, not the repo name.
|
||||
assert name == "my-plugin"
|
||||
assert manifest.get("name") == "my-plugin"
|
||||
assert target == (plugins_dir / "my-plugin").resolve()
|
||||
|
||||
# The plugin's files are present...
|
||||
assert (target / "plugin.yaml").exists()
|
||||
assert (target / "__init__.py").exists()
|
||||
# ...and the repo-root noise is NOT.
|
||||
assert not (target / "README.md").exists()
|
||||
assert not (target / "tests").exists()
|
||||
|
||||
def test_missing_subdir_raises(self, tmp_path, monkeypatch):
|
||||
if shutil.which("git") is None:
|
||||
pytest.skip("git not available")
|
||||
|
||||
from hermes_cli import plugins_cmd as pc
|
||||
|
||||
repo_root = tmp_path / "monorepo"
|
||||
self._make_repo_with_subdir_plugin(repo_root)
|
||||
|
||||
plugins_dir = tmp_path / "installed"
|
||||
plugins_dir.mkdir()
|
||||
monkeypatch.setattr(pc, "_plugins_dir", lambda: plugins_dir)
|
||||
|
||||
identifier = f"file://{repo_root}#does-not-exist"
|
||||
with pytest.raises(PluginOperationError, match="does not exist"):
|
||||
pc._install_plugin_core(identifier, force=False)
|
||||
|
||||
@@ -97,11 +97,7 @@ def test_request_device_code(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
assert code.user_code == "ABCD-1234"
|
||||
assert code.expires_in == 600
|
||||
assert "/api/auth/device/code" in captured["url"]
|
||||
# Hosted Photon allowlists registered device clients — an unregistered
|
||||
# client_id is rejected with 400 invalid_client. We use Photon's published
|
||||
# CLI device client and send the standard scope.
|
||||
assert captured["body"]["client_id"] == "photon-cli"
|
||||
assert captured["body"]["scope"] == "openid profile email"
|
||||
assert captured["body"]["client_id"] == "hermes-agent"
|
||||
|
||||
|
||||
def test_poll_for_token_via_header(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
@@ -285,108 +281,3 @@ def test_print_credential_summary_emits_only_display_strings(
|
||||
assert "proj-uuid" in blob # project id is intentionally surfaced
|
||||
# Header is always emitted
|
||||
assert any("Photon iMessage status" in line for line in lines)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Device-token candidate extraction + dashboard validation.
|
||||
|
||||
def test_device_response_candidates_covers_known_shapes() -> None:
|
||||
candidates = photon_auth._device_response_token_candidates(
|
||||
{
|
||||
"access_token": "tok-snake",
|
||||
"accessToken": "tok-camel",
|
||||
"data": {"access_token": "tok-data"},
|
||||
},
|
||||
headers={"set-auth-token": "Bearer tok-header"},
|
||||
)
|
||||
by_source = {c.source: c.token for c in candidates}
|
||||
assert by_source["access_token"] == "tok-snake"
|
||||
assert by_source["accessToken"] == "tok-camel"
|
||||
assert by_source["data.access_token"] == "tok-data"
|
||||
# "Bearer " prefix is stripped from the header value.
|
||||
assert by_source["set-auth-token"] == "tok-header"
|
||||
|
||||
|
||||
def test_device_response_candidates_dedupes() -> None:
|
||||
candidates = photon_auth._device_response_token_candidates(
|
||||
{"access_token": "same", "accessToken": "same"},
|
||||
)
|
||||
assert [c.token for c in candidates] == ["same"]
|
||||
|
||||
|
||||
def test_validate_photon_token_rejects_unrecognized_session(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
|
||||
if url.endswith("/api/auth/get-session"):
|
||||
return _FakeResponse(json_body={}) # no "user" key
|
||||
return _FakeResponse(json_body=[])
|
||||
|
||||
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
|
||||
with pytest.raises(photon_auth.PhotonDashboardAuthError):
|
||||
photon_auth.validate_photon_token("some-token")
|
||||
|
||||
|
||||
def test_validate_photon_token_rejects_project_api_denial(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
|
||||
if url.endswith("/api/auth/get-session"):
|
||||
return _FakeResponse(json_body={"user": {"id": "u1"}})
|
||||
return _FakeResponse(status=403) # project API rejects
|
||||
|
||||
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
|
||||
with pytest.raises(photon_auth.PhotonDashboardAuthError):
|
||||
photon_auth.validate_photon_token("some-token")
|
||||
|
||||
|
||||
def test_login_device_flow_validates_before_persisting(
|
||||
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
|
||||
if url.endswith("/api/auth/device/code"):
|
||||
return _FakeResponse(json_body={
|
||||
"device_code": "dev", "user_code": "AAAA",
|
||||
"verification_uri": "https://app.photon.codes/device",
|
||||
"verification_uri_complete": None,
|
||||
"expires_in": 600, "interval": 0,
|
||||
})
|
||||
# device/token approval
|
||||
return _FakeResponse(json_body={"access_token": "good-token"})
|
||||
|
||||
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
|
||||
if url.endswith("/api/auth/get-session"):
|
||||
return _FakeResponse(json_body={"user": {"id": "u1"}})
|
||||
return _FakeResponse(json_body=[]) # projects OK
|
||||
|
||||
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
|
||||
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
|
||||
|
||||
token = photon_auth.login_device_flow(open_browser=False)
|
||||
assert token == "good-token"
|
||||
assert photon_auth.load_photon_token() == "good-token"
|
||||
|
||||
|
||||
def test_login_device_flow_raises_when_token_invalid(
|
||||
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
|
||||
if url.endswith("/api/auth/device/code"):
|
||||
return _FakeResponse(json_body={
|
||||
"device_code": "dev", "user_code": "AAAA",
|
||||
"verification_uri": "https://app.photon.codes/device",
|
||||
"verification_uri_complete": None,
|
||||
"expires_in": 600, "interval": 0,
|
||||
})
|
||||
return _FakeResponse(json_body={"access_token": "bad-token"})
|
||||
|
||||
def fake_get(url: str, *, headers: Dict[str, str], timeout: float) -> _FakeResponse:
|
||||
return _FakeResponse(status=401) # session lookup rejects
|
||||
|
||||
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
|
||||
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
|
||||
|
||||
with pytest.raises(photon_auth.PhotonDashboardAuthError):
|
||||
photon_auth.login_device_flow(open_browser=False)
|
||||
# A token that failed validation must never be persisted.
|
||||
assert photon_auth.load_photon_token() is None
|
||||
|
||||
@@ -362,7 +362,7 @@ export const en: Translations = {
|
||||
inactive: "inactive",
|
||||
installBtn: "Install",
|
||||
installHeading: "Install from GitHub / Git URL",
|
||||
installHint: "Use owner/repo shorthand or a full https:// or git@ clone URL.",
|
||||
installHint: "Use owner/repo shorthand or a full https:// or git@ clone URL. For a plugin in a subdirectory, append the path: owner/repo/path/to/plugin (or <url>#path/to/plugin).",
|
||||
memoryProviderLabel: "Memory provider",
|
||||
missingEnvWarn: "Set these in Keys before the plugin can run:",
|
||||
noDashboardTab: "No dashboard tab",
|
||||
|
||||
@@ -240,7 +240,7 @@ export default function PluginsPage() {
|
||||
<Input
|
||||
className="font-mono-ui lowercase"
|
||||
id="install-url"
|
||||
placeholder="owner/repo or https://..."
|
||||
placeholder="owner/repo, owner/repo/subdir, or https://..."
|
||||
spellCheck={false}
|
||||
value={installId}
|
||||
onChange={(e) => setInstallId(e.target.value)}
|
||||
|
||||
Reference in New Issue
Block a user