Files
hermes-agent/utils.py

240 lines
7.9 KiB
Python
Raw Normal View History

"""Shared utility functions for hermes-agent."""
import json
refactor: extract shared helpers to deduplicate repeated code patterns (#7917) * refactor: add shared helper modules for code deduplication New modules: - gateway/platforms/helpers.py: MessageDeduplicator, TextBatchAggregator, strip_markdown, ThreadParticipationTracker, redact_phone - hermes_cli/cli_output.py: print_info/success/warning/error, prompt helpers - tools/path_security.py: validate_within_dir, has_traversal_component - utils.py additions: safe_json_loads, read_json_file, read_jsonl, append_jsonl, env_str/lower/int/bool helpers - hermes_constants.py additions: get_config_path, get_skills_dir, get_logs_dir, get_env_path * refactor: migrate gateway adapters to shared helpers - MessageDeduplicator: discord, slack, dingtalk, wecom, weixin, mattermost - strip_markdown: bluebubbles, feishu, sms - redact_phone: sms, signal - ThreadParticipationTracker: discord, matrix - _acquire/_release_platform_lock: telegram, discord, slack, whatsapp, signal, weixin Net -316 lines across 19 files. * refactor: migrate CLI modules to shared helpers - tools_config.py: use cli_output print/prompt + curses_radiolist (-117 lines) - setup.py: use cli_output print helpers + curses_radiolist (-101 lines) - mcp_config.py: use cli_output prompt (-15 lines) - memory_setup.py: use curses_radiolist (-86 lines) Net -263 lines across 5 files. * refactor: migrate to shared utility helpers - safe_json_loads: agent/display.py (4 sites) - get_config_path: skill_utils.py, hermes_logging.py, hermes_time.py - get_skills_dir: skill_utils.py, prompt_builder.py - Token estimation dedup: skills_tool.py imports from model_metadata - Path security: skills_tool, cronjob_tools, skill_manager_tool, credential_files - Non-atomic YAML writes: doctor.py, config.py now use atomic_yaml_write - Platform dict: new platforms.py, skills_config + tools_config derive from it - Anthropic key: new get_anthropic_key() in auth.py, used by doctor/status/config/main * test: update tests for shared helper migrations - test_dingtalk: use _dedup.is_duplicate() instead of _is_duplicate() - test_mattermost: use _dedup instead of _seen_posts/_prune_seen - test_signal: import redact_phone from helpers instead of signal - test_discord_connect: _platform_lock_identity instead of _token_lock_identity - test_telegram_conflict: updated lock error message format - test_skill_manager_tool: 'escapes' instead of 'boundary' in error msgs
2026-04-11 13:59:52 -07:00
import logging
import os
import stat
import tempfile
from pathlib import Path
from typing import Any, Union
fix: extend hostname-match provider detection across remaining call sites Aslaaen's fix in the original PR covered _detect_api_mode_for_url and the two openai/xai sites in run_agent.py. This finishes the sweep: the same substring-match false-positive class (e.g. https://api.openai.com.evil/v1, https://proxy/api.openai.com/v1, https://api.anthropic.com.example/v1) existed in eight more call sites, and the hostname helper was duplicated in two modules. - utils: add shared base_url_hostname() (single source of truth). - hermes_cli/runtime_provider, run_agent: drop local duplicates, import from utils. Reuse the cached AIAgent._base_url_hostname attribute everywhere it's already populated. - agent/auxiliary_client: switch codex-wrap auto-detect, max_completion_tokens gate (auxiliary_max_tokens_param), and custom-endpoint max_tokens kwarg selection to hostname equality. - run_agent: native-anthropic check in the Claude-style model branch and in the AIAgent init provider-auto-detect branch. - agent/model_metadata: Anthropic /v1/models context-length lookup. - hermes_cli/providers.determine_api_mode: anthropic / openai URL heuristics for custom/unknown providers (the /anthropic path-suffix convention for third-party gateways is preserved). - tools/delegate_tool: anthropic detection for delegated subagent runtimes. - hermes_cli/setup, hermes_cli/tools_config: setup-wizard vision-endpoint native-OpenAI detection (paired with deduping the repeated check into a single is_native_openai boolean per branch). Tests: - tests/test_base_url_hostname.py covers the helper directly (path-containing-host, host-suffix, trailing dot, port, case). - tests/hermes_cli/test_determine_api_mode_hostname.py adds the same regression class for determine_api_mode, plus a test that the /anthropic third-party gateway convention still wins. Also: add asslaenn5@gmail.com → Aslaaen to scripts/release.py AUTHOR_MAP.
2026-04-20 20:58:01 -07:00
from urllib.parse import urlparse
import yaml
refactor: extract shared helpers to deduplicate repeated code patterns (#7917) * refactor: add shared helper modules for code deduplication New modules: - gateway/platforms/helpers.py: MessageDeduplicator, TextBatchAggregator, strip_markdown, ThreadParticipationTracker, redact_phone - hermes_cli/cli_output.py: print_info/success/warning/error, prompt helpers - tools/path_security.py: validate_within_dir, has_traversal_component - utils.py additions: safe_json_loads, read_json_file, read_jsonl, append_jsonl, env_str/lower/int/bool helpers - hermes_constants.py additions: get_config_path, get_skills_dir, get_logs_dir, get_env_path * refactor: migrate gateway adapters to shared helpers - MessageDeduplicator: discord, slack, dingtalk, wecom, weixin, mattermost - strip_markdown: bluebubbles, feishu, sms - redact_phone: sms, signal - ThreadParticipationTracker: discord, matrix - _acquire/_release_platform_lock: telegram, discord, slack, whatsapp, signal, weixin Net -316 lines across 19 files. * refactor: migrate CLI modules to shared helpers - tools_config.py: use cli_output print/prompt + curses_radiolist (-117 lines) - setup.py: use cli_output print helpers + curses_radiolist (-101 lines) - mcp_config.py: use cli_output prompt (-15 lines) - memory_setup.py: use curses_radiolist (-86 lines) Net -263 lines across 5 files. * refactor: migrate to shared utility helpers - safe_json_loads: agent/display.py (4 sites) - get_config_path: skill_utils.py, hermes_logging.py, hermes_time.py - get_skills_dir: skill_utils.py, prompt_builder.py - Token estimation dedup: skills_tool.py imports from model_metadata - Path security: skills_tool, cronjob_tools, skill_manager_tool, credential_files - Non-atomic YAML writes: doctor.py, config.py now use atomic_yaml_write - Platform dict: new platforms.py, skills_config + tools_config derive from it - Anthropic key: new get_anthropic_key() in auth.py, used by doctor/status/config/main * test: update tests for shared helper migrations - test_dingtalk: use _dedup.is_duplicate() instead of _is_duplicate() - test_mattermost: use _dedup instead of _seen_posts/_prune_seen - test_signal: import redact_phone from helpers instead of signal - test_discord_connect: _platform_lock_identity instead of _token_lock_identity - test_telegram_conflict: updated lock error message format - test_skill_manager_tool: 'escapes' instead of 'boundary' in error msgs
2026-04-11 13:59:52 -07:00
logger = logging.getLogger(__name__)
TRUTHY_STRINGS = frozenset({"1", "true", "yes", "on"})
def is_truthy_value(value: Any, default: bool = False) -> bool:
"""Coerce bool-ish values using the project's shared truthy string set."""
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in TRUTHY_STRINGS
return bool(value)
def env_var_enabled(name: str, default: str = "") -> bool:
"""Return True when an environment variable is set to a truthy value."""
return is_truthy_value(os.getenv(name, default), default=False)
def _preserve_file_mode(path: Path) -> "int | None":
"""Capture the permission bits of *path* if it exists, else ``None``."""
try:
return stat.S_IMODE(path.stat().st_mode) if path.exists() else None
except OSError:
return None
def _restore_file_mode(path: Path, mode: "int | None") -> None:
"""Re-apply *mode* to *path* after an atomic replace.
``tempfile.mkstemp`` creates files with 0o600 (owner-only). After
``os.replace`` swaps the temp file into place the target inherits
those restrictive permissions, breaking Docker / NAS volume mounts
that rely on broader permissions set by the user. Calling this
right after ``os.replace`` restores the original permissions.
"""
if mode is None:
return
try:
os.chmod(path, mode)
except OSError:
pass
def atomic_json_write(
path: Union[str, Path],
data: Any,
*,
indent: int = 2,
**dump_kwargs: Any,
) -> None:
"""Write JSON data to a file atomically.
Uses temp file + fsync + os.replace to ensure the target file is never
left in a partially-written state. If the process crashes mid-write,
the previous version of the file remains intact.
Args:
path: Target file path (will be created or overwritten).
data: JSON-serializable data to write.
indent: JSON indentation (default 2).
**dump_kwargs: Additional keyword args forwarded to json.dump(), such
as default=str for non-native types.
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
original_mode = _preserve_file_mode(path)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent),
prefix=f".{path.stem}_",
suffix=".tmp",
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(
data,
f,
indent=indent,
ensure_ascii=False,
**dump_kwargs,
)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
_restore_file_mode(path, original_mode)
except BaseException:
# Intentionally catch BaseException so temp-file cleanup still runs for
# KeyboardInterrupt/SystemExit before re-raising the original signal.
try:
os.unlink(tmp_path)
except OSError:
pass
raise
def atomic_yaml_write(
path: Union[str, Path],
data: Any,
*,
default_flow_style: bool = False,
sort_keys: bool = False,
extra_content: str | None = None,
) -> None:
"""Write YAML data to a file atomically.
Uses temp file + fsync + os.replace to ensure the target file is never
left in a partially-written state. If the process crashes mid-write,
the previous version of the file remains intact.
Args:
path: Target file path (will be created or overwritten).
data: YAML-serializable data to write.
default_flow_style: YAML flow style (default False).
sort_keys: Whether to sort dict keys (default False).
extra_content: Optional string to append after the YAML dump
(e.g. commented-out sections for user reference).
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
original_mode = _preserve_file_mode(path)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent),
prefix=f".{path.stem}_",
suffix=".tmp",
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=default_flow_style, sort_keys=sort_keys)
if extra_content:
f.write(extra_content)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
_restore_file_mode(path, original_mode)
except BaseException:
# Match atomic_json_write: cleanup must also happen for process-level
# interruptions before we re-raise them.
try:
os.unlink(tmp_path)
except OSError:
pass
raise
refactor: extract shared helpers to deduplicate repeated code patterns (#7917) * refactor: add shared helper modules for code deduplication New modules: - gateway/platforms/helpers.py: MessageDeduplicator, TextBatchAggregator, strip_markdown, ThreadParticipationTracker, redact_phone - hermes_cli/cli_output.py: print_info/success/warning/error, prompt helpers - tools/path_security.py: validate_within_dir, has_traversal_component - utils.py additions: safe_json_loads, read_json_file, read_jsonl, append_jsonl, env_str/lower/int/bool helpers - hermes_constants.py additions: get_config_path, get_skills_dir, get_logs_dir, get_env_path * refactor: migrate gateway adapters to shared helpers - MessageDeduplicator: discord, slack, dingtalk, wecom, weixin, mattermost - strip_markdown: bluebubbles, feishu, sms - redact_phone: sms, signal - ThreadParticipationTracker: discord, matrix - _acquire/_release_platform_lock: telegram, discord, slack, whatsapp, signal, weixin Net -316 lines across 19 files. * refactor: migrate CLI modules to shared helpers - tools_config.py: use cli_output print/prompt + curses_radiolist (-117 lines) - setup.py: use cli_output print helpers + curses_radiolist (-101 lines) - mcp_config.py: use cli_output prompt (-15 lines) - memory_setup.py: use curses_radiolist (-86 lines) Net -263 lines across 5 files. * refactor: migrate to shared utility helpers - safe_json_loads: agent/display.py (4 sites) - get_config_path: skill_utils.py, hermes_logging.py, hermes_time.py - get_skills_dir: skill_utils.py, prompt_builder.py - Token estimation dedup: skills_tool.py imports from model_metadata - Path security: skills_tool, cronjob_tools, skill_manager_tool, credential_files - Non-atomic YAML writes: doctor.py, config.py now use atomic_yaml_write - Platform dict: new platforms.py, skills_config + tools_config derive from it - Anthropic key: new get_anthropic_key() in auth.py, used by doctor/status/config/main * test: update tests for shared helper migrations - test_dingtalk: use _dedup.is_duplicate() instead of _is_duplicate() - test_mattermost: use _dedup instead of _seen_posts/_prune_seen - test_signal: import redact_phone from helpers instead of signal - test_discord_connect: _platform_lock_identity instead of _token_lock_identity - test_telegram_conflict: updated lock error message format - test_skill_manager_tool: 'escapes' instead of 'boundary' in error msgs
2026-04-11 13:59:52 -07:00
# ─── JSON Helpers ─────────────────────────────────────────────────────────────
def safe_json_loads(text: str, default: Any = None) -> Any:
"""Parse JSON, returning *default* on any parse error.
Replaces the ``try: json.loads(x) except (JSONDecodeError, TypeError)``
pattern duplicated across display.py, anthropic_adapter.py,
auxiliary_client.py, and others.
"""
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError, ValueError):
return default
# ─── Environment Variable Helpers ─────────────────────────────────────────────
def env_int(key: str, default: int = 0) -> int:
"""Read an environment variable as an integer, with fallback."""
raw = os.getenv(key, "").strip()
if not raw:
return default
try:
return int(raw)
except (ValueError, TypeError):
return default
def env_bool(key: str, default: bool = False) -> bool:
"""Read an environment variable as a boolean."""
return is_truthy_value(os.getenv(key, ""), default=default)
fix: extend hostname-match provider detection across remaining call sites Aslaaen's fix in the original PR covered _detect_api_mode_for_url and the two openai/xai sites in run_agent.py. This finishes the sweep: the same substring-match false-positive class (e.g. https://api.openai.com.evil/v1, https://proxy/api.openai.com/v1, https://api.anthropic.com.example/v1) existed in eight more call sites, and the hostname helper was duplicated in two modules. - utils: add shared base_url_hostname() (single source of truth). - hermes_cli/runtime_provider, run_agent: drop local duplicates, import from utils. Reuse the cached AIAgent._base_url_hostname attribute everywhere it's already populated. - agent/auxiliary_client: switch codex-wrap auto-detect, max_completion_tokens gate (auxiliary_max_tokens_param), and custom-endpoint max_tokens kwarg selection to hostname equality. - run_agent: native-anthropic check in the Claude-style model branch and in the AIAgent init provider-auto-detect branch. - agent/model_metadata: Anthropic /v1/models context-length lookup. - hermes_cli/providers.determine_api_mode: anthropic / openai URL heuristics for custom/unknown providers (the /anthropic path-suffix convention for third-party gateways is preserved). - tools/delegate_tool: anthropic detection for delegated subagent runtimes. - hermes_cli/setup, hermes_cli/tools_config: setup-wizard vision-endpoint native-OpenAI detection (paired with deduping the repeated check into a single is_native_openai boolean per branch). Tests: - tests/test_base_url_hostname.py covers the helper directly (path-containing-host, host-suffix, trailing dot, port, case). - tests/hermes_cli/test_determine_api_mode_hostname.py adds the same regression class for determine_api_mode, plus a test that the /anthropic third-party gateway convention still wins. Also: add asslaenn5@gmail.com → Aslaaen to scripts/release.py AUTHOR_MAP.
2026-04-20 20:58:01 -07:00
# ─── URL Parsing Helpers ──────────────────────────────────────────────────────
def base_url_hostname(base_url: str) -> str:
"""Return the lowercased hostname for a base URL, or ``""`` if absent.
Use exact-hostname comparisons against known provider hosts
(``api.openai.com``, ``api.x.ai``, ``api.anthropic.com``) instead of
substring matches on the raw URL. Substring checks treat attacker- or
proxy-controlled paths/hosts like ``https://api.openai.com.example/v1``
or ``https://proxy.test/api.openai.com/v1`` as native endpoints, which
leads to wrong api_mode / auth routing.
"""
raw = (base_url or "").strip()
if not raw:
return ""
parsed = urlparse(raw if "://" in raw else f"//{raw}")
return (parsed.hostname or "").lower().rstrip(".")
fix: sweep remaining provider-URL substring checks across codebase Completes the hostname-hardening sweep — every substring check against a provider host in live-routing code is now hostname-based. This closes the same false-positive class for OpenRouter, GitHub Copilot, Kimi, Qwen, ChatGPT/Codex, Bedrock, GitHub Models, Vercel AI Gateway, Nous, Z.AI, Moonshot, Arcee, and MiniMax that the original PR closed for OpenAI, xAI, and Anthropic. New helper: - utils.base_url_host_matches(base_url, domain) — safe counterpart to 'domain in base_url'. Accepts hostname equality and subdomain matches; rejects path segments, host suffixes, and prefix collisions. Call sites converted (real-code only; tests, optional-skills, red-teaming scripts untouched): run_agent.py (10 sites): - AIAgent.__init__ Bedrock branch, ChatGPT/Codex branch (also path check) - header cascade for openrouter / copilot / kimi / qwen / chatgpt - interleaved-thinking trigger (openrouter + claude) - _is_openrouter_url(), _is_qwen_portal() - is_native_anthropic check - github-models-vs-copilot detection (3 sites) - reasoning-capable route gate (nousresearch, vercel, github) - codex-backend detection in API kwargs build - fallback api_mode Bedrock detection agent/auxiliary_client.py (7 sites): - extra-headers cascades in 4 distinct client-construction paths (resolve custom, resolve auto, OpenRouter-fallback-to-custom, _async_client_from_sync, resolve_provider_client explicit-custom, resolve_auto_with_codex) - _is_openrouter_client() base_url sniff agent/usage_pricing.py: - resolve_billing_route openrouter branch agent/model_metadata.py: - _is_openrouter_base_url(), Bedrock context-length lookup hermes_cli/providers.py: - determine_api_mode Bedrock heuristic hermes_cli/runtime_provider.py: - _is_openrouter_url flag for API-key preference (issues #420, #560) hermes_cli/doctor.py: - Kimi User-Agent header for /models probes tools/delegate_tool.py: - subagent Codex endpoint detection trajectory_compressor.py: - _detect_provider() cascade (8 providers: openrouter, nous, codex, zai, kimi-coding, arcee, minimax-cn, minimax) cli.py, gateway/run.py: - /model-switch cache-enabled hint (openrouter + claude) Bedrock detection tightened from 'bedrock-runtime in url' to 'hostname starts with bedrock-runtime. AND host is under amazonaws.com'. ChatGPT/Codex detection tightened from 'chatgpt.com/backend-api/codex in url' to 'hostname is chatgpt.com AND path contains /backend-api/codex'. Tests: - tests/test_base_url_hostname.py extended with a base_url_host_matches suite (exact match, subdomain, path-segment rejection, host-suffix rejection, host-prefix rejection, empty-input, case-insensitivity, trailing dot). Validation: 651 targeted tests pass (runtime_provider, minimax, bedrock, gemini, auxiliary, codex_cloudflare, usage_pricing, compressor_fallback, fallback_model, openai_client_lifecycle, provider_parity, cli_provider_resolution, delegate, credential_pool, context_compressor, plus the 4 hostname test modules). 26-assertion E2E call-site verification across 6 modules passes.
2026-04-20 21:17:28 -07:00
def base_url_host_matches(base_url: str, domain: str) -> bool:
"""Return True when the base URL's hostname is ``domain`` or a subdomain.
Safer counterpart to ``domain in base_url``, which is the substring
false-positive class documented on ``base_url_hostname``. Accepts bare
hosts, full URLs, and URLs with paths.
base_url_host_matches("https://api.moonshot.ai/v1", "moonshot.ai") == True
base_url_host_matches("https://moonshot.ai", "moonshot.ai") == True
base_url_host_matches("https://evil.com/moonshot.ai/v1", "moonshot.ai") == False
base_url_host_matches("https://moonshot.ai.evil/v1", "moonshot.ai") == False
"""
hostname = base_url_hostname(base_url)
if not hostname:
return False
domain = (domain or "").strip().lower().rstrip(".")
if not domain:
return False
return hostname == domain or hostname.endswith("." + domain)