mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 17:27:37 +08:00
Compare commits
1 Commits
fix/plugin
...
fix/tirith
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8fd60223b6 |
2
cli.py
2
cli.py
@@ -4797,7 +4797,7 @@ class HermesCLI:
|
|||||||
# Ensure tirith security scanner is available (downloads if needed)
|
# Ensure tirith security scanner is available (downloads if needed)
|
||||||
try:
|
try:
|
||||||
from tools.tirith_security import ensure_installed
|
from tools.tirith_security import ensure_installed
|
||||||
ensure_installed()
|
ensure_installed(log_failures=False)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass # Non-fatal — fail-open at scan time if unavailable
|
pass # Non-fatal — fail-open at scan time if unavailable
|
||||||
|
|
||||||
|
|||||||
@@ -305,7 +305,7 @@ class GatewayRunner:
|
|||||||
# Ensure tirith security scanner is available (downloads if needed)
|
# Ensure tirith security scanner is available (downloads if needed)
|
||||||
try:
|
try:
|
||||||
from tools.tirith_security import ensure_installed
|
from tools.tirith_security import ensure_installed
|
||||||
ensure_installed()
|
ensure_installed(log_failures=False)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass # Non-fatal — fail-open at scan time if unavailable
|
pass # Non-fatal — fail-open at scan time if unavailable
|
||||||
|
|
||||||
|
|||||||
@@ -315,6 +315,23 @@ class TestEnsureInstalled:
|
|||||||
mock_thread.start.assert_called_once()
|
mock_thread.start.assert_called_once()
|
||||||
_tirith_mod._resolved_path = None
|
_tirith_mod._resolved_path = None
|
||||||
|
|
||||||
|
@patch("tools.tirith_security._load_security_config")
|
||||||
|
def test_startup_prefetch_can_suppress_install_failure_logs(self, mock_cfg):
|
||||||
|
mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith",
|
||||||
|
"tirith_timeout": 5, "tirith_fail_open": True}
|
||||||
|
_tirith_mod._resolved_path = None
|
||||||
|
with patch("tools.tirith_security.shutil.which", return_value=None), \
|
||||||
|
patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \
|
||||||
|
patch("tools.tirith_security._is_install_failed_on_disk", return_value=False), \
|
||||||
|
patch("tools.tirith_security.threading.Thread") as MockThread:
|
||||||
|
mock_thread = MagicMock()
|
||||||
|
MockThread.return_value = mock_thread
|
||||||
|
result = ensure_installed(log_failures=False)
|
||||||
|
assert result is None
|
||||||
|
assert MockThread.call_args.kwargs["kwargs"] == {"log_failures": False}
|
||||||
|
mock_thread.start.assert_called_once()
|
||||||
|
_tirith_mod._resolved_path = None
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Failed download caches the miss (Finding #1)
|
# Failed download caches the miss (Finding #1)
|
||||||
@@ -516,6 +533,22 @@ class TestCosignVerification:
|
|||||||
assert path is None
|
assert path is None
|
||||||
assert reason == "cosign_missing"
|
assert reason == "cosign_missing"
|
||||||
|
|
||||||
|
@patch("tools.tirith_security.logger.debug")
|
||||||
|
@patch("tools.tirith_security.logger.warning")
|
||||||
|
@patch("tools.tirith_security.shutil.which", return_value=None)
|
||||||
|
@patch("tools.tirith_security._download_file")
|
||||||
|
@patch("tools.tirith_security._detect_target", return_value="aarch64-apple-darwin")
|
||||||
|
def test_install_quiet_mode_downgrades_cosign_missing_log(self, mock_target, mock_dl,
|
||||||
|
mock_which, mock_warning,
|
||||||
|
mock_debug):
|
||||||
|
"""Startup prefetch should not surface cosign-missing as a warning."""
|
||||||
|
from tools.tirith_security import _install_tirith
|
||||||
|
path, reason = _install_tirith(log_failures=False)
|
||||||
|
assert path is None
|
||||||
|
assert reason == "cosign_missing"
|
||||||
|
mock_warning.assert_not_called()
|
||||||
|
mock_debug.assert_called()
|
||||||
|
|
||||||
@patch("tools.tirith_security._verify_cosign", return_value=None)
|
@patch("tools.tirith_security._verify_cosign", return_value=None)
|
||||||
@patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign")
|
@patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign")
|
||||||
@patch("tools.tirith_security._download_file")
|
@patch("tools.tirith_security._download_file")
|
||||||
|
|||||||
@@ -279,7 +279,7 @@ def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str)
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def _install_tirith() -> tuple[str | None, str]:
|
def _install_tirith(*, log_failures: bool = True) -> tuple[str | None, str]:
|
||||||
"""Download and install tirith to $HERMES_HOME/bin/tirith.
|
"""Download and install tirith to $HERMES_HOME/bin/tirith.
|
||||||
|
|
||||||
Verifies provenance via cosign and SHA-256 checksum.
|
Verifies provenance via cosign and SHA-256 checksum.
|
||||||
@@ -287,6 +287,8 @@ def _install_tirith() -> tuple[str | None, str]:
|
|||||||
failure_reason is a short tag used by the disk marker to decide if the
|
failure_reason is a short tag used by the disk marker to decide if the
|
||||||
failure is retryable (e.g. "cosign_missing" clears when cosign appears).
|
failure is retryable (e.g. "cosign_missing" clears when cosign appears).
|
||||||
"""
|
"""
|
||||||
|
log = logger.warning if log_failures else logger.debug
|
||||||
|
|
||||||
target = _detect_target()
|
target = _detect_target()
|
||||||
if not target:
|
if not target:
|
||||||
logger.info("tirith auto-install: unsupported platform %s/%s",
|
logger.info("tirith auto-install: unsupported platform %s/%s",
|
||||||
@@ -309,7 +311,7 @@ def _install_tirith() -> tuple[str | None, str]:
|
|||||||
_download_file(f"{base_url}/{archive_name}", archive_path)
|
_download_file(f"{base_url}/{archive_name}", archive_path)
|
||||||
_download_file(f"{base_url}/checksums.txt", checksums_path)
|
_download_file(f"{base_url}/checksums.txt", checksums_path)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("tirith download failed: %s", exc)
|
log("tirith download failed: %s", exc)
|
||||||
return None, "download_failed"
|
return None, "download_failed"
|
||||||
|
|
||||||
# Cosign provenance verification is mandatory for auto-install.
|
# Cosign provenance verification is mandatory for auto-install.
|
||||||
@@ -320,25 +322,25 @@ def _install_tirith() -> tuple[str | None, str]:
|
|||||||
_download_file(f"{base_url}/checksums.txt.sig", sig_path)
|
_download_file(f"{base_url}/checksums.txt.sig", sig_path)
|
||||||
_download_file(f"{base_url}/checksums.txt.pem", cert_path)
|
_download_file(f"{base_url}/checksums.txt.pem", cert_path)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("tirith install skipped: cosign artifacts unavailable (%s). "
|
log("tirith install skipped: cosign artifacts unavailable (%s). "
|
||||||
"Install tirith manually or install cosign for auto-install.", exc)
|
"Install tirith manually or install cosign for auto-install.", exc)
|
||||||
return None, "cosign_artifacts_unavailable"
|
return None, "cosign_artifacts_unavailable"
|
||||||
|
|
||||||
# Check cosign availability before attempting verification so we can
|
# Check cosign availability before attempting verification so we can
|
||||||
# distinguish "not installed" (retryable) from "installed but broken."
|
# distinguish "not installed" (retryable) from "installed but broken."
|
||||||
if not shutil.which("cosign"):
|
if not shutil.which("cosign"):
|
||||||
logger.warning("tirith install skipped: cosign not found on PATH. "
|
log("tirith install skipped: cosign not found on PATH. "
|
||||||
"Install cosign for auto-install, or install tirith manually.")
|
"Install cosign for auto-install, or install tirith manually.")
|
||||||
return None, "cosign_missing"
|
return None, "cosign_missing"
|
||||||
|
|
||||||
cosign_result = _verify_cosign(checksums_path, sig_path, cert_path)
|
cosign_result = _verify_cosign(checksums_path, sig_path, cert_path)
|
||||||
if cosign_result is not True:
|
if cosign_result is not True:
|
||||||
# False = verification rejected, None = execution failure (timeout/OSError)
|
# False = verification rejected, None = execution failure (timeout/OSError)
|
||||||
if cosign_result is None:
|
if cosign_result is None:
|
||||||
logger.warning("tirith install aborted: cosign execution failed")
|
log("tirith install aborted: cosign execution failed")
|
||||||
return None, "cosign_exec_failed"
|
return None, "cosign_exec_failed"
|
||||||
else:
|
else:
|
||||||
logger.warning("tirith install aborted: cosign provenance verification failed")
|
log("tirith install aborted: cosign provenance verification failed")
|
||||||
return None, "cosign_verification_failed"
|
return None, "cosign_verification_failed"
|
||||||
|
|
||||||
if not _verify_checksum(archive_path, checksums_path, archive_name):
|
if not _verify_checksum(archive_path, checksums_path, archive_name):
|
||||||
@@ -354,7 +356,7 @@ def _install_tirith() -> tuple[str | None, str]:
|
|||||||
tar.extract(member, tmpdir)
|
tar.extract(member, tmpdir)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
logger.warning("tirith binary not found in archive")
|
log("tirith binary not found in archive")
|
||||||
return None, "binary_not_in_archive"
|
return None, "binary_not_in_archive"
|
||||||
|
|
||||||
src = os.path.join(tmpdir, "tirith")
|
src = os.path.join(tmpdir, "tirith")
|
||||||
@@ -473,7 +475,7 @@ def _resolve_tirith_path(configured_path: str) -> str:
|
|||||||
return expanded
|
return expanded
|
||||||
|
|
||||||
|
|
||||||
def _background_install():
|
def _background_install(*, log_failures: bool = True):
|
||||||
"""Background thread target: download and install tirith."""
|
"""Background thread target: download and install tirith."""
|
||||||
global _resolved_path, _install_failure_reason
|
global _resolved_path, _install_failure_reason
|
||||||
with _install_lock:
|
with _install_lock:
|
||||||
@@ -494,7 +496,7 @@ def _background_install():
|
|||||||
_install_failure_reason = ""
|
_install_failure_reason = ""
|
||||||
return
|
return
|
||||||
|
|
||||||
installed, reason = _install_tirith()
|
installed, reason = _install_tirith(log_failures=log_failures)
|
||||||
if installed:
|
if installed:
|
||||||
_resolved_path = installed
|
_resolved_path = installed
|
||||||
_install_failure_reason = ""
|
_install_failure_reason = ""
|
||||||
@@ -505,7 +507,7 @@ def _background_install():
|
|||||||
_mark_install_failed(reason)
|
_mark_install_failed(reason)
|
||||||
|
|
||||||
|
|
||||||
def ensure_installed():
|
def ensure_installed(*, log_failures: bool = True):
|
||||||
"""Ensure tirith is available, downloading in background if needed.
|
"""Ensure tirith is available, downloading in background if needed.
|
||||||
|
|
||||||
Quick PATH/local checks are synchronous; network download runs in a
|
Quick PATH/local checks are synchronous; network download runs in a
|
||||||
@@ -578,7 +580,10 @@ def ensure_installed():
|
|||||||
# Need to download — launch background thread so startup doesn't block
|
# Need to download — launch background thread so startup doesn't block
|
||||||
if _install_thread is None or not _install_thread.is_alive():
|
if _install_thread is None or not _install_thread.is_alive():
|
||||||
_install_thread = threading.Thread(
|
_install_thread = threading.Thread(
|
||||||
target=_background_install, daemon=True)
|
target=_background_install,
|
||||||
|
kwargs={"log_failures": log_failures},
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
_install_thread.start()
|
_install_thread.start()
|
||||||
|
|
||||||
return None # Not available yet; commands will fail-open until ready
|
return None # Not available yet; commands will fail-open until ready
|
||||||
|
|||||||
Reference in New Issue
Block a user